Nova Kwok's Awesome Blog

A Quick Note on Setting Up a Qdrant Cluster on Hetzner with Docker and Migrating Data

Anyone who has worked on RAG-related requirements knows that as your data volume grows, you not only have to deal with increasingly poor recall, but you'll also notice your vector database keeps expanding until it eventually hits your memory limit. At that point, a single large-memory server is no longer sufficient to meet availability and performance needs: a single machine always has a memory ceiling, can't do smooth rolling upgrades of the cluster version, and high-memory machines are generally more expensive than several smaller ones. A recent requirement to migrate a relatively large standalone Qdrant instance to a clustered deployment is what led to this brief note.

Realized it’s been almost a year since I last wrote a blog post with some technical content…

Original Standalone Qdrant Deployment

The existing Qdrant deployment is very simple—a standalone docker-compose.yml file, as shown below:

services:
  qdrant:
    image: qdrant/qdrant:v1.14.0
    restart: always
    ports:
      - 6333:6333
      - 6334:6334
    volumes:
      - ./volumes/qdrant:/qdrant/storage

This Qdrant instance runs on a Hetzner CCX53 machine with 32 cores and 128GB of memory. The qdrant directory is about 200GB in size, with over 21,000,000 vectors.

{
  "result": {
    "status": "yellow",
    "optimizer_status": "ok",
    "indexed_vectors_count": 21297932,
    "points_count": 21337945,
    "segments_count": 10,
    ...
  }
}
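
For reference, the status above comes from the collection info endpoint (GET /collections/<collection_name>). A minimal sketch for pulling it with Python; the collection name here is a placeholder:

import requests

# "my_collection" is a placeholder; use your actual collection name
info = requests.get("http://localhost:6333/collections/my_collection").json()
result = info["result"]
print(result["status"], result["points_count"], result["indexed_vectors_count"])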

Cluster Environment

Basic Configuration

A quick word on the machines used for the cluster: since Qdrant uses Raft as its consensus protocol, the deployment should involve at least 3 machines. Our initial setup uses 3 machines in a test environment on Hetzner, with the following IPs:

  • 10.0.0.6
  • 10.0.0.7
  • 10.0.0.9

Note: These are just for demonstration purposes. If you're also using Hetzner, you can quickly deploy a Qdrant cluster using cloud-init with internal IPs, as described at the end of this article.

Since we’re using Docker for deployment, we only need to create a docker-compose.yml file on 10.0.0.6 with the following content:

services:
  qdrant_node1:
    image: qdrant/qdrant:v1.14.0
    restart: always
    volumes:
      - ./qdrant_storage:/qdrant/storage
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --uri http://10.0.0.6:6335"

On 10.0.0.7, create a docker-compose.yml file with the following content:

services:
  qdrant_node2:
    image: qdrant/qdrant:v1.14.0
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://10.0.0.7:6335"

On 10.0.0.9, create a docker-compose.yml file with the following content:

services:
  qdrant_node3:
    image: qdrant/qdrant:v1.14.0
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://10.0.0.9:6335"

Then run docker-compose up -d on each machine to start the services.

Once all nodes are up, you can access the cluster status by visiting http://localhost:6333/cluster on any machine. For example:

{
  "result": {
    "status": "enabled",
    "peer_id": 5395257186314509,
    "peers": {
      "3095816753490206": {
        "uri": "http://10.0.0.9:6335/"
      },
      "5395257186314509": {
        "uri": "http://10.0.0.6:6335/"
      },
      "4182395837949771": {
        "uri": "http://10.0.0.7:6335/"
      }
    },
    "raft_info": {
      "term": 1,
      "commit": 41,
      "pending_operations": 0,
      "leader": 5395257186314509,
      "role": "Leader",
      "is_voter": true
    },
    "consensus_thread_status": {
      "consensus_thread_status": "working",
      "last_update": "2025-05-17T02:31:10.703071457Z"
    },
    "message_send_failures": {}
  },
  "status": "ok",
  "time": 0.000011782
}

Creating a Collection

With the distributed Qdrant cluster set up, the official documentation at https://qdrant.tech/documentation/guides/distributed_deployment/#making-use-of-a-new-distributed-qdrant-cluster tells us:

When you enable distributed mode and scale up to two or more nodes, your data does not move to the new node automatically; it starts out empty. To make use of your new empty node, do one of the following:

  • Create a new replicated collection by setting the replication_factor to 2 or more and setting the number of shards to a multiple of your number of nodes.
  • If you have an existing collection which does not contain enough shards for each node, you must create a new collection as described in the previous bullet point.
  • If you already have enough shards for each node and you merely need to replicate your data, follow the directions for creating new shard replicas.
  • If you already have enough shards for each node and your data is already replicated, you can move data (without replicating it) onto the new node(s) by moving shards.

Since the current Qdrant instance we have is a single node, the collection on it has only a single shard. The collection configuration is as follows:

{
  "params": {
    "vectors": {
      "size": 1536,
      "distance": "Cosine",
      "on_disk": true
    },
    "shard_number": 1,
    "replication_factor": 1,
    "write_consistency_factor": 1,
    "on_disk_payload": true
  },
  ...
}

So the first step is to create a new collection on the new cluster with the same parameters as the current collection, except that shard_number and replication_factor need to be modified.

Just like ClickHouse, creating a cluster doesn't automatically mean your data is distributed and replicated; you have to configure sharding and replication for your data explicitly.

For the settings of the two parameters above, the official documentation recommends:

If you anticipate a lot of growth, we recommend 12 shards since you can expand from 1 node up to 2, 3, 6, and 12 nodes without having to re-shard. Having more than 12 shards in a small cluster may not be worth the performance overhead.

“Anticipate a lot of growth” sounds very appropriate for our scenario, so here I create a new collection with shard_number set to 12 and replication_factor set to 2. The relevant script is as follows:

from qdrant_client import QdrantClient
import qdrant_client.http.models as models

collection_name = "new_collection"

client = QdrantClient(
    host="10.0.0.6", port=6333
)

vectors_config = models.VectorParams(
    size=1536, distance=models.Distance.COSINE, on_disk=True
)
# m=0 disables building the global HNSW graph during import;
# payload_m=16 keeps payload-aware indexing enabled.
hnsw_config = models.HnswConfigDiff(
    m=0,
    payload_m=16,
    ef_construct=100,
    full_scan_threshold=10000,
    max_indexing_threads=100,
    on_disk=True,
)
client.create_collection(
    collection_name=collection_name,
    vectors_config=vectors_config,
    shard_number=12,
    replication_factor=2,
    hnsw_config=hnsw_config,
)
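
After creating the collection, it's worth a quick sanity check that the shard and replication settings actually took effect. A minimal sketch using requests against the same host:

import requests

info = requests.get("http://10.0.0.6:6333/collections/new_collection").json()
params = info["result"]["config"]["params"]
# Expect 12 and 2
print(params["shard_number"], params["replication_factor"])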

Migrating Data

Once the new cluster is created, we need to import the data from the existing standalone instance. Initially, I tried to create a snapshot on the original instance and restore it into the new cluster, but encountered the following error:

{"status":{"error":"Wrong input: Snapshot is not compatible with existing collection: Collection shard number: 3 Snapshot shard number: 1"},"time":1107.142566774}
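
For context, the snapshot route was attempted roughly like this. This is only a hedged sketch: the collection name and URLs are placeholders, and the old instance must be reachable from the new cluster:

import requests

old = "http://localhost:6333"   # standalone instance (placeholder)
new = "http://10.0.0.6:6333"    # new cluster (placeholder)
name = "my_collection"          # placeholder collection name

# 1. Create a snapshot on the old instance
snap = requests.post(f"{old}/collections/{name}/snapshots").json()["result"]["name"]

# 2. Ask the new cluster to recover the collection from that snapshot URL;
#    this is the step that returns the shard-number mismatch error above
r = requests.put(
    f"{new}/collections/{name}/snapshots/recover",
    json={"location": f"{old}/collections/{name}/snapshots/{snap}"},
)
print(r.json())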

Instead, we need to use a beta tool provided by Qdrant to perform the migration: https://github.com/qdrant/migration/

Usage is as follows:

docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration qdrant \
    --source-url 'http://localhost:6334' \
    --source-collection 'new_collection' \
    --target-url 'http://10.0.0.6:6334' \
    --target-collection 'new_collection'

Note: registry.cloud.qdrant.io/library/qdrant-migration is an older version. It’s recommended to build the image manually using the latest code.

It’s also recommended to manually patch the grpc.MaxCallRecvMsgSize parameter and increase the batch-size (default is 50, can be increased to 20000) to achieve faster import speed. Related issue: https://github.com/qdrant/migration/issues/30#issuecomment-2876456943

To maximize import speed, you can refer to this article: https://qdrant.tech/articles/indexing-optimization/#2-disable-hnsw-for-dense-vectors-m0, and disable HNSW on the new cluster (by setting m to 0) while setting a reasonable indexing_threshold, for example:

PATCH /collections/your_collection
{
  "hnsw_config": {
    "m": 0
  },
  "optimizers_config": {
    "indexing_threshold": 10000
  }
}

The former ensures that HNSW indexes are not built during import, and the latter ensures that vectors are flushed to disk when reaching 10,000 to prevent memory overflow due to vector accumulation.

After the import is completed, HNSW can be re-enabled to build the index.
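
A sketch of what re-enabling might look like with the Python client; m=16 is Qdrant's default, and the indexing_threshold value here is an assumption, so tune both for your own setup:

from qdrant_client import QdrantClient
import qdrant_client.http.models as models

client = QdrantClient(host="10.0.0.6", port=6333)

client.update_collection(
    collection_name="new_collection",
    # Restore a non-zero m so the HNSW graph gets built
    hnsw_config=models.HnswConfigDiff(m=16, payload_m=16, on_disk=True),
    # Assumed value: raise the indexing threshold back to a normal level
    optimizers_config=models.OptimizersConfigDiff(indexing_threshold=20000),
)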

Cluster Information After Migration

To conveniently observe the shard information on each node, we can use the /collections/<collection_name>/cluster API. For example, it returns the following response:

{"result":{"peer_id":5395257186314509,"shard_count":12,"local_shards":[{"shard_id":1,"points_count":1794606,"state":"Active"},{"shard_id":2,"points_count":1450924,"state":"Active"},{"shard_id":4,"points_count":1902963,"state":"Active"},{"shard_id":5,"points_count":1774613,"state":"Active"},{"shard_id":7,"points_count":1753521,"state":"Active"},{"shard_id":8,"points_count":1687892,"state":"Active"},{"shard_id":10,"points_count":1477543,"state":"Active"},{"shard_id":11,"points_count":2051536,"state":"Active"}],"remote_shards":[{"shard_id":0,"peer_id":4182395837949771,"state":"Active"},{"shard_id":0,"peer_id":3095816753490206,"state":"Active"},{"shard_id":1,"peer_id":4182395837949771,"state":"Active"},{"shard_id":2,"peer_id":3095816753490206,"state":"Active"},{"shard_id":3,"peer_id":3095816753490206,"state":"Active"},{"shard_id":3,"peer_id":4182395837949771,"state":"Active"},{"shard_id":4,"peer_id":4182395837949771,"state":"Active"},{"shard_id":5,"peer_id":3095816753490206,"state":"Active"},{"shard_id":6,"peer_id":3095816753490206,"state":"Active"},{"shard_id":6,"peer_id":4182395837949771,"state":"Active"},{"shard_id":7,"peer_id":4182395837949771,"state":"Active"},{"shard_id":8,"peer_id":3095816753490206,"state":"Active"},{"shard_id":9,"peer_id":3095816753490206,"state":"Active"},{"shard_id":9,"peer_id":4182395837949771,"state":"Active"},{"shard_id":10,"peer_id":4182395837949771,"state":"Active"},{"shard_id":11,"peer_id":3095816753490206,"state":"Active"}],"shard_transfers":[]},"status":"ok","time":0.00011113}

But… this is really not very intuitive. So we need to write a small tool to easily check the distribution of Shards across each node (Peer).

Let’s go with Python!

import requests

base_url = "http://10.0.0.6:6333"

cluster_endpoint = "/cluster"
collections_endpoint = "/collections/<collection_name>/cluster"

def get_data_from_api(endpoint):
    response = requests.get(base_url + endpoint)
    return response.json()

def parse_cluster_peers(cluster_data):
    # Map each node's internal IP to its peer_id
    peers = cluster_data.get("result", {}).get("peers", {})
    ip_peer_map = {}
    for peer_id, peer_info in peers.items():
        uri = peer_info.get("uri", "")
        ip_address = uri.split("//")[-1].split(":")[0]
        ip_peer_map[ip_address] = int(peer_id)
    return ip_peer_map


def parse_shards(collections_data):
    # Collect the shard IDs hosted on each peer (both local and remote)
    local_shards = collections_data.get("result", {}).get("local_shards", [])
    remote_shards = collections_data.get("result", {}).get("remote_shards", [])

    peer_shard_map = {}

    for shard in local_shards:
        peer_id = collections_data.get("result", {}).get("peer_id")
        shard_id = shard.get("shard_id")
        peer_shard_map.setdefault(peer_id, []).append(shard_id)

    for shard in remote_shards:
        peer_id = shard.get("peer_id")
        shard_id = shard.get("shard_id")
        peer_shard_map.setdefault(peer_id, []).append(shard_id)

    return peer_shard_map

def main():
    cluster_data = get_data_from_api(cluster_endpoint)
    collections_data = get_data_from_api(collections_endpoint)

    ip_peer_map = parse_cluster_peers(cluster_data)

    peer_shard_map = parse_shards(collections_data)

    ip_shard_map = {}
    for ip, peer_id in ip_peer_map.items():
        if peer_id in peer_shard_map:
            ip_shard_map[ip] = peer_shard_map[peer_id]
        else:
            ip_shard_map[ip] = []

    for ip, shard_ids in ip_shard_map.items():
        peer_id = ip_peer_map[ip]
        print(f"IP: {ip}, Peer ID: {peer_id}, Shard IDs: {shard_ids}")

if __name__ == "__main__":
    main()

Once we run the script, we can easily see the Shard distribution on each node:

IP: 10.0.0.7, Peer ID: 4182395837949771, Shard IDs: [0, 1, 3, 4, 6, 7, 9, 10]
IP: 10.0.0.6, Peer ID: 5395257186314509, Shard IDs: [1, 2, 4, 5, 7, 8, 10, 11]
IP: 10.0.0.9, Peer ID: 3095816753490206, Shard IDs: [0, 2, 3, 5, 6, 8, 9, 11]

We can see that all the shards are evenly distributed across the three machines. At this point, if any single machine goes offline or is damaged, no shard loses all of its replicas, so no data loss will occur.

Scaling Out

Suppose our business keeps growing and three nodes are no longer sufficient, so we need to scale out. Since we initially set shard_number to 12, we can scale to any node count that evenly divides 12 (2, 3, 4, 6, or 12 nodes). We currently have 3 nodes, so we add 3 more. The IPs of the new nodes are:

  • 10.0.0.10
  • 10.0.0.11
  • 10.0.0.12

Creating them is the same as before: just set each node's --uri and point --bootstrap to http://10.0.0.6:6335. After they join, the /cluster endpoint responds as follows:

{
  "result": {
    "status": "enabled",
    "peer_id": 5395257186314509,
    "peers": {
      "3095816753490206": {
        "uri": "http://10.0.0.9:6335/"
      },
      "4182395837949771": {
        "uri": "http://10.0.0.7:6335/"
      },
      "3841618339255269": {
        "uri": "http://10.0.0.10:6335/"
      },
      "3658649898688837": {
        "uri": "http://10.0.0.12:6335/"
      },
      "5395257186314509": {
        "uri": "http://10.0.0.6:6335/"
      },
      "8689864553665627": {
        "uri": "http://10.0.0.11:6335/"
      }
    },
    "raft_info": {
      "term": 1,
      "commit": 50,
      "pending_operations": 0,
      "leader": 5395257186314509,
      "role": "Leader",
      "is_voter": true
    },
    "consensus_thread_status": {
      "consensus_thread_status": "working",
      "last_update": "2025-05-17T02:49:29.351230053Z"
    },
    "message_send_failures": {}
  },
  "status": "ok",
  "time": 0.000011121
}

Now, if you’re a seasoned GlusterFS user, your first instinct might be to run the following command to rebalance the data:

gluster volume rebalance VOLNAME start

Unfortunately, the open-source version of Qdrant doesn’t support this (though it’s available in their Cloud service):

It’s worth mentioning that Qdrant only provides the necessary building blocks to create an automated failure recovery. Building a completely automatic process of collection scaling would require control over the cluster machines themself. Check out our cloud solution, where we made exactly that.

Shards are evenly distributed across all existing nodes when a collection is first created, but Qdrant does not automatically rebalance shards if your cluster size or replication factor changes (since this is an expensive operation on large clusters). See the next section for how to move shards after scaling operations.

— From: https://qdrant.tech/documentation/guides/distributed_deployment/#choosing-the-right-number-of-shard

At this point, running the script again reveals:

IP: 10.0.0.6, Peer ID: 5395257186314509, Shard IDs: [1, 2, 4, 5, 7, 8, 10, 11]
IP: 10.0.0.9, Peer ID: 3095816753490206, Shard IDs: [0, 2, 3, 5, 6, 8, 9, 11]
IP: 10.0.0.12, Peer ID: 3658649898688837, Shard IDs: []
IP: 10.0.0.7, Peer ID: 4182395837949771, Shard IDs: [0, 1, 3, 4, 6, 7, 9, 10]
IP: 10.0.0.11, Peer ID: 8689864553665627, Shard IDs: []
IP: 10.0.0.10, Peer ID: 3841618339255269, Shard IDs: []

The newly added nodes are just idle — all Shards still reside on the old Peers. So, what should we do next?

Rebalancing Like Catching Rain?

Since the official documentation provides an API to move shards:

curl -X POST http://localhost:6333/collections/collection_name/cluster \
     -H "api-key: <apiKey>" \
     -H "Content-Type: application/json" \
     -d '{
  "move_shard": {
    "shard_id": 1,
    "to_peer_id": 1000000,
    "from_peer_id": 1000000
  }
}'

A natural solution comes to mind — manually rebalance the shards. First, we calculate how many shards each node (peer) should have. In this scenario:

(Total Shards * Number of Replicas) / Number of Machines

That is, (12*2)/6 = 4

Then we can determine which peers are overfilled and which are underfilled. We can compute a “migration path” — redistribute from the rich to the poor:

⚠️ Important: Do not assign two replicas of the same shard to the same peer! If that peer fails, you lose the data for that shard.
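
In the script's terms, the calculation (plus a small fix so that freshly joined, still-empty peers show up at all) might look like this. It's a sketch that reuses ip_peer_map and peer_shard_map from the earlier helper functions:

# New peers host no shards yet, so parse_shards() never sees them;
# seed them with empty lists so they can be detected as underfilled
for peer_id in ip_peer_map.values():
    peer_shard_map.setdefault(peer_id, [])

# (Total Shards * Number of Replicas) / Number of Machines = (12 * 2) / 6 = 4
average_shards_per_peer = (12 * 2) // len(ip_peer_map)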

underfilled_peers = []
for peer_id, shard_ids in peer_shard_map.items():
    if len(shard_ids) < average_shards_per_peer:
        underfilled_peers.append(peer_id)

overfilled_peers = []
for peer_id, shard_ids in peer_shard_map.items():
    if len(shard_ids) > average_shards_per_peer:
        overfilled_peers.append(peer_id)

print("underfilled_peers")
print(underfilled_peers)
print("overfilled_peers")
print(overfilled_peers)

rebalance_operations = []
for overfilled_peer in overfilled_peers:
    for underfilled_peer in underfilled_peers:
        # Iterate over a copy, since we mutate the list inside the loop
        for overfilled_peer_shard in list(peer_shard_map[overfilled_peer]):
            if (len(peer_shard_map[underfilled_peer]) < average_shards_per_peer and
                overfilled_peer_shard not in peer_shard_map[underfilled_peer] and
                len(peer_shard_map[overfilled_peer]) > average_shards_per_peer):

                print(f"Moving shard_id {overfilled_peer_shard} from peer_id {overfilled_peer} to peer_id {underfilled_peer}")
                rebalance_operations.append((overfilled_peer, underfilled_peer, overfilled_peer_shard))
                peer_shard_map[underfilled_peer].append(overfilled_peer_shard)
                peer_shard_map[overfilled_peer].remove(overfilled_peer_shard)

Of course, the logic above is rough and simple — I’m sure you, the reader, can come up with something much better.

Assuming no bugs, we’ll get a list of migration operations like this:

Shard ID lists for each peer:
{5395257186314509: [1, 2, 4, 5, 7, 8, 10, 11], 4182395837949771: [0, 1, 3, 4, 6, 7, 9, 10], 3095816753490206: [0, 2, 3, 5, 6, 8, 9, 11]}
Underfilled peers:
[3658649898688837, 3841618339255269, 8689864553665627]
Overfilled peers:
[5395257186314509, 4182395837949771, 3095816753490206]
Move shard_id 1 from peer_id 5395257186314509 to peer_id 3658649898688837
Move shard_id 4 from peer_id 5395257186314509 to peer_id 3658649898688837
...
Final Shard Mapping After Rebalance:
Peer ID: 5395257186314509, Shard IDs: [2, 5, 8, 11]
Peer ID: 4182395837949771, Shard IDs: [1, 4, 7, 10]
Peer ID: 3095816753490206, Shard IDs: [2, 5, 8, 11]
Peer ID: 3658649898688837, Shard IDs: [1, 4, 7, 10]
Peer ID: 3841618339255269, Shard IDs: [0, 3, 6, 9]
Peer ID: 8689864553665627, Shard IDs: [0, 3, 6, 9]

Now we just need to wrap the shard-moving logic:

def rebalance_shards(from_peer, to_peer, shard_id):
    url = f"{base_url}/collections/new_collection/cluster"
    payload = {
        "move_shard": {
            "shard_id": shard_id,
            "from_peer_id": from_peer,
            "to_peer_id": to_peer
        }
    }
    r = requests.post(url, json=payload)
    # Fail loudly if the cluster rejects the move
    r.raise_for_status()

And execute:

for from_peer, to_peer, shard_id in rebalance_operations:
    rebalance_shards(from_peer, to_peer, shard_id)

⚠️ Always remember: this is an expensive operation on large clusters

After some time, you should end up with a fully rebalanced 6-node cluster. You can then configure your Load Balancer to route to these nodes. Your application can connect to any of them and resume operations!
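
Shard moves run asynchronously; one way to wait for them programmatically is to poll the collection cluster endpoint until shard_transfers is empty. A small sketch reusing base_url and requests from the earlier script:

import time

def wait_for_shard_transfers(collection="new_collection", interval=30):
    while True:
        r = requests.get(f"{base_url}/collections/{collection}/cluster").json()
        transfers = r.get("result", {}).get("shard_transfers", [])
        if not transfers:
            print("All shard transfers finished")
            return
        print(f"{len(transfers)} transfer(s) still in progress...")
        time.sleep(interval)

wait_for_shard_transfers()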

For downscaling, simply move all shards off the peer to be removed (similar to kubectl drain), then use the API to remove that peer.
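
Once a peer no longer holds any shards, the removal itself is a single API call. A minimal sketch, again reusing base_url; pass force=True only if you really want to drop a peer that still holds shard data:

def remove_peer(peer_id, force=False):
    # DELETE /cluster/peer/<peer_id> removes a node from the cluster
    url = f"{base_url}/cluster/peer/{peer_id}"
    r = requests.delete(url, params={"force": str(force).lower()})
    r.raise_for_status()
    return r.json()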


Disaster Recovery

With a cluster of 3 or more nodes, Raft needs more than 50% of the nodes online for consensus; as long as that quorum holds and no shard has all of its replicas on offline nodes, all operations remain unaffected.

Oddly, Qdrant docs do not mention split-brain scenarios.

So:

  • If some machines go offline but no shard loses all its replicas:

    • Operations are unaffected
    • If you can restore the machine, great
    • If not, delete the peer via API, spin up a new one, and rebalance
  • If all replicas of a shard are on offline machines:

    • Fix the machines ASAP 🤣
    • Or start planning your escape

Good news: With cloud providers, total machine failure is rare unless something catastrophic (e.g. datacenter fire) happens. Most downtime is due to networking or OOM. As long as you have backups, recovery is generally possible.

There's another gotcha: if a node comes back with a different IP, Qdrant may or may not update that peer's address across the cluster automatically:

2025-05-10T07:52:31.601762Z  WARN storage::content_manager::consensus::persistent: Replaced address of peer 3994356516252114 from http://10.0.0.5:6335/ to http://10.0.0.9:6335/

If it doesn't, you'll need to manually edit the raft_state.json file in each node's qdrant_storage volume and restart them all.

Hopefully it won’t come to that 😇

If you’re using Hetzner Cloud, here are some extra tips:


Automated Deployment

Remember we said you could deploy quickly on Hetzner?

Hetzner provides internal metadata APIs: https://docs.hetzner.cloud/#server-metadata

For example, on a VM:

curl http://169.254.169.254/hetzner/v1/metadata/private-networks

You’ll get something like:

- ip: 10.0.0.3
  alias_ips: []
  interface_num: 1
  mac_address: 86:00:00:c3:bf:16
  network_id: 3493377
  network_name: us-west-network
  network: 10.0.0.0/16
  subnet: 10.0.0.0/24
  gateway: 10.0.0.1

We can use that to generate a cloud-init config that installs Docker and deploys a Qdrant node:

#cloud-config
write_files:
  - path: /root/create_docker_compose.sh
    permissions: "0755"
    owner: root:root
    content: |
      #!/bin/bash
      # Fetch the private network metadata
      METADATA=$(curl -s http://169.254.169.254/hetzner/v1/metadata/private-networks)

      # Extract the IP address from the metadata
      PRIVATE_IP=$(echo "$METADATA" | awk -F': ' '/ip:/ {print $2}' | tr -d ' ')
      
      # Generate the docker-compose.yml file
      cat <<EOF > /root/docker-compose.yml
      services:
        qdrant:
          image: qdrant/qdrant:v1.14.0
          restart: always
          volumes:
            - ./qdrant_storage:/qdrant/storage
          ports:
            - "6333:6333"
            - "6334:6334"
            - "6335:6335"
          environment:
            QDRANT__CLUSTER__ENABLED: "true"
          command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://$PRIVATE_IP:6335"
      EOF      

  - path: /root/install_docker.sh
    permissions: "0755"
    owner: root:root
    content: |
      #!/bin/bash
      # Install Docker
      curl -fsSL https://get.docker.com -o install-docker.sh
      bash install-docker.sh

      # Install Docker Compose
      wget https://github.com/docker/compose/releases/download/v2.36.0/docker-compose-linux-x86_64 -O /usr/bin/docker-compose
      chmod +x /usr/bin/docker-compose      

runcmd:
  # Update package lists
  - apt-get update
  - apt-get install -y curl wget 

  # Execute the install docker script
  - /root/install_docker.sh

  # Execute the script to create the docker-compose.yml file
  - /root/create_docker_compose.sh
  
  # Start Docker Compose
  - cd /root && docker-compose up -d

Update the bootstrap IP to point at your first node, then paste this config when creating new Hetzner VMs.

Very convenient! Hetzner’s low pricing also makes this approach very cost-effective.

VM Price Comparison

Lastly, let’s do some shameless pricing comparison:

Shared CPU VMs

Dedicated CPU VMs

If you find Hetzner’s pricing attractive, feel free to use my referral link: 👉 https://hetzner.cloud/?ref=6moYBzkpMb9s 👈 You’ll get €20 credit on signup, and I get €10 too 😘

That’s all — see you in the next post!

#English #Qdrant