
Automating Data Pipelines : Python-Driven Ingestion into ClickHouse

Reshma M

Modern data systems rely heavily on automated data pipelines to move data quickly, reliably, and repeatably. ClickHouse and Python make a powerful combination for this. In this blog, we’ll walk through how to automate data ingestion into a distributed ClickHouse cluster using Python, with logging, error handling, and validation that make the pipeline production-ready.

ClickHouse is a high-performance columnar database designed for analytical workloads. It shines when you need:

  • Blazing-fast queries on large datasets.
  • Horizontal scalability using shards & replicas.
  • Efficient batch & streaming ingestion.
  • Efficient storage with column compression.

Python complements ClickHouse by enabling:

  • Easy automation.
  • Integration with ETL/ML workflows.
  • Better error handling and observability.

The example cluster in this post is set up as follows:

  • 2 shards × 2 replicas.
  • Local tables: employee_local (ReplicatedMergeTree).
  • Distributed table: employee_distributed.
  • Inserts always go through the Distributed table.
  • The Distributed table automatically routes data across shards.

The resulting data flow is:

Python Script → Distributed Table → Local Tables (on each shard)
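The post does not show the table definitions themselves. Below is a minimal sketch of what they might look like, generated as SQL strings from Python. The column types (UInt32/String), the cluster name my_cluster, the ZooKeeper path with the {shard}/{replica} macros, and the choice of id as the sharding key are all assumptions, not the author's actual schema.

```python
from typing import List

# Hypothetical names: replace with the cluster defined in your remote_servers config.
CLUSTER = "my_cluster"
DATABASE = "company"


def build_cluster_ddl(cluster: str = CLUSTER, database: str = DATABASE) -> List[str]:
    """Return CREATE statements for the database, the local table and the Distributed table."""
    create_db = f"CREATE DATABASE IF NOT EXISTS {database} ON CLUSTER {cluster}"
    # {{shard}} and {{replica}} render as {shard}/{replica}: per-node macros from server config.
    create_local = f"""
CREATE TABLE IF NOT EXISTS {database}.employee_local ON CLUSTER {cluster}
(
    id   UInt32,
    name String,
    dept String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{{shard}}/{database}/employee_local', '{{replica}}')
ORDER BY id
""".strip()
    # Distributed(cluster, database, local_table, sharding_key): rows are routed by id (assumed).
    create_distributed = f"""
CREATE TABLE IF NOT EXISTS {database}.employee_distributed ON CLUSTER {cluster}
AS {database}.employee_local
ENGINE = Distributed('{cluster}', '{database}', 'employee_local', id)
""".strip()
    return [create_db, create_local, create_distributed]


if __name__ == "__main__":
    for statement in build_cluster_ddl():
        print(statement, end=";\n\n")
```

If the sketch matches your setup, each statement could be executed with client.command(statement) from clickhouse-connect before running the ingestion script.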

Before creating a Python virtual environment, ensure your system supports it.

Install Python venv Support (Ubuntu / Debian)
sudo apt update
sudo apt install -y python3-venv

This step is required only once per system; if you skip it, python3 -m venv may fail.

Next, create a project directory and a virtual environment inside it:

mkdir clickhouse-python-client
cd clickhouse-python-client
python3 -m venv venv
source venv/bin/activate

After activation, your prompt will look like:

(venv) user@machine:~/clickhouse-python-client$
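To confirm from inside Python that the interpreter comes from the virtual environment (useful in cron jobs or CI, where there is no prompt to look at), a small check like the following works for environments created with python3 -m venv. The helper name in_virtualenv is hypothetical.

```python
import sys


def in_virtualenv() -> bool:
    """True when the running interpreter belongs to a venv created with `python3 -m venv`."""
    # Inside a venv, sys.prefix points at the venv directory while
    # sys.base_prefix still points at the system Python installation.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("venv active:", in_virtualenv())
    print("interpreter:", sys.executable)
```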

We use the official clickhouse-connect Python client:

pip install --upgrade pip
pip install clickhouse-connect
pip list | grep clickhouse
Expected output (the exact version will vary):
clickhouse-connect 0.x.x

Your project directory should now look like this:

clickhouse-python-client/
├── venv/
└── insert_employee.py
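The same installation check can be done from Python with the standard library's importlib.metadata, which can serve as a startup sanity check in the ingestion job. The helper name installed_version is hypothetical.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    v = installed_version("clickhouse-connect")
    print(f"clickhouse-connect {v}" if v else "clickhouse-connect is not installed in this environment")
```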

Step 4: Python Ingestion Script

Below is a production-style Python script with:

  • Data ingestion and validation
  • Logging
  • Error handling
  • A main() entry point with exit codes
import sys
import logging
import clickhouse_connect

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Connection settings (hardcoded here for simplicity)
CLICKHOUSE_CONFIG = {
    "host": "localhost",
    "port": 8123,  # ClickHouse HTTP interface, used by clickhouse-connect
    "username": "default",
    "password": "",
    "database": "company",
    "table": "employee_distributed"
}


def create_clickhouse_client():
    """Open a client connection using CLICKHOUSE_CONFIG."""
    logger.info("Creating ClickHouse client connection")
    return clickhouse_connect.get_client(
        host=CLICKHOUSE_CONFIG["host"],
        port=CLICKHOUSE_CONFIG["port"],
        username=CLICKHOUSE_CONFIG["username"],
        password=CLICKHOUSE_CONFIG["password"]
    )


def insert_and_validate(client):
    """Insert sample rows via the Distributed table, then log the visible row count."""
    data = [
        (21, "AB", "AI"),
        (22, "BC", "ML"),
        (23, "QA", "DS"),
        (24, "LW", "AI"),
        (25, "KR", "ML")
    ]
    full_table = f"{CLICKHOUSE_CONFIG['database']}.{CLICKHOUSE_CONFIG['table']}"

    logger.info("Inserting data into Distributed table")
    client.insert(
        table=full_table,
        data=data,
        column_names=["id", "name", "dept"]
    )

    logger.info("Insert completed. Validating data")
    # Distributed inserts are forwarded to the shards asynchronously by default,
    # so this count may briefly lag behind the rows just inserted.
    result = client.query(f"SELECT count(*) FROM {full_table}")
    row_count = result.result_rows[0][0]
    logger.info(f"Total rows visible in Distributed table: {row_count}")


def main():
    try:
        logger.info("Starting ClickHouse insert job")
        client = create_clickhouse_client()
        insert_and_validate(client)
        logger.info("Job completed successfully")
        sys.exit(0)  # SystemExit is not an Exception subclass, so it is not caught below
    except Exception:
        logger.error("Job failed", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()
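The script's validation step only logs the row count after the insert. Below is a sketch of additional pre-insert checks, assuming the (id, name, dept) row shape used above. validate_rows and its rules (positive integer ids, non-empty strings, no duplicate ids within a batch) are illustrative and not part of the original script.

```python
from typing import Iterable, List, Tuple

Row = Tuple[int, str, str]  # (id, name, dept), matching column_names in the script


def validate_rows(rows: Iterable[Row]) -> List[Row]:
    """Hypothetical pre-insert checks; raises ValueError on the first bad row."""
    checked: List[Row] = []
    seen_ids = set()
    for index, row in enumerate(rows):
        if len(row) != 3:
            raise ValueError(f"row {index}: expected 3 fields, got {len(row)}")
        emp_id, name, dept = row
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(emp_id, bool) or not isinstance(emp_id, int) or emp_id <= 0:
            raise ValueError(f"row {index}: id must be a positive int, got {emp_id!r}")
        if not isinstance(name, str) or not name.strip():
            raise ValueError(f"row {index}: name must be a non-empty string")
        if not isinstance(dept, str) or not dept.strip():
            raise ValueError(f"row {index}: dept must be a non-empty string")
        if emp_id in seen_ids:
            raise ValueError(f"row {index}: duplicate id {emp_id}")
        seen_ids.add(emp_id)
        checked.append((emp_id, name, dept))
    return checked
```

Calling data = validate_rows(data) just before client.insert(...) in insert_and_validate would stop a bad batch before it reaches the cluster; because main() catches exceptions, the job would then exit with status 1.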

Activate the virtual environment and run:

source venv/bin/activate
python insert_employee.py

If everything works, you’ll see logs like:

Screenshot: Successful output

When you insert into the Distributed table:

  • ClickHouse picks the target shard by evaluating the sharding key defined on the Distributed table.
  • The row is routed to that shard’s local table (employee_local).
  • Within each shard, ReplicatedMergeTree copies the data to the other replica.
  • Queries on the Distributed table fan out to all shards and merge the results.
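To make the routing concrete, here is a simplified model of how a Distributed table picks a shard: it takes the value of the sharding expression modulo the total shard weight and maps the remainder onto each shard’s weight range. This sketch assumes the sharding key is simply id and that both shards have weight 1; the real engine evaluates whatever expression you configured (often a hash of a column, or rand()).

```python
from typing import Dict, List, Sequence, Tuple

Row = Tuple[int, str, str]  # (id, name, dept)


def pick_shard(key: int, weights: Sequence[int]) -> int:
    """Map key % total_weight onto consecutive per-shard weight ranges."""
    if not weights or any(w <= 0 for w in weights):
        raise ValueError("weights must be positive")
    remainder = key % sum(weights)
    upper = 0
    for shard_index, weight in enumerate(weights):
        upper += weight
        if remainder < upper:
            return shard_index
    raise AssertionError("unreachable for positive weights")


def route(rows: Sequence[Row], weights: Sequence[int] = (1, 1)) -> Dict[int, List[int]]:
    """Group row ids by target shard, assuming the sharding key is the id column."""
    placement: Dict[int, List[int]] = {i: [] for i in range(len(weights))}
    for row in rows:
        placement[pick_shard(row[0], weights)].append(row[0])
    return placement


sample = [(21, "AB", "AI"), (22, "BC", "ML"), (23, "QA", "DS"), (24, "LW", "AI"), (25, "KR", "ML")]
print(route(sample))  # {0: [22, 24], 1: [21, 23, 25]}
```

Under these assumptions, the even ids from the sample script would land on the first shard and the odd ids on the second.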

This means your Python script doesn’t need to handle cluster complexity; ClickHouse takes care of it.

A few best practices for production pipelines:

  • Always insert via Distributed tables.
  • Use batch inserts for large volumes.
  • Store secrets in environment variables, not in the script.
  • Add retries for network failures.
  • Monitor inserts and queries using system.query_log.
  • Log every pipeline stage.
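Several of these practices can be combined in a small helper module. This is a sketch under assumptions: the environment variable names (CLICKHOUSE_HOST, CLICKHOUSE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD), the default batch size, and the retried exception types are illustrative. Check which exceptions clickhouse-connect actually raises on network errors in your version and pass them via retry_on.

```python
import logging
import os
import random
import time
from typing import Callable, Dict, Iterator, Sequence, Tuple, Type, TypeVar, Union

T = TypeVar("T")
logger = logging.getLogger(__name__)


def config_from_env() -> Dict[str, Union[str, int]]:
    """Read connection settings from environment variables (variable names are assumptions)."""
    return {
        "host": os.environ.get("CLICKHOUSE_HOST", "localhost"),
        "port": int(os.environ.get("CLICKHOUSE_PORT", "8123")),
        "username": os.environ.get("CLICKHOUSE_USER", "default"),
        # Keeps the password out of the source file; the empty default mirrors the original config.
        "password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
    }


def batched(rows: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` rows."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def with_retries(
    action: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
) -> T:
    """Call `action`, retrying the listed errors with exponential backoff plus jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller (e.g. main()) log and exit
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            logger.warning("Attempt %d/%d failed; retrying in %.1fs", attempt, attempts, delay)
            time.sleep(delay)
    raise AssertionError("unreachable")


def insert_in_batches(client, table: str, rows: Sequence[tuple], batch_size: int = 10_000) -> int:
    """Insert rows batch by batch via client.insert; returns the number of rows sent."""
    sent = 0
    for batch in batched(rows, batch_size):
        # The lambda runs immediately inside with_retries, so it sees the current batch.
        with_retries(lambda: client.insert(table, batch, column_names=["id", "name", "dept"]))
        sent += len(batch)
    return sent
```

Note that retrying an insert can write the same batch twice if the failed attempt actually reached the server, which is why the deduplication techniques in the next section matter.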

Although this blog demonstrates ingestion into a distributed ClickHouse cluster, the same Python-based approach works with a single-node ClickHouse setup as well. The cluster setup here simply illustrates how Python integrates with distributed environments; for a standalone deployment the ingestion logic stays the same, and you would typically insert directly into a regular MergeTree table instead of a Distributed table.

ClickHouse does NOT support traditional ROLLBACK for INSERTs

Unlike transactional databases such as MySQL or PostgreSQL, ClickHouse is designed for analytical workloads, and inserts are append-only. Once data is written, you cannot simply run a ROLLBACK to undo it.

That said, there are still practical ways to handle mistakes and avoid duplicates:

  • Use a ReplacingMergeTree table.
  • Use a version/timestamp column.
  • Insert using a deduplication key.
  • Use ALTER TABLE ... DELETE (for rare corrections).
  • Design idempotent ingestion logic in the Python script.
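For the deduplication-key approach, one option is ClickHouse’s insert_deduplication_token setting: if the same batch is retried with the same token, the server can discard the repeat. The sketch below derives a deterministic token by hashing the batch contents. It assumes a server version that supports this setting and a table engine where insert deduplication is enabled (Replicated*MergeTree tables deduplicate inserts by default); verify that it takes effect when inserting through a Distributed table. The helper names are hypothetical.

```python
import hashlib
import json
from typing import Sequence, Tuple

Row = Tuple[int, str, str]  # (id, name, dept)


def batch_dedup_token(rows: Sequence[Row]) -> str:
    """Hash a batch deterministically so a retried batch gets the same token."""
    payload = json.dumps([list(r) for r in rows], separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def insert_idempotently(client, table: str, rows: Sequence[Row]) -> str:
    """Insert one batch tagged with its token; re-sending the same batch should be deduplicated."""
    token = batch_dedup_token(rows)
    client.insert(
        table,
        rows,
        column_names=["id", "name", "dept"],
        # Assumes the server honours insert_deduplication_token for this table.
        settings={"insert_deduplication_token": token},
    )
    return token
```

Because the token depends only on the rows and their order, a job that crashes and re-sends the same batch produces the same token, which is what makes the retry safe.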

ClickHouse is built for speed and scale, not transactional rollbacks. Instead of undoing inserts, design your pipelines to be:

  • Idempotent
  • Version-aware
  • Deduplication-friendly
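The version-aware approach relies on ReplacingMergeTree(ver): during background merges, rows that share the same sorting key are collapsed to the one with the highest version (or, on a tie, the last inserted). The Python function below only models that rule on an in-memory list, assuming a hypothetical (id, name, dept, version) row with id as the sorting key. Merges happen at unpredictable times, so until then queries can still see older rows unless you use FINAL or an argMax-style aggregation.

```python
from typing import Dict, Iterable, List, Tuple

VersionedRow = Tuple[int, str, str, int]  # (id, name, dept, version): hypothetical schema


def replacing_merge(rows: Iterable[VersionedRow]) -> List[VersionedRow]:
    """Keep, per id, the row with the highest version; on a tie keep the later row."""
    latest: Dict[int, VersionedRow] = {}
    for row in rows:
        current = latest.get(row[0])
        # >= lets a later row with an equal version replace the earlier one.
        if current is None or row[3] >= current[3]:
            latest[row[0]] = row
    return sorted(latest.values(), key=lambda r: r[0])


inserted = [
    (21, "AB", "AI", 1),
    (22, "BC", "ML", 1),
    (21, "AB", "ML", 2),  # correction for id 21 with a newer version
    (22, "BC", "DS", 1),  # same version as before: the later row wins
]
print(replacing_merge(inserted))  # [(21, 'AB', 'ML', 2), (22, 'BC', 'DS', 1)]
```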

Automating data ingestion into ClickHouse with Python gives you:

  • Reliable ingestion pipelines
  • Scalable distributed storage
  • High-performance analytics
  • Production-ready observability

This approach bridges the gap between data engineering and analytics, making your systems faster and more reliable.