用 gRPC 打造最终一致的 AP-模式注册中心
背景与目标
需求 | 说明 |
---|---|
高可用 (A) | 任意 Dispatcher 宕机或网络分区,注册/发现仍立即可用 |
分区容忍 (P) | 各分区内继续服务,网络恢复后自动收敛数据 |
最终一致 | 强一致不是刚需;允许短暂不一致 |
低延迟 | 注册/心跳 O(μs),不阻塞业务线程 |
传统 CP(如 ZooKeeper)在分区时会牺牲可写;Eureka / Nacos-Naming 走 AP 路线。本文在现有 executor ←→ dispatcher 代码之上,演示一套“Δ-Gossip + 内存快照”的极简 AP 实现。
总体架构
- Executor ⇄ Dispatcher:对外 gRPC 接口(9090、9091 …)。
- Dispatcher ↔ Dispatcher:集群 Δ-Gossip(9581、9582 …)。
- deltaQueue:本节点待广播增量缓冲,实现写-读解耦。
核心组件拆解
位置 | 类 | 作用 |
---|---|---|
executorconfig | ExecutorRegister | 读取 executor.* 配置 + 注入 registryStub |
executorlifecycle | ExecutorLifecycleManager | @EventListener 完成注册 / 心跳;@PreDestroy 注销 |
dispatcherregistry | GrpcRegistryServiceImpl | 对外 gRPC:register / heartbeat / unregister |
dispatcherstore | InstanceStore | 线程安全 Map + Δ 队列;幂等 upsert() |
dispatcherstore | RevisionGenerator | 单调递增版本号 (Lamport Clock) |
dispatchercluster/gossip | ClusterGossipSender | 500 ms 批量 drain Δ → 推送给 peers |
dispatchercluster/sync | ClusterSyncServiceImpl | 收到 Δ / FullSync → store.upsert() |
dispatcherscheduler | ExpireScanner | 每 5 s 剔除心跳超时实例 (TTL) |
peer = 与本节点处在同一集群、需要相互同步数据的 其他 Dispatcher。
一次「上线-下线」全过程
# | 时间线 | 详细步骤 |
---|---|---|
1 | Executor 启动 | ApplicationReadyEvent → 组装 RegisterRequest(instanceKey, executorId…) → gRPC 调用 Dispatcher-A |
2 | Dispatcher-A | GrpcRegistryServiceImpl.register() ① InstanceStore.upsert(dto) 更新本地快照② deltaQueue.offer(dto) ③ gossipSender.signal() → 立即 ACK Success |
3 | Δ 扩散 | ClusterGossipSender 每 500 ms pushDelta() - drain 0-500 条 Δ- 组装 DeltaSync - 遍历 cluster.peers 调 pushDelta() |
4 | Peers (B,C…) | ClusterSyncServiceImpl.pushDelta() → 再次 InstanceStore.upsert() → 本节点 signal() (波浪式扩散) |
5 | 心跳 | Executor 每 5 s heartbeat(instanceKey) → 同样走 Δ 机制 |
6 | 宕机 / 退出 | @PreDestroy → unregister(instanceKey) → 创建 tombstone Δ → 扩散 |
7 | 失联剔除 | ExpireScanner.scan() 每 5 s 检查 now-lastBeat > ttl → tombstone Δ |
8 | 分区自愈 | 分区恢复后,节点之间互推 Δ;revision 冲突解决:大版本号胜 |
Δ 队列:写-读解耦、可靠缓冲、低带宽
价值点 | 体现 |
---|---|
写-读解耦 | 注册/心跳只写内存 & 入队,立即返回;网络 I/O 留给定时线程 |
可靠缓冲 | peer 网络异常→发送失败→Δ 重新入队;恢复后重播,绝不丢增量 |
低带宽 | 队列仅存变化 (diff)。批量打包 N 次心跳 ≈ 1 个 DeltaSync 报文 |
一条实例的“出生-心跳-注销”
① Executor 启动 —— 注册
// ExecutorLifecycleManager.register()
String instanceKey = name + "-" + host + "-" + port;
long executorId = ToolObjectId.getInstance().nextId();
registryStub.register(RegisterRequest.newBuilder()
.setInstanceKey(instanceKey)
.setExecutorId(executorId)
.setName(name).setHost(host).setPort(port)
.build());
做了什么?
- 组合
instanceKey
(逻辑主键) - 生成分布式
executorId
- gRPC 调用 Dispatcher-A
② Dispatcher-A 落本地 + Δ 入队
// GrpcRegistryServiceImpl.register()
InstanceDTO dto = InstanceDTO.newBuilder()
.setInstanceKey(req.getInstanceKey())
.setExecutorId(req.getExecutorId())
.setRevision(revGen.next()) // Lamport Clock
.setLastBeat(System.currentTimeMillis())
.build();
store.upsert(dto); // ① merge Map ② deltaQueue.offer(dto)
gossip.signal(); // needPush = true 触发发送
做了什么?
O(1) 内存写完立即 ACK Executor —— 写-读分离。
③ Δ 广播线程
// ClusterGossipSender.pushDelta()
if (!needPush.compareAndSet(true,false)) return;
List<InstanceDTO> batch = store.drainDelta(500);
DeltaSync ds = DeltaSync.newBuilder().addAllInstances(batch).build();
peerList.forEach(addr -> {
try {
stub(addr).withDeadlineAfter(300, MILLISECONDS)
.pushDelta(ds); // 一次发送整批增量
} catch (Exception ex) {
log.debug("push fail {}", addr);
}
});
做了什么?
将过去 ≤ 500 条 Δ 打成一个
DeltaSync
报文广播给所有 peer —— 低带宽 + 可靠缓冲。
④ 其他 Dispatcher 收 Δ
// ClusterSyncServiceImpl.pushDelta()
req.getInstancesList().forEach(store::upsert); // 同样入 Map & Δ 队列
做了什么?
幂等 merge —— 如果版本号 (revision
) 大,则覆盖;否则丢弃。
随后本节点也 signal()
,确保继续扩散到更多 peer。
⑤ Executor 心跳
// Executor 定时线程
registryStub.heartbeat(
HeartbeatRequest.newBuilder().setInstanceKey(instanceKey).build());
Dispatcher 更新 lastBeat
→ 再次 Δ 扩散,所有节点 TTL 计时同步。
⑥ 过期剔除
@Scheduled(fixedDelay = 5000)
public void scan() {
long now = System.currentTimeMillis();
store.all().forEach(inst -> {
if (!inst.getTombstone() &&
now - inst.getLastBeat() > inst.getTtl()) {
store.tombstone(inst.getInstanceKey(), revGen.next());
gossip.signal(); // tombstone Δ 扩散
}
});
}
做了什么?
- 失联 > TTL 的实例会被自动标记为
tombstone = true
并同步全网。
⑦ Executor 优雅下线
@PreDestroy
public void onShutdown() {
registryStub.unregister(
UnregisterRequest.newBuilder().setInstanceKey(instanceKey).build());
}
Dispatcher 侧生成 tombstone Δ → 全网立即下线。
网络分区 & 节点失联自愈
场景 | 系统表现 |
---|---|
Dispatcher-A 离线 | Executors 自动重试列表中的 B/C;注册/心跳继续成功 |
跨机房分区 | 分区内各 Dispatcher 独立可写;恢复后 Δ 汇合,执行 revision 规则收敛 |
Executor 崩溃 | 未发送 Unregister,但 Dispatcher 在 3×TTL 后自动 tombstone 下线 |
未来可优化点
方向 | 建议 |
---|---|
持久化 | 给 InstanceStore 加写-前日志(WAL)或 RocksDB,集群全员宕机后可快速恢复 |
批量压缩 | Δ 报文体可 GZIP;或用 proto3 packed 减尺寸 |
异步 Stub | 把 BlockingStub 改为 FutureStub 并行发送,提升吞吐 |
动态 peer 发现 | 改静态列表为 K8s Headless Service or DNS‐SRV |
Metrics & Alert | 暴露实例数、Δ 队列长度、Gossip RTT;TTL 触发率告警 |
灰度卸载 | tombstone 时带 drain=true 标记,客户端收到后优雅摘除 |
总结
极简原则:
一台 Dispatcher 能活,就让写入成功;多台能连,就让数据最终一致。
- Executor 关心极简接口:注册 / 心跳 / 注销
- Dispatcher 内部通过
deltaQueue + Gossip
架起 AP 高可用桥梁 - Δ 把「更新」从「传播」中切出来:
- 写线程 = 低延迟
- 读线程 = 异步容错
- 分区、节点故障、全员重启,都能依靠 Δ + revision 自动收敛
空空如也!