Modern data systems depend on automated pipelines to move data quickly, reliably, and repeatably. That’s where ClickHouse + Python becomes a powerful combo. In this blog, we’ll walk through how to automate data ingestion into a distributed ClickHouse cluster using Python, so your pipelines are repeatable and ready for production.
Why ClickHouse for Data Pipelines?
ClickHouse is a high-performance columnar database designed for analytical workloads. It shines when you need:
- Blazing-fast queries on large datasets.
- Horizontal scalability using shards & replicas.
- Efficient batch & streaming ingestion.
- Efficient storage with column compression.
To learn more about ClickHouse, check out my previous blog, ClickHouse OLAP: Why Modern Companies Need OLAP Systems Like ClickHouse.
Python complements ClickHouse by enabling:
- Easy automation.
- Integration with ETL/ML workflows.
- Better error handling and observability.
Architecture Overview
Cluster Setup
- 2 shards × 2 replicas.
- Local tables: employee_local (ReplicatedMergeTree).
- Distributed table: employee_distributed.
- Inserts always go through the Distributed table.
- The distributed table automatically routes data across shards.
Python Script → Distributed Table → Local Tables (on each shard).
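The layout above could be created with DDL along the following lines. This is a minimal sketch, not the exact schema behind this post: the cluster name my_cluster, the column types, the Keeper path with its {shard} and {replica} macros, and the cityHash64(id) sharding key are all assumptions. Only the database name, the table names, and the id, name, dept columns come from the script later in this blog.

```python
# Sketch of the DDL behind the architecture above.
# Assumptions (not from the post): cluster name, column types,
# the Keeper path/macros, and the sharding key.
CLUSTER = "my_cluster"  # hypothetical name from the server's remote_servers config
DATABASE = "company"

DATABASE_DDL = f"CREATE DATABASE IF NOT EXISTS {DATABASE} ON CLUSTER {CLUSTER}"

# Doubled braces keep {shard} and {replica} literal so ClickHouse
# can substitute its own macros on each node.
LOCAL_TABLE_DDL = f"""
CREATE TABLE IF NOT EXISTS {DATABASE}.employee_local ON CLUSTER {CLUSTER}
(
    id   UInt32,
    name String,
    dept String
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{{shard}}/{DATABASE}/employee_local', '{{replica}}'
)
ORDER BY id
"""

DISTRIBUTED_TABLE_DDL = f"""
CREATE TABLE IF NOT EXISTS {DATABASE}.employee_distributed ON CLUSTER {CLUSTER}
AS {DATABASE}.employee_local
ENGINE = Distributed({CLUSTER}, {DATABASE}, employee_local, cityHash64(id))
"""


def create_tables(client):
    """Run the statements in order using a clickhouse-connect client."""
    for statement in (DATABASE_DDL, LOCAL_TABLE_DDL, DISTRIBUTED_TABLE_DDL):
        client.command(statement)
```

Calling create_tables(client) with a client from clickhouse_connect.get_client(...) would issue the three statements once; IF NOT EXISTS makes a second run harmless.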
Step 1: System Prerequisites (Linux)
Before creating a Python virtual environment, ensure your system supports it.
Install Python venv Support (Ubuntu / Debian)
sudo apt update
sudo apt install -y python3-venv
This step is required only once per system.
If skipped, python3 -m venv may fail.
Step 2: Project Setup
Create Project Directory
mkdir clickhouse-python-client
cd clickhouse-python-client
Create and Activate Virtual Environment
python3 -m venv venv
source venv/bin/activate
After activation, your prompt will look like:
(venv) user@machine:~/clickhouse-python-client$
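If the prompt is customized and does not show the (venv) prefix, the interpreter itself can confirm whether it is running inside a virtual environment. A small sketch using only the standard library; the function name in_virtualenv is ours:

```python
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv."""
    # Inside a venv, sys.prefix points at the environment directory,
    # while sys.base_prefix still points at the base installation.
    return sys.prefix != sys.base_prefix


print("virtual environment active:", in_virtualenv())
```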
Step 3: Install Python Dependencies
We use the official clickhouse-connect Python client:
pip install --upgrade pip
pip install clickhouse-connect
Verify installation:
pip list | grep clickhouse
Expected output:
clickhouse-connect 0.x.x
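The same check can be done from Python, which is handy inside a pipeline's startup code. A minimal sketch using the standard library's importlib.metadata; the helper name installed_version is ours:

```python
from importlib import metadata


def installed_version(package: str):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


print("clickhouse-connect:", installed_version("clickhouse-connect"))
```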
Project Structure
clickhouse-python-client/
├── venv/
└── insert_employee.py
Step 4: Python Ingestion Script
Below is a production-style Python script with:
- Data ingestion and validation
- Logging
- Error handling
- Entry Point
# Imports
import sys
import logging
import clickhouse_connect
# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# ClickHouse configuration
CLICKHOUSE_CONFIG = {
    "host": "localhost",
    "port": 8123,
    "username": "default",
    "password": "",
    "database": "company",
    "table": "employee_distributed"
}
# Create the ClickHouse client (HTTP interface, port 8123)
def create_clickhouse_client():
    logger.info("Creating ClickHouse client connection")
    return clickhouse_connect.get_client(
        host=CLICKHOUSE_CONFIG["host"],
        port=CLICKHOUSE_CONFIG["port"],
        username=CLICKHOUSE_CONFIG["username"],
        password=CLICKHOUSE_CONFIG["password"]
    )
# Insert sample rows, then validate by counting rows through the Distributed table
def insert_and_validate(client):
    data = [
        (21, "AB", "AI"),
        (22, "BC", "ML"),
        (23, "QA", "DS"),
        (24, "LW", "AI"),
        (25, "KR", "ML")
    ]
    logger.info("Inserting data into Distributed table")
    client.insert(
        table=f"{CLICKHOUSE_CONFIG['database']}.{CLICKHOUSE_CONFIG['table']}",
        data=data,
        column_names=["id", "name", "dept"]
    )
    logger.info("Insert completed. Validating data")
    result = client.query(
        f"""
        SELECT count(*)
        FROM {CLICKHOUSE_CONFIG['database']}.{CLICKHOUSE_CONFIG['table']}
        """
    )
    row_count = result.result_rows[0][0]
    logger.info(f"Total rows visible in Distributed table: {row_count}")
# Entry point with error handling; the exit code tells schedulers whether the job succeeded
def main():
    try:
        logger.info("Starting ClickHouse insert job")
        client = create_clickhouse_client()
        insert_and_validate(client)
        logger.info("Job completed successfully")
        sys.exit(0)
    except Exception:
        logger.error("Job failed", exc_info=True)
        sys.exit(1)
if __name__ == "__main__":
    main()
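The script validates after the insert by counting rows. A check before the insert can also catch malformed rows early. The sketch below is an optional addition, not part of the script above; the helper name validate_rows and the expected Python types for id, name, and dept are assumptions based on the sample data.

```python
EXPECTED_TYPES = (int, str, str)  # assumed types for id, name, dept


def validate_rows(rows):
    """Raise ValueError on the first malformed row, else return the rows unchanged."""
    for index, row in enumerate(rows):
        if len(row) != len(EXPECTED_TYPES):
            raise ValueError(
                f"row {index}: expected {len(EXPECTED_TYPES)} values, got {len(row)}"
            )
        for value, expected in zip(row, EXPECTED_TYPES):
            # bool is a subclass of int in Python, so reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, expected):
                raise ValueError(
                    f"row {index}: {value!r} is not of type {expected.__name__}"
                )
    return rows
```

In insert_and_validate, passing data=validate_rows(data) to client.insert would make the job fail fast, with a clear log message, before anything reaches the cluster.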
Step 5: Run the Script
Activate the virtual environment and run:
source venv/bin/activate
python insert_employee.py
If everything works, you’ll see logs like:

[Screenshot: successful output]
What Happens Behind the Scenes?
When you insert into the Distributed table:
- ClickHouse determines the shard based on the sharding key.
- Data is routed to the correct shard.
- Each shard replicates data internally.
- Queries on the distributed table fetch data from all shards.
This means your Python script doesn’t need to worry about cluster complexity; ClickHouse handles it.
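To make the routing step concrete, here is a small pure-Python simulation of how a Distributed table picks a shard: the sharding expression's value is divided by the total shard weight, and the remainder falls into one shard's range. This is an illustration, not ClickHouse code. It assumes the sharding key is the id column itself and that both shards have weight 1; the function name pick_shard is ours.

```python
from typing import Sequence


def pick_shard(sharding_value: int, weights: Sequence[int]) -> int:
    """Return the 0-based index of the shard that would receive the row."""
    remainder = sharding_value % sum(weights)
    upper = 0
    for index, weight in enumerate(weights):
        upper += weight  # this shard owns remainders in [upper - weight, upper)
        if remainder < upper:
            return index
    raise AssertionError("unreachable when all weights are positive")


weights = [1, 1]  # two shards with equal weight (assumed)
for employee_id in (21, 22, 23, 24, 25):
    print(employee_id, "-> shard", pick_shard(employee_id, weights))
```

With these assumptions, odd ids land on the second shard and even ids on the first. A real cluster typically hashes the key (or uses rand()) to spread rows more evenly.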
Production Best Practices
- Always insert via Distributed tables.
- Use batch inserts for large volumes.
- Store secrets using env variables.
- Add retries for network failures.
- Monitor using system.query_log.
- Log every pipeline stage.
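Several of these practices fit into a few small helpers. The sketch below reads connection settings from environment variables, splits rows into batches, and retries a failed call with exponential backoff. The environment variable names, the helper names, the default batch size, and the choice to retry on any exception are all assumptions made for illustration; in a real pipeline you would likely narrow the retry to network-related errors.

```python
import logging
import os
import time

logger = logging.getLogger(__name__)


def load_config(env=os.environ):
    """Read connection settings from environment variables (names are hypothetical)."""
    return {
        "host": env.get("CLICKHOUSE_HOST", "localhost"),
        "port": int(env.get("CLICKHOUSE_PORT", "8123")),
        "username": env.get("CLICKHOUSE_USER", "default"),
        "password": env.get("CLICKHOUSE_PASSWORD", ""),
    }


def chunked(rows, size):
    """Yield consecutive batches of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def with_retries(operation, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `operation`, retrying with exponential backoff; re-raise after the last attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("Attempt %d failed, retrying in %.1fs", attempt, delay)
            sleep(delay)


def insert_in_batches(client, table, rows, column_names, batch_size=10_000):
    """Insert rows batch by batch, retrying each batch independently."""
    for batch in chunked(rows, batch_size):
        with_retries(lambda: client.insert(table, batch, column_names=column_names))
```

One caveat: if an insert succeeds on the server but the response is lost, a retry writes the batch again, so retries should be paired with the deduplication techniques discussed in the section on handling mistakes.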
Note:
Cluster Not Mandatory
Although this blog demonstrates ingestion into a distributed ClickHouse cluster, the same Python-based approach works with a single-node ClickHouse setup. The cluster is used here only to show how Python integrates with a distributed environment; the ingestion logic is identical for standalone deployments.
Handling Mistakes in ClickHouse Inserts
ClickHouse does NOT support traditional ROLLBACK for INSERTs
Unlike transactional databases (like MySQL or PostgreSQL), ClickHouse is designed for analytical workloads, and inserts are append-only. Once data is written, you cannot simply run a ROLLBACK to undo it.
But don’t worry, you still have smart ways to handle mistakes and avoid duplicates.
- Use a ReplacingMergeTree Table.
- Use a Version/timestamp Column.
- Insert Using a Deduplication Key.
- Use ALTER TABLE DELETE (For Rare Corrections).
- Design Idempotent Ingestion Logic in the Python Script.
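The first two options work together: a ReplacingMergeTree table with a version column keeps, for each sorting key, only the row with the highest version once its data parts are merged. The pure-Python simulation below illustrates that rule. It is not ClickHouse code, and it assumes id is the sorting key and that a column named version exists. Keep in mind that merges run in the background at an unspecified time, so duplicates can remain visible until a merge happens unless the query uses FINAL.

```python
def replacing_merge(rows):
    """Keep, per id, the row with the highest version (the later row wins on ties)."""
    latest = {}
    for row in rows:
        current = latest.get(row["id"])
        if current is None or row["version"] >= current["version"]:
            latest[row["id"]] = row
    return sorted(latest.values(), key=lambda r: r["id"])


rows = [
    {"id": 21, "name": "AB", "dept": "AI", "version": 1},
    {"id": 21, "name": "AB", "dept": "ML", "version": 2},  # corrected row
    {"id": 22, "name": "BC", "dept": "ML", "version": 1},
]
for row in replacing_merge(rows):
    print(row)
```

Here the corrected row for id 21 replaces the original, so fixing a mistake becomes another insert with a higher version rather than an update or rollback.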
ClickHouse is built for speed and scale, not transactional rollbacks. Instead of undoing inserts, design your pipelines to be:
- Idempotent
- Version-aware
- Deduplication-friendly
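One way to make a retried insert idempotent is to send a deterministic token with each batch, so the server can recognize a batch it has already stored. The sketch below derives the token from the batch content and passes it through ClickHouse's insert_deduplication_token setting. Treat it as a starting point under stated assumptions: the helper names are ours, insert-time deduplication applies to Replicated tables by default, and whether the token takes effect when inserting through a Distributed table depends on your server version and settings, so verify it on your cluster.

```python
import hashlib
import json


def batch_token(rows) -> str:
    """Derive a deterministic token from the batch content."""
    # Compact JSON gives a stable byte representation of the rows.
    payload = json.dumps(rows, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def idempotent_insert(client, table, rows, column_names):
    """Insert a batch with a content-derived deduplication token."""
    settings = {"insert_deduplication_token": batch_token(rows)}
    client.insert(table, rows, column_names=column_names, settings=settings)
```

Because the token depends only on the rows, re-running the job with the same batch sends the same token, while any change to the data produces a different one.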
Conclusion
Automating data ingestion into ClickHouse with Python gives you:
- Reliable ingestion pipelines
- Scalable distributed storage
- High-performance analytics
- Production-ready observability
This approach bridges the gap between data engineering and analytics, making your systems faster and more reliable.
References
ClickHouse Python driver – https://clickhouse.com/docs/integrations/python
Python client examples – https://clickhouse.com/docs/knowledgebase/python-clickhouse-connect-example
Advanced inserting with Python – https://clickhouse.com/docs/integrations/language-clients/python/advanced-inserting
