A Quick Note on Setting Up a Qdrant Cluster on Hetzner with Docker and Migrating Data
Anyone who has worked on RAG requirements knows that as your data volume grows, you not only have to deal with increasingly poor recall, but you'll also watch your vector database keep expanding until it eventually hits your memory limit. At that point, a single large-memory server is no longer enough for availability and performance: a single machine always has a memory ceiling, it can't do smooth rolling upgrades of the cluster version, and high-memory machines are generally more expensive than several smaller ones. A recent requirement to migrate a fairly large standalone Qdrant instance to a clustered deployment led to this brief note.
Realized it’s been almost a year since I last wrote a blog post with some technical content…
Original Standalone Qdrant Deployment
The existing Qdrant deployment is very simple—a standalone docker-compose.yml
file, as shown below:
qdrant:
  image: qdrant/qdrant:v1.14.0
  restart: always
  ports:
    - 6333:6333
    - 6334:6334
  volumes:
    - ./volumes/qdrant:/qdrant/storage
This Qdrant instance runs on a Hetzner CCX53 machine with 32 cores and 128GB of memory. The qdrant
directory is about 200GB in size, with over 21,000,000 vectors.
{
  "result": {
    "status": "yellow",
    "optimizer_status": "ok",
    "indexed_vectors_count": 21297932,
    "points_count": 21337945,
    "segments_count": 10
  }
}
Cluster Environment
Basic Configuration
A quick word on the machines used for the cluster: since Qdrant uses Raft as its consensus protocol, the deployment should involve at least 3 machines. We start with 3, deployed in a test environment on Hetzner, with the following IPs:
- 10.0.0.6
- 10.0.0.7
- 10.0.0.9
Note: These are just for demonstration purposes. If you’re also using Hetzner, you can quickly deploy a Qdrant cluster using Cloud init with internal IPs, as described at the end of this article.
Since we’re using Docker for deployment, we only need to create a docker-compose.yml
file on 10.0.0.6
with the following content:
services:
  qdrant_node1:
    image: qdrant/qdrant:v1.14.0
    restart: always
    volumes:
      - ./qdrant_storage:/qdrant/storage
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --uri http://10.0.0.6:6335"
On 10.0.0.7
, create a docker-compose.yml
file with the following content:
services:
  qdrant_node2:
    image: qdrant/qdrant:v1.14.0
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://10.0.0.7:6335"
On 10.0.0.9
, create a docker-compose.yml
file with the following content:
services:
  qdrant_node3:
    image: qdrant/qdrant:v1.14.0
    ports:
      - "6333:6333"
      - "6334:6334"
      - "6335:6335"
    volumes:
      - ./qdrant_storage:/qdrant/storage
    environment:
      QDRANT__CLUSTER__ENABLED: "true"
    command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://10.0.0.9:6335"
Then run docker-compose up -d
on each machine to start the services.
Once all nodes are up, you can access the cluster status by visiting http://localhost:6333/cluster
on any machine. For example:
{
  "result": {
    "status": "enabled",
    "peer_id": 5395257186314509,
    "peers": {
      "3095816753490206": {
        "uri": "http://10.0.0.9:6335/"
      },
      "5395257186314509": {
        "uri": "http://10.0.0.6:6335/"
      },
      "4182395837949771": {
        "uri": "http://10.0.0.7:6335/"
      }
    },
    "raft_info": {
      "term": 1,
      "commit": 41,
      "pending_operations": 0,
      "leader": 5395257186314509,
      "role": "Leader",
      "is_voter": true
    },
    "consensus_thread_status": {
      "consensus_thread_status": "working",
      "last_update": "2025-05-17T02:31:10.703071457Z"
    },
    "message_send_failures": {}
  },
  "status": "ok",
  "time": 0.000011782
}
Creating a Collection
After setting up a distributed Qdrant cluster, as described in the official documentation at https://qdrant.tech/documentation/guides/distributed_deployment/#making-use-of-a-new-distributed-qdrant-cluster, we can see:
When you enable distributed mode and scale up to two or more nodes, your data does not move to the new node automatically; it starts out empty. To make use of your new empty node, do one of the following:
- Create a new replicated collection by setting the replication_factor to 2 or more and setting the number of shards to a multiple of your number of nodes.
- If you have an existing collection which does not contain enough shards for each node, you must create a new collection as described in the previous bullet point.
- If you already have enough shards for each node and you merely need to replicate your data, follow the directions for creating new shard replicas.
- If you already have enough shards for each node and your data is already replicated, you can move data (without replicating it) onto the new node(s) by moving shards.
Since the current Qdrant instance we have is a single node, the collection on it has only a single shard. The collection configuration is as follows:
{
  "params": {
    "vectors": {
      "size": 1536,
      "distance": "Cosine",
      "on_disk": true
    },
    "shard_number": 1,
    "replication_factor": 1,
    "write_consistency_factor": 1,
    "on_disk_payload": true
  },
  ...
}
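(For reference, this kind of output can be pulled with the Python client; a minimal sketch, where old_collection is a placeholder name for the existing collection:)
from qdrant_client import QdrantClient

# Point at the original standalone instance; "old_collection" is a placeholder name.
client = QdrantClient(host="localhost", port=6333)
info = client.get_collection("old_collection")
print(info.config.params)  # shard_number, replication_factor, vector params, ...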
So the first step is to create a new collection on the new cluster with the same parameters as the current collection, except that shard_number
and replication_factor
need to be modified.
Just like ClickHouse, creating a cluster doesn't automatically mean your data is distributed and replicated: you have to configure sharding and replication yourself.
For the settings of the two parameters above, the official documentation recommends:
If you anticipate a lot of growth, we recommend 12 shards since you can expand from 1 node up to 2, 3, 6, and 12 nodes without having to re-shard. Having more than 12 shards in a small cluster may not be worth the performance overhead.
“Anticipate a lot of growth” sounds very appropriate for our scenario, so here I create a new collection with shard_number
set to 12 and replication_factor
set to 2. The relevant script is as follows:
from qdrant_client import QdrantClient
import qdrant_client.http.models as models

collection_name = "new_collection"

client = QdrantClient(host="10.0.0.6", port=6333)

vectors_config = models.VectorParams(
    size=1536, distance=models.Distance.COSINE, on_disk=True
)

# m=0 disables HNSW graph building for dense vectors during the bulk import;
# payload_m keeps the payload-aware index configuration available for later.
hnsw_config = models.HnswConfigDiff(
    m=0,
    payload_m=16,
    ef_construct=100,
    full_scan_threshold=10000,
    max_indexing_threads=100,
    on_disk=True,
)

client.create_collection(
    collection_name=collection_name,
    vectors_config=vectors_config,
    shard_number=12,
    replication_factor=2,
    hnsw_config=hnsw_config,
)
Migrating Data
Once the new collection is created on the cluster, we need to import the data from the existing standalone instance. Initially, I tried to create a snapshot on the original instance and restore it into the new cluster, but encountered the following error:
{"status":{"error":"Wrong input: Snapshot is not compatible with existing collection: Collection shard number: 3 Snapshot shard number: 1"},"time":1107.142566774}
Instead, we need to use a beta migration tool provided by Qdrant: https://github.com/qdrant/migration/
Usage is as follows:
docker run --net=host --rm -it registry.cloud.qdrant.io/library/qdrant-migration qdrant \
    --source-url 'http://localhost:6334' \
    --source-collection 'new_collection' \
    --target-url 'http://10.0.0.6:6334' \
    --target-collection 'new_collection'
Note:
- registry.cloud.qdrant.io/library/qdrant-migration is an older version. It's recommended to build the image manually from the latest code.
- It's also recommended to manually patch the grpc.MaxCallRecvMsgSize parameter and increase the batch-size (default is 50; it can be raised to 20000) to achieve faster import speed. Related issue: https://github.com/qdrant/migration/issues/30#issuecomment-2876456943
To maximize import speed, you can refer to this article: https://qdrant.tech/articles/indexing-optimization/#2-disable-hnsw-for-dense-vectors-m0 and disable HNSW on the new collection while setting a reasonable indexing_threshold, for example:
PATCH /collections/your_collection
{
  "hnsw_config": {
    "m": 0
  },
  "optimizers_config": {
    "indexing_threshold": 10000
  }
}
The former ensures that no HNSW index is built during the import, and the latter ensures vectors are still flushed to disk once the 10,000 threshold is reached, preventing memory blow-up from accumulated vectors.
After the import completes, HNSW can be re-enabled to build the index.
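A minimal sketch of flipping it back on with the Python client once the import has finished (m=16 here is an assumed value, Qdrant's usual default, so adjust it to your workload):
from qdrant_client import QdrantClient
import qdrant_client.http.models as models

client = QdrantClient(host="10.0.0.6", port=6333)

# Re-enable HNSW graph building now that the bulk import is done.
# m=16 is an assumed/typical value, not something prescribed by this setup.
client.update_collection(
    collection_name="new_collection",
    hnsw_config=models.HnswConfigDiff(m=16),
)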
Cluster Information After Migration
To conveniently observe the Shard information on each node, we can use the /collections/<collection_name>/cluster API. For example, it returns the following response:
{"result":{"peer_id":5395257186314509,"shard_count":12,"local_shards":[{"shard_id":1,"points_count":1794606,"state":"Active"},{"shard_id":2,"points_count":1450924,"state":"Active"},{"shard_id":4,"points_count":1902963,"state":"Active"},{"shard_id":5,"points_count":1774613,"state":"Active"},{"shard_id":7,"points_count":1753521,"state":"Active"},{"shard_id":8,"points_count":1687892,"state":"Active"},{"shard_id":10,"points_count":1477543,"state":"Active"},{"shard_id":11,"points_count":2051536,"state":"Active"}],"remote_shards":[{"shard_id":0,"peer_id":4182395837949771,"state":"Active"},{"shard_id":0,"peer_id":3095816753490206,"state":"Active"},{"shard_id":1,"peer_id":4182395837949771,"state":"Active"},{"shard_id":2,"peer_id":3095816753490206,"state":"Active"},{"shard_id":3,"peer_id":3095816753490206,"state":"Active"},{"shard_id":3,"peer_id":4182395837949771,"state":"Active"},{"shard_id":4,"peer_id":4182395837949771,"state":"Active"},{"shard_id":5,"peer_id":3095816753490206,"state":"Active"},{"shard_id":6,"peer_id":3095816753490206,"state":"Active"},{"shard_id":6,"peer_id":4182395837949771,"state":"Active"},{"shard_id":7,"peer_id":4182395837949771,"state":"Active"},{"shard_id":8,"peer_id":3095816753490206,"state":"Active"},{"shard_id":9,"peer_id":3095816753490206,"state":"Active"},{"shard_id":9,"peer_id":4182395837949771,"state":"Active"},{"shard_id":10,"peer_id":4182395837949771,"state":"Active"},{"shard_id":11,"peer_id":3095816753490206,"state":"Active"}],"shard_transfers":[]},"status":"ok","time":0.00011113}
But… this is really not very intuitive. So we need to write a small tool to easily check the distribution of Shards across each node (Peer).
Let’s go with Python!
import requests

base_url = "http://10.0.0.6:6333"
cluster_endpoint = "/cluster"
collections_endpoint = "/collections/<collection_name>/cluster"


def get_data_from_api(endpoint):
    response = requests.get(base_url + endpoint)
    return response.json()


def parse_cluster_peers(cluster_data):
    peers = cluster_data.get("result", {}).get("peers", {})
    ip_peer_map = {}
    for peer_id, peer_info in peers.items():
        uri = peer_info.get("uri", "")
        ip_address = uri.split("//")[-1].split(":")[0]
        ip_peer_map[ip_address] = int(peer_id)
    return ip_peer_map


def parse_shards(collections_data):
    local_shards = collections_data.get("result", {}).get("local_shards", [])
    remote_shards = collections_data.get("result", {}).get("remote_shards", [])
    peer_shard_map = {}
    for shard in local_shards:
        peer_id = collections_data.get("result", {}).get("peer_id")
        shard_id = shard.get("shard_id")
        peer_shard_map.setdefault(peer_id, []).append(shard_id)
    for shard in remote_shards:
        peer_id = shard.get("peer_id")
        shard_id = shard.get("shard_id")
        peer_shard_map.setdefault(peer_id, []).append(shard_id)
    return peer_shard_map


def main():
    cluster_data = get_data_from_api(cluster_endpoint)
    collections_data = get_data_from_api(collections_endpoint)
    ip_peer_map = parse_cluster_peers(cluster_data)
    peer_shard_map = parse_shards(collections_data)
    ip_shard_map = {}
    for ip, peer_id in ip_peer_map.items():
        if peer_id in peer_shard_map:
            ip_shard_map[ip] = peer_shard_map[peer_id]
        else:
            ip_shard_map[ip] = []
    for ip, shard_ids in ip_shard_map.items():
        peer_id = ip_peer_map[ip]
        print(f"IP: {ip}, Peer ID: {peer_id}, Shard IDs: {shard_ids}")


if __name__ == "__main__":
    main()
Once we run the script, we can easily see the Shard distribution on each node:
IP: 10.0.0.7, Peer ID: 4182395837949771, Shard IDs: [0, 1, 3, 4, 6, 7, 9, 10]
IP: 10.0.0.6, Peer ID: 5395257186314509, Shard IDs: [1, 2, 4, 5, 7, 8, 10, 11]
IP: 10.0.0.9, Peer ID: 3095816753490206, Shard IDs: [0, 2, 3, 5, 6, 8, 9, 11]
We can see that all the Shards are evenly distributed across the three machines. At this point, if any single machine goes offline or is damaged, it won’t lead to complete loss of any Shard replica, so no data loss will occur.
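Before pointing traffic at the new cluster, it's also worth a quick sanity check that the point counts match between the old instance and the new collection. A minimal sketch, where the old host name and old_collection are placeholders:
from qdrant_client import QdrantClient

# Placeholders: the original standalone instance and one node of the new cluster.
old_client = QdrantClient(host="old-qdrant-host", port=6333)
new_client = QdrantClient(host="10.0.0.6", port=6333)

# exact=True forces a real count instead of an estimate.
old_count = old_client.count("old_collection", exact=True).count
new_count = new_client.count("new_collection", exact=True).count
print(f"old={old_count}, new={new_count}, diff={old_count - new_count}")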
Scaling Out
Suppose our business keeps growing and three nodes are no longer sufficient. We’ll need to scale out. Since we initially set shard_number
to 12, we can scale in multiples of 3. Now we have 3 nodes, so we add 3 more. The IPs of the new nodes are:
- 10.0.0.10
- 10.0.0.11
- 10.0.0.12
Creating them is the same as before: just set each node's --uri and --bootstrap http://10.0.0.6:6335. After they join, the /cluster endpoint responds as follows:
{
  "result": {
    "status": "enabled",
    "peer_id": 5395257186314509,
    "peers": {
      "3095816753490206": {
        "uri": "http://10.0.0.9:6335/"
      },
      "4182395837949771": {
        "uri": "http://10.0.0.7:6335/"
      },
      "3841618339255269": {
        "uri": "http://10.0.0.10:6335/"
      },
      "3658649898688837": {
        "uri": "http://10.0.0.12:6335/"
      },
      "5395257186314509": {
        "uri": "http://10.0.0.6:6335/"
      },
      "8689864553665627": {
        "uri": "http://10.0.0.11:6335/"
      }
    },
    "raft_info": {
      "term": 1,
      "commit": 50,
      "pending_operations": 0,
      "leader": 5395257186314509,
      "role": "Leader",
      "is_voter": true
    },
    "consensus_thread_status": {
      "consensus_thread_status": "working",
      "last_update": "2025-05-17T02:49:29.351230053Z"
    },
    "message_send_failures": {}
  },
  "status": "ok",
  "time": 0.000011121
}
Now, if you’re a seasoned GlusterFS user, your first instinct might be to run the following command to rebalance the data:
gluster volume rebalance VOLNAME start
Unfortunately, the open-source version of Qdrant doesn’t support this (though it’s available in their Cloud service):
It’s worth mentioning that Qdrant only provides the necessary building blocks to create an automated failure recovery. Building a completely automatic process of collection scaling would require control over the cluster machines themself. Check out our cloud solution, where we made exactly that.
Shards are evenly distributed across all existing nodes when a collection is first created, but Qdrant does not automatically rebalance shards if your cluster size or replication factor changes (since this is an expensive operation on large clusters). See the next section for how to move shards after scaling operations.
— From: https://qdrant.tech/documentation/guides/distributed_deployment/#choosing-the-right-number-of-shard
At this point, running the script again reveals:
IP: 10.0.0.6, Peer ID: 5395257186314509, Shard IDs: [1, 2, 4, 5, 7, 8, 10, 11]
IP: 10.0.0.9, Peer ID: 3095816753490206, Shard IDs: [0, 2, 3, 5, 6, 8, 9, 11]
IP: 10.0.0.12, Peer ID: 3658649898688837, Shard IDs: []
IP: 10.0.0.7, Peer ID: 4182395837949771, Shard IDs: [0, 1, 3, 4, 6, 7, 9, 10]
IP: 10.0.0.11, Peer ID: 8689864553665627, Shard IDs: []
IP: 10.0.0.10, Peer ID: 3841618339255269, Shard IDs: []
The newly added nodes are just idle — all Shards still reside on the old Peers. So, what should we do next?
Rebalancing Like Catching Rain?
The official documentation provides an API to move shards:
curl -X POST http://localhost:6333/collections/collection_name/cluster \
    -H "api-key: <apiKey>" \
    -H "Content-Type: application/json" \
    -d '{
      "move_shard": {
        "shard_id": 1,
        "to_peer_id": 1000000,
        "from_peer_id": 1000000
      }
    }'
A natural solution comes to mind — manually rebalance the shards. First, we calculate how many shards each node (peer) should have. In this scenario:
(Total Shards * Number of Replicas) / Number of Machines
That is, (12*2)/6 = 4
Then we can determine which peers are overfilled and which are underfilled. We can compute a “migration path” — redistribute from the rich to the poor:
⚠️ Important: Do not assign two replicas of the same shard to the same peer! If that peer fails, you lose the data for that shard.
# Newly added peers hold no shards yet and therefore don't appear in
# peer_shard_map; seed them with empty lists so they can show up as underfilled.
all_peer_ids = [int(p) for p in get_data_from_api(cluster_endpoint)["result"]["peers"]]
for peer_id in all_peer_ids:
    peer_shard_map.setdefault(peer_id, [])

# Target shards per peer: (total shards * replication factor) / number of peers
average_shards_per_peer = (12 * 2) // len(all_peer_ids)

underfilled_peers = []
for peer_id, shard_ids in peer_shard_map.items():
    if len(shard_ids) < average_shards_per_peer:
        underfilled_peers.append(peer_id)

overfilled_peers = []
for peer_id, shard_ids in peer_shard_map.items():
    if len(shard_ids) > average_shards_per_peer:
        overfilled_peers.append(peer_id)

print("underfilled_peers")
print(underfilled_peers)
print("overfilled_peers")
print(overfilled_peers)

rebalance_operations = []
for overfilled_peer in overfilled_peers:
    for underfilled_peer in underfilled_peers:
        # Iterate over a copy, since we mutate the source list inside the loop.
        for overfilled_peer_shard in list(peer_shard_map[overfilled_peer]):
            if (len(peer_shard_map[underfilled_peer]) < average_shards_per_peer and
                    overfilled_peer_shard not in peer_shard_map[underfilled_peer] and
                    len(peer_shard_map[overfilled_peer]) > average_shards_per_peer):
                print(f"Moving shard_id {overfilled_peer_shard} from peer_id {overfilled_peer} to peer_id {underfilled_peer}")
                rebalance_operations.append((overfilled_peer, underfilled_peer, overfilled_peer_shard))
                peer_shard_map[underfilled_peer].append(overfilled_peer_shard)
                peer_shard_map[overfilled_peer].remove(overfilled_peer_shard)
Of course, the logic above is rough and simple — I’m sure you, the reader, can come up with something much better.
Assuming no bugs, we’ll get a list of migration operations like this:
Shard ID lists for each peer:
{5395257186314509: [1, 2, 4, 5, 7, 8, 10, 11], 4182395837949771: [0, 1, 3, 4, 6, 7, 9, 10], 3095816753490206: [0, 2, 3, 5, 6, 8, 9, 11]}
Underfilled peers:
[3658649898688837, 3841618339255269, 8689864553665627]
Overfilled peers:
[5395257186314509, 4182395837949771, 3095816753490206]
Move shard_id 1 from peer_id 5395257186314509 to peer_id 3658649898688837
Move shard_id 4 from peer_id 5395257186314509 to peer_id 3658649898688837
...
Final Shard Mapping After Rebalance:
Peer ID: 5395257186314509, Shard IDs: [2, 5, 8, 11]
Peer ID: 4182395837949771, Shard IDs: [1, 4, 7, 10]
Peer ID: 3095816753490206, Shard IDs: [2, 5, 8, 11]
Peer ID: 3658649898688837, Shard IDs: [1, 4, 7, 10]
Peer ID: 3841618339255269, Shard IDs: [0, 3, 6, 9]
Peer ID: 8689864553665627, Shard IDs: [0, 3, 6, 9]
Now we just need to wrap the shard-moving logic:
def rebalance_shards(from_peer, to_peer, shard_id):
    url = f"{base_url}/collections/new_collection/cluster"
    payload = {
        "move_shard": {
            "shard_id": shard_id,
            "from_peer_id": from_peer,
            "to_peer_id": to_peer
        }
    }
    r = requests.post(url, json=payload)
And execute:
for from_peer, to_peer, shard_id in rebalance_operations:
    rebalance_shards(from_peer, to_peer, shard_id)
⚠️ Always remember: this is an expensive operation on large clusters
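Since the moves take a while, one way to keep an eye on progress is to poll the collection's cluster endpoint and wait for the shard_transfers list (visible in the response shown earlier) to drain. A small sketch:
import time
import requests

# Poll until no shard transfers are in flight for this collection.
while True:
    resp = requests.get(f"{base_url}/collections/new_collection/cluster").json()
    transfers = resp["result"]["shard_transfers"]
    if not transfers:
        print("No shard transfers in flight, rebalance finished.")
        break
    print(f"{len(transfers)} transfer(s) still running: {transfers}")
    time.sleep(30)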
After some time, you should end up with a fully rebalanced 6-node cluster. You can then configure your Load Balancer to route to these nodes. Your application can connect to any of them and resume operations!
For downscaling, simply move all shards off the peer to be removed (similar to kubectl drain), then use the API to remove that peer, as sketched below.
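A minimal sketch of that drain-then-remove flow against the HTTP API (the peer and shard IDs below are placeholders; you'd repeat the move for every shard replica still hosted on the departing peer, picking targets that don't already hold a replica of the same shard):
import requests

base_url = "http://10.0.0.6:6333"

peer_to_remove = 3841618339255269  # placeholder: the peer being drained
target_peer = 5395257186314509     # placeholder: a peer that will take this shard
shard_id = 0                       # placeholder: one shard currently on peer_to_remove

# 1. Drain: move the shard replica off the departing peer.
requests.post(
    f"{base_url}/collections/new_collection/cluster",
    json={"move_shard": {"shard_id": shard_id,
                         "from_peer_id": peer_to_remove,
                         "to_peer_id": target_peer}},
)

# 2. Once the peer holds no shards, remove it from the cluster.
requests.delete(f"{base_url}/cluster/peer/{peer_to_remove}")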
Disaster Recovery
With a cluster of 3+ nodes, Raft guarantees that as long as more than 50% of the nodes are online, and no shard has all of its replicas on offline nodes, all operations remain unaffected.
Oddly, Qdrant docs do not mention split-brain scenarios.
So:
If some machines go offline but no shard loses all its replicas:
- Operations are unaffected
- If you can restore the machine, great
- If not, delete the peer via API, spin up a new one, and rebalance
If all replicas of a shard are on offline machines:
- Fix the machines ASAP 🤣
- Or start planning your escape
Good news: With cloud providers, total machine failure is rare unless something catastrophic (e.g. datacenter fire) happens. Most downtime is due to networking or OOM. As long as you have backups, recovery is generally possible.
There's another gotcha: if a node comes back with a different IP, Qdrant may or may not pick up the change and update that peer's address in the cluster state:
2025-05-10T07:52:31.601762Z WARN storage::content_manager::consensus::persistent: Replaced address of peer 3994356516252114 from http://10.0.0.5:6335/ to http://10.0.0.9:6335/
If it doesn't, you'll need to manually edit the /qdrant_storage/raft_state.json file on every node and restart them all.
Hopefully it won’t come to that 😇
If you’re using Hetzner Cloud, here are some extra tips:
- Use Placement Groups to ensure VMs are not on the same physical host: https://docs.hetzner.com/cloud/placement-groups/overview
  "In spread Placement Groups, all virtual servers are running on different physical servers."
- Enable Backups
  - Backups are stored in a different availability zone (except for Hillsboro, Ashburn, and Singapore, which have only one)
  - Docs: https://docs.hetzner.com/cloud/servers/backups-snapshots/faq#what-location-are-backupssnapshots-stored-in
Automated Deployment
Remember we said you could deploy quickly on Hetzner?
Hetzner provides internal metadata APIs: https://docs.hetzner.cloud/#server-metadata
For example, on a VM:
curl http://169.254.169.254/hetzner/v1/metadata/private-networks
You’ll get something like:
- ip: 10.0.0.3
  alias_ips: []
  interface_num: 1
  mac_address: 86:00:00:c3:bf:16
  network_id: 3493377
  network_name: us-west-network
  network: 10.0.0.0/16
  subnet: 10.0.0.0/24
  gateway: 10.0.0.1
Use that to generate a cloud-init
that installs Docker + deploys a Qdrant node:
#cloud-config
write_files:
  - path: /root/create_docker_compose.sh
    permissions: "0755"
    owner: root:root
    content: |
      #!/bin/bash
      # Fetch the private network metadata
      METADATA=$(curl -s http://169.254.169.254/hetzner/v1/metadata/private-networks)
      # Extract the IP address from the metadata
      PRIVATE_IP=$(echo "$METADATA" | awk -F': ' '/ip:/ {print $2}' | tr -d ' ')
      # Generate the docker-compose.yml file
      cat <<EOF > /root/docker-compose.yml
      services:
        qdrant:
          image: qdrant/qdrant:v1.14.0
          restart: always
          volumes:
            - ./qdrant_storage:/qdrant/storage
          ports:
            - "6333:6333"
            - "6334:6334"
            - "6335:6335"
          environment:
            QDRANT__CLUSTER__ENABLED: "true"
          command: "./qdrant --bootstrap http://10.0.0.6:6335 --uri http://$PRIVATE_IP:6335"
      EOF
  - path: /root/install_docker.sh
    permissions: "0755"
    owner: root:root
    content: |
      #!/bin/bash
      # Install Docker
      curl -fsSL https://get.docker.com -o install-docker.sh
      bash install-docker.sh
      # Install Docker Compose
      wget https://github.com/docker/compose/releases/download/v2.36.0/docker-compose-linux-x86_64 -O /usr/bin/docker-compose
      chmod +x /usr/bin/docker-compose
runcmd:
  # Update package lists
  - apt-get update
  - apt-get install -y curl wget
  # Execute the install docker script
  - /root/install_docker.sh
  # Execute the script to create the docker-compose.yml file
  - /root/create_docker_compose.sh
  # Start Docker Compose
  - cd /root && docker-compose up -d
Update the bootstrap IP to point at your first node, then paste this config in as cloud-init user data when creating new Hetzner VMs.
Very convenient! Hetzner’s low pricing also makes this approach very cost-effective.
VM Price Comparison
Lastly, let’s do some shameless pricing comparison:
Shared CPU VMs
Dedicated CPU VMs
If you find Hetzner’s pricing attractive, feel free to use my referral link: 👉 https://hetzner.cloud/?ref=6moYBzkpMb9s 👈 You’ll get €20 credit on signup, and I get €10 too 😘
That’s all — see you in the next post!