Build a Replicated ClickHouse® Cluster on Kubernetes with the Altinity® Operator

May 22, 2026 | 6 min read | Mohamed Hussain S

This is the eighth article in our series on running the ClickHouse® database on Kubernetes with the Altinity® Kubernetes Operator. We have deployed Keeper and connected a cluster to it. Now we make data actually replicate, which is the whole point of running more than one node.

What replication gives you

Replication keeps identical copies of your data on multiple servers. If one server fails, the others still hold the data and keep serving queries, so you get fault tolerance and higher read throughput. In ClickHouse, replication is chosen per table by using a replicated table engine, and the replicas coordinate through Keeper.

Two terms matter. A replica is a full copy of the same data. A shard is a slice of the data; different shards hold different rows. This article uses one shard with two replicas, so both nodes hold the same complete dataset. We add shards in the next article.

Step 1: Deploy Keeper and a two-replica cluster

We combine everything from the last two articles into one file. It deploys a single-node Keeper and a one-shard, two-replica ClickHouse cluster that references it. Save it as replicated.yaml:

apiVersion: "clickhouse-keeper.altinity.com/v1"
kind: "ClickHouseKeeperInstallation"
metadata:
  name: keeper
spec:
  configuration:
    clusters:
      - name: keeper
        layout:
          replicasCount: 1
    settings:
      keeper_server/tcp_port: "2181"
  defaults:
    templates:
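      # Reference the pod template below; an unreferenced template is not applied.
      podTemplate: keeper-pod
      volumeClaimTemplate: keeper-data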
  templates:
    podTemplates:
      - name: keeper-pod
        spec:
          containers:
            - name: clickhouse-keeper
              image: clickhouse/clickhouse-keeper:26.3
    volumeClaimTemplates:
      - name: keeper-data
        spec:
          accessModes: ["ReadWriteOnce"]
          resources:
            requests:
              storage: 1Gi
---
apiVersion: "clickhouse.altinity.com/v1"
kind: "ClickHouseInstallation"
metadata:
  name: "ch"
spec:
  configuration:
    zookeeper:
      keeper:
        name: keeper
    users:
      analyst/password: analyst_password
      analyst/networks/ip:
        - 0.0.0.0/0
    clusters:
      - name: "main"
        layout:
          shardsCount: 1
          replicasCount: 2
  defaults:
    templates:
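      # Reference the pod template below; an unreferenced template is not applied.
      podTemplate: clickhouse-pod
      dataVolumeClaimTemplate: data-volume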
  templates:
    podTemplates:
      - name: clickhouse-pod
        spec:
          containers:
            - name: clickhouse
              image: clickhouse/clickhouse-server:26.3
    volumeClaimTemplates:
      - name: data-volume
        spec:
          accessModes: ["ReadWriteOnce"]
          resources:
            requests:
              storage: 5Gi

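Apply it, check the Keeper, then watch the ClickHouse installation until it finishes. kubectl can watch only one resource type at a time, so the two are queried separately:

kubectl create namespace ch
kubectl apply -n ch -f replicated.yaml
kubectl get chk -n ch
kubectl get chi -n ch -w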

When the CHI reports Completed, list the pods:

kubectl get pods -n ch

You will see one Keeper pod and two ClickHouse pods, named chi-ch-main-0-0-0 and chi-ch-main-0-1-0. Those are replica 0 and replica 1 of shard 0.
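
The two names above suggest a pattern: chi-<CHI name>-<cluster>-<shard index>-<replica index>, followed by a trailing 0 that appears to be the pod ordinal inside each host's StatefulSet. As a minimal sketch, assuming that pattern carries over to other layouts (it is read off this example, not quoted from the operator's documentation), a hypothetical helper chi_pod_names can list the names to expect:

```shell
# Hypothetical helper: print the pod names to expect for a CHI layout.
# Usage: chi_pod_names <chi> <cluster> <shards> <replicas>
# The naming pattern is inferred from the two pod names shown in this article.
chi_pod_names() {
  chi="$1"; cluster="$2"; shards="$3"; replicas="$4"
  shard=0
  while [ "$shard" -lt "$shards" ]; do
    replica=0
    while [ "$replica" -lt "$replicas" ]; do
      echo "chi-${chi}-${cluster}-${shard}-${replica}-0"
      replica=$((replica + 1))
    done
    shard=$((shard + 1))
  done
}

# The layout from replicated.yaml: CHI "ch", cluster "main", 1 shard, 2 replicas.
chi_pod_names ch main 1 2
```

For this article's layout it prints the same two names that kubectl get pods shows.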

Step 2: Understand the operator's macros

Before creating a replicated table, you need to know about macros. Every ClickHouse pod the operator creates is given a small set of substitution variables, filled in per pod: {installation} is the CHI name, {cluster} is the cluster name, {shard} is which shard this pod belongs to, and {replica} identifies this specific replica. These let you write one table definition that every pod interprets correctly for itself. You do not define these; the operator injects them.
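
ClickHouse exposes the macros a server knows about in the system.macros table, so you can check what each pod received. The sketch below wraps that query in a hypothetical helper, show_macros; it assumes the pod names, namespace, and analyst credentials used in this article.

```shell
# Hypothetical helper: print the macros as one pod sees them.
# Usage: show_macros <pod-name>
# Namespace and credentials are the ones used in this article.
show_macros() {
  pod="$1"
  kubectl exec -n ch "$pod" -- \
    clickhouse-client -u analyst --password analyst_password \
    -q "SELECT macro, substitution FROM system.macros ORDER BY macro FORMAT PrettyCompact"
}
```

Running show_macros chi-ch-main-0-0-0 and then show_macros chi-ch-main-0-1-0 should show the same installation, cluster, and shard values on both pods and a different replica value on each. The operator may list additional macros beyond the four described here.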

Step 3: Create a replicated table on the whole cluster

We use ON CLUSTER '{cluster}' so the statement runs on every node at once, and the ReplicatedMergeTree engine so the table's data is kept in sync through Keeper. Open a client on the first replica:

kubectl exec -it -n ch chi-ch-main-0-0-0 -- \
  clickhouse-client -u analyst --password analyst_password

Then create the table:

CREATE TABLE events ON CLUSTER '{cluster}'
(
    event_time DateTime,
    user_id    UInt64,
    action     String
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}',
    '{replica}'
)
ORDER BY (event_time, user_id);

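The path argument is built from macros so that each shard gets a unique path in Keeper, while both replicas of a shard resolve to the same path; that shared path is how the replicas find each other. {database} and {table} are filled in by ClickHouse itself from the table being created, and {replica} gives each node its own name under that path. Because of ON CLUSTER, the table is created on both replicas in one command. Modern ClickHouse also accepts a bare ENGINE = ReplicatedMergeTree with no arguments, deriving the path from server defaults the operator configures, but writing the macros out makes what is happening visible.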
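
To make the substitution concrete, here is a rough sketch that expands the path template by hand with sed. The helper name expand_path is hypothetical, and the macro values passed in (shard "0", database "default") are assumptions for illustration; the real values are whatever your pods report.

```shell
# Hypothetical helper: substitute macro values into a Keeper path template.
# Usage: expand_path <template> <installation> <cluster> <shard> <database> <table>
expand_path() {
  printf '%s\n' "$1" | sed \
    -e "s/{installation}/$2/g" \
    -e "s/{cluster}/$3/g" \
    -e "s/{shard}/$4/g" \
    -e "s/{database}/$5/g" \
    -e "s/{table}/$6/g"
}

template='/clickhouse/{installation}/{cluster}/tables/{shard}/{database}/{table}'

# Both replicas sit in the same shard, so they resolve the same path
# (assumed shard value: 0).
expand_path "$template" ch main 0 default events

# A second shard would resolve a different path (assumed shard value: 1).
expand_path "$template" ch main 1 default events
```

The first call prints /clickhouse/ch/main/tables/0/default/events and the second prints /clickhouse/ch/main/tables/1/default/events. Only the shard segment differs, which is what keeps the shards apart in Keeper.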

Step 4: Watch data replicate

Still on replica 0, insert some rows:

INSERT INTO events
SELECT now() - number,
       number % 1000,
       ['view', 'click', 'purchase'][number % 3 + 1]
FROM numbers(100000);
 
SELECT count() FROM events;

Replica 0 reports 100,000 rows. Now leave this client (exit) and open one on the other replica:

kubectl exec -it -n ch chi-ch-main-0-1-0 -- \
  clickhouse-client -u analyst --password analyst_password -q "SELECT count() FROM events"

It also reports 100,000 rows, even though you never inserted anything there. Keeper coordinated the replication and replica 1 fetched the data automatically. Replication is asynchronous, so after a large insert the second replica can lag for a moment before the counts match. That is replication working: write to any replica, read the same data from any other.
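
Rather than opening a client on each pod by hand, you can run the same check on both replicas in a loop. The sketch below is one way to do that, assuming the pod names and credentials from this article; on_each_replica and check_replication are hypothetical helper names. The second query reads system.replicas, where ClickHouse reports the state of each replicated table on that server.

```shell
# Hypothetical helper: run one query on both replicas of shard 0.
# Pod names, namespace, and credentials are the ones used in this article.
on_each_replica() {
  query="$1"
  for replica in 0 1; do
    kubectl exec -n ch "chi-ch-main-0-${replica}-0" -- \
      clickhouse-client -u analyst --password analyst_password -q "$query"
  done
}

# Hypothetical helper: compare row counts and replication state.
check_replication() {
  # Row count as seen by each replica.
  on_each_replica "SELECT hostName() AS host, count() AS row_count FROM events"
  # Replication state: both replicas should be listed as active, and
  # queue_size should fall to 0 once a replica has caught up.
  on_each_replica "SELECT replica_name, total_replicas, active_replicas, queue_size
                   FROM system.replicas WHERE table = 'events'"
}
```

Call check_replication after the insert. If the counts differ, a non-zero queue_size on the lagging replica usually means it is still fetching data.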

Step 5: Query the cluster with a Distributed table

A ReplicatedMergeTree table lives on each node. To query the cluster as one logical table, especially once you have several shards, you add a Distributed table that fans queries out and merges the results. Create it on the cluster:

CREATE TABLE events_all ON CLUSTER '{cluster}' AS events
ENGINE = Distributed('{cluster}', default, events, rand());

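This defines events_all as a Distributed table: it stores no data itself and forwards reads and writes to the events tables across the cluster. Its arguments are the {cluster} topology, the default database, the local events table, and rand() as the sharding key that spreads inserted rows across shards. Query it like any table: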

SELECT action, count() FROM events_all GROUP BY action ORDER BY action;

With one shard, the Distributed table simply picks a replica of that shard for each query, but the same definition will automatically span every shard once we add them in the next article. Writing your queries against the Distributed table now means they keep working unchanged as the cluster grows.
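
The topology a Distributed table works from is visible in the system.clusters table. As a small sketch, assuming the cluster is named main as in replicated.yaml, a hypothetical helper show_topology lists the shards and replicas the server knows about:

```shell
# Hypothetical helper: list the shards and replicas of cluster "main"
# as seen from the first replica. Namespace and credentials are the
# ones used in this article.
show_topology() {
  kubectl exec -n ch chi-ch-main-0-0-0 -- \
    clickhouse-client -u analyst --password analyst_password \
    -q "SELECT cluster, shard_num, replica_num, host_name
        FROM system.clusters WHERE cluster = 'main'
        ORDER BY shard_num, replica_num FORMAT PrettyCompact"
}
```

With the layout in this article you would expect two rows that share one shard_num and differ in replica_num. After shards are added, the same query should show more shard_num values without any change to events_all.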

What the operator did for you

Step back and notice what you did not do. You did not write peer addresses, raft configuration, remote_servers XML, or per-pod identities. You described one shard and two replicas, pointed at a Keeper, and the operator generated the cluster topology, the macros, and the coordination. Replication, the hardest part of running ClickHouse, became a few lines of YAML and one CREATE TABLE.

Clean up

kubectl delete namespace ch

What is next

You have a fault-tolerant, replicated cluster. In the next article we scale out: add shards to spread data across more nodes, spread replicas across availability zones, and use anti-affinity so two copies of the same data never land on the same machine.
