
Designing a Self-Healing Data Monitoring Pipeline with ClickHouse, Airflow, and Grafana


Modern data platforms require continuous monitoring to maintain data quality and system stability. In large-scale environments, duplicate records or incomplete processing can silently affect analytics and reporting accuracy. To address this problem, I designed a self-healing monitoring pipeline using ClickHouse, Airflow, and Grafana. The architecture replaces manual oversight with an automated remediation model: the system independently detects anomalies, executes corrective cleanup logic, and logs granular execution metrics.

By integrating automation with observability, the pipeline improves data reliability and reduces manual intervention in production environments.

Data ingestion pipelines commonly face issues such as duplicate entries caused by retries, batch reprocessing, or unexpected system interruptions. If these issues are not detected promptly, they can affect reporting accuracy, increase storage usage, and reduce system trustworthiness.

Traditional monitoring relies on manual checks or delayed reviews, so issues can remain unnoticed for extended periods.

To improve reliability, the system was designed to:

  • Detect duplicate records in near real time
  • Execute corrective cleanup automatically
  • Record monitoring metrics for analysis
  • Provide visual dashboards
  • Trigger alerts when thresholds are exceeded

The objective was clear: combine detection with automated correction.

The monitoring pipeline runs on a schedule managed by Airflow. At each interval, a monitoring script connects to ClickHouse and evaluates the target table for duplicate records.

When the script detects duplicates, it executes cleanup queries and triggers table optimization to apply deletions immediately. The system then records execution results and metrics in a logging table.

Grafana reads these metrics to visualize system health and data quality trends. Alert rules continuously evaluate the latest metrics, and the system triggers notifications when it detects abnormal conditions.

This workflow establishes an automated cycle of monitoring, correction, logging, and alerting, enabling a self-healing data reliability process.

The following architecture implements this as a single automated workflow that validates data, performs cleanup, logs results, and triggers alerts.

System Architecture of the Monitoring Pipeline

  1. Airflow Scheduler triggers the monitoring job at regular intervals.
  2. The Python script connects to ClickHouse and checks for duplicate records.
  3. If the script detects duplicates, it executes cleanup queries.
  4. The script stores results in the metrics logging table.
  5. Grafana reads metrics for visualization.
  6. Alert rules evaluate the latest values and send Telegram notifications when thresholds are exceeded.

This workflow enables continuous monitoring and automated remediation.

Component     | Technology       | Role
Scheduler     | Airflow          | Triggers monitoring jobs
Processing    | Python Script    | Performs validation and cleanup
Database      | ClickHouse       | Stores data and metrics
Visualization | Grafana          | Displays monitoring dashboards
Alerting      | Telegram Webhook | Sends notifications
Deployment    | Docker           | Containerized environment

I created two tables to separate operational data from monitoring metrics.

  • Events Table: Stores incoming records that require validation.
  • Metrics Log Table: Stores monitoring results for each execution cycle, including: check_time, table_name, duplicate_count, detached_parts, status, and deleted_rows.
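For concreteness, the two schemas might look like the following sketch. The metrics_log columns come from the list above; the events columns, types, and table engines are assumptions for illustration, not the exact production definitions.

-- events: operational data (schema assumed for illustration)
CREATE TABLE events (
    id UInt64,
    event_time DateTime,
    payload String
) ENGINE = MergeTree
ORDER BY id;

-- metrics_log: one row per monitoring cycle (column names from the article)
CREATE TABLE metrics_log (
    check_time DateTime,
    table_name String,
    duplicate_count UInt64,
    detached_parts UInt64,
    status String,
    deleted_rows UInt64
) ENGINE = MergeTree
ORDER BY check_time;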

The system identifies duplicate records using an aggregation query.

SELECT count()
FROM (
    SELECT id
    FROM events
    GROUP BY id
    HAVING count() > 1
);

This query detects identifiers that appear more than once in the dataset.
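As a quick illustration using the assumed schema above, two rows sharing id 42 make the query return 1:

INSERT INTO events VALUES
    (42, now(), 'a'),
    (42, now(), 'a'),
    (7,  now(), 'b');

-- Returns 1: exactly one id (42) occurs more than once.
SELECT count()
FROM (
    SELECT id FROM events GROUP BY id HAVING count() > 1
);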

When duplicates are detected, the script executes automated cleanup logic:

  1. Delete the duplicate records
  2. Force table optimization
  3. Record the number of rows removed

OPTIMIZE TABLE events FINAL;

This step is required because ALTER TABLE ... DELETE runs asynchronously as a background mutation in ClickHouse. Forcing a merge with OPTIMIZE TABLE ... FINAL ensures the cleanup is reflected immediately.
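The deletion itself is the mutation below; it is the same statement the monitoring script issues (see monitor.py later in this post):

ALTER TABLE events DELETE WHERE id IN (
    SELECT id
    FROM events
    GROUP BY id
    HAVING count() > 1
);

Note that, as written, this mutation removes every row whose id is duplicated, not only the extra copies.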

Cleanup Execution Output Showing Duplicate Detection and Removal

Each monitoring run stores execution results in the metrics_log table. This table records duplicate counts, cleanup activity, system status, and timestamps for every cycle.

This structured logging enables historical tracking, easier debugging, and real-time observability without manually checking logs.

Monitoring Metrics Stored in metrics_log Table
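Outside of the dashboards, the same history can be inspected directly with an ad-hoc query (a hypothetical example, not part of the pipeline itself):

SELECT check_time, duplicate_count, deleted_rows, detached_parts, status
FROM metrics_log
ORDER BY check_time DESC
LIMIT 10;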

The monitoring workflow is automated using an Airflow DAG. The system consists of two components:

  1. DAG definition file: controls scheduling and execution
  2. Monitoring script: performs duplicate detection and remediation logic

monitor.py

Python

from clickhouse_driver import Client
from datetime import datetime

# connection
client = Client(
    host="ch_monitor",
    port=9000,
    user="default",
    password="gayathri",
    database="monitor_db",
)

# duplicate check: count ids that appear more than once
duplicate_query = """
SELECT count()
FROM (
    SELECT id
    FROM events
    GROUP BY id
    HAVING count() > 1
)
"""
duplicate_count = client.execute(duplicate_query)[0][0]
original_duplicates = duplicate_count  # capture the metric before remediation
deleted_rows = 0

# self-healing: delete duplicates if found
if duplicate_count > 0:
    print("Duplicates detected, starting cleanup")
    before_count = client.execute("SELECT count() FROM events")[0][0]
    delete_query = """
    ALTER TABLE events DELETE WHERE id IN (
        SELECT id
        FROM events
        GROUP BY id
        HAVING count() > 1
    )
    """
    client.execute(delete_query)
    # force a merge so the asynchronous mutation is applied immediately
    client.execute("OPTIMIZE TABLE events FINAL")
    after_count = client.execute("SELECT count() FROM events")[0][0]
    deleted_rows = before_count - after_count
    print("Duplicate rows removed:", deleted_rows)

# detached parts check (secondary health signal)
detached_count = client.execute("SELECT count() FROM system.detached_parts")[0][0]

# status logic
status = "OK"
if duplicate_count > 0:
    status = "CLEANED"
elif detached_count > 0:
    status = "WARNING"

# insert metrics for this execution cycle
insert_query = """
INSERT INTO metrics_log
(check_time, table_name, duplicate_count, detached_parts, status, deleted_rows)
VALUES
"""
client.execute(insert_query, [(
    datetime.now(),
    "events",
    original_duplicates,
    detached_count,
    status,
    deleted_rows,
)])

monitor_dag.py

Python

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="clickhouse_monitor",
    start_date=datetime(2024, 1, 1),
    schedule_interval="*/10 * * * *",  # run every 10 minutes
    catchup=False,
) as dag:
    run_script = BashOperator(
        task_id="run_monitor_script",
        bash_command="python /opt/airflow/dags/monitor.py",
    )

Airflow DAG execution success view

This automation ensures that monitoring tasks run consistently, detect issues promptly, and apply corrective actions without manual intervention.

Grafana connects to ClickHouse using the official data source plugin. I configured the connection using the database host, port, credentials, and target database.

After the connection is established, I created a time-series dashboard using the metrics_log table. The query retrieves duplicate counts based on execution timestamps, allowing trends to be visualized over time.
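A sketch of what such a panel query can look like with the ClickHouse data source ($__timeFilter is the plugin's macro for the dashboard time range; the exact query in the original dashboard may differ):

SELECT check_time AS time, duplicate_count
FROM metrics_log
WHERE $__timeFilter(check_time)
ORDER BY check_time;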

This dashboard enables:

  • Duplicate tracking – Identifies when data issues occur
  • Cleanup monitoring – Verifies the effectiveness of remediation
  • Trend analysis – Detects recurring pipeline anomalies
  • Real-time observability – Provides instant visibility into system health

This integration converts raw monitoring metrics into structured visual dashboards, allowing faster diagnosis and simplified operational analysis.

Grafana dashboard showing duplicate metrics graph

I configured an alert rule in Grafana to monitor duplicate counts from the metrics table. When the value exceeds the defined threshold, Grafana sends a notification automatically through a Telegram webhook.

Creating the Telegram Bot: To enable these notifications, I used Telegram’s BotFather to create a dedicated alert bot. The process included:

  1. Initiating a chat with @BotFather on Telegram.
  2. Using the /newbot command and following the prompts to set a display name and a unique username.
  3. Receiving the API Token, which acts as the authentication key for Grafana.
  4. Starting a conversation with the new bot to enable message delivery.
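Before wiring the token into Grafana, it is worth confirming that the bot can actually deliver messages. A minimal sanity check, where the token and chat id are placeholders you substitute with your own values:

Python

import requests

TOKEN = "<bot-token-from-BotFather>"  # placeholder
CHAT_ID = "<your-chat-id>"            # placeholder, visible in the getUpdates output

# getUpdates lists recent messages sent to the bot, including the chat id
print(requests.get(f"https://api.telegram.org/bot{TOKEN}/getUpdates").json())

# send a test message through the same Bot API endpoint Grafana will use
requests.post(
    f"https://api.telegram.org/bot{TOKEN}/sendMessage",
    json={"chat_id": CHAT_ID, "text": "Test alert from the monitoring pipeline"},
)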

Alert Rule Configuration: The configuration defines the query and the specific conditions used to trigger alerts. For this pipeline, any duplicate_count > 0 for a sustained period of 5 minutes triggers an incident.
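One way to express the rule's underlying query is to read the most recent metric and alert on its value (a sketch; the actual rule in this setup may be configured differently in the Grafana UI):

SELECT duplicate_count
FROM metrics_log
ORDER BY check_time DESC
LIMIT 1;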

Alert State History: Grafana maintains a detailed log of alert transitions. This view displays states such as Pending (condition met but duration not reached), Alerting (notification dispatched), and Normal (issue resolved by the self-healing script).

Telegram Alert Notification: When a monitoring threshold is breached, the system delivers a real-time message directly to the Telegram chat. This message contains critical details about the table name and the severity of the duplication, ensuring immediate awareness.

This alerting setup ensures immediate detection and notification of data quality issues, allowing for full transparency even as the system automatically remediates the problem.

Alert State History

Telegram Alert message received

Building the automated ClickHouse monitoring pipeline revealed several practical challenges:

  • Asynchronous Deletions: ClickHouse deletions are asynchronous. Running table optimization after deletion is essential to ensure immediate consistency.
  • Accurate Duplicate Metrics: Always capture anomaly metrics before remediation to ensure the logs reflect the actual issues found.
  • Grafana Alert Configuration: Alert templates must match query outputs exactly to ensure successful notifications.
  • Docker Networking: Validate container networking and dependencies early to ensure seamless service communication.

This architecture can be applied across multiple production scenarios:

  • Data warehouse integrity monitoring
  • Streaming pipeline validation
  • Log ingestion systems
  • Financial transaction platforms
  • Production data engineering pipelines

Any environment that depends on reliable analytical data can benefit from automated validation and remediation.

This implementation demonstrates how open-source technologies can be strategically integrated to build a reliable and automated data monitoring framework. By combining ClickHouse for high-performance storage, Airflow for orchestration, Grafana for visualization, and Telegram for alerting, the system delivers a complete and structured monitoring lifecycle. The architecture not only detects anomalies but also performs automated remediation, logs operational metrics, and provides real-time visibility. As a result, it reduces manual intervention while strengthening data reliability and system stability. Overall, this approach reflects core data engineering principles centered on automation, observability, and operational resilience in production environments.