用 gRPC 打造最终一致的 AP-模式注册中心

背景与目标

需求说明
高可用 (A)任意 Dispatcher 宕机或网络分区,注册/发现仍立即可用
分区容忍 (P)各分区内继续服务,网络恢复后自动收敛数据
最终一致强一致不是刚需;允许短暂不一致
低延迟注册/心跳 O(μs),不阻塞业务线程

传统 CP(如 ZooKeeper)在分区时会牺牲可写;Eureka / Nacos-Naming 走 AP 路线。本文在现有 executor ←→ dispatcher 代码之上,演示一套“Δ-Gossip + 内存快照”的极简 AP 实现。


总体架构

  • Executor ⇄ Dispatcher:对外 gRPC 接口(9090、9091 …)。
  • Dispatcher ↔ Dispatcher:集群 Δ-Gossip(9581、9582 …)。
  • deltaQueue:本节点待广播增量缓冲,实现写-读解耦。

核心组件拆解

位置作用
executorconfigExecutorRegister读取 executor.* 配置 + 注入 registryStub
executorlifecycleExecutorLifecycleManager@EventListener 完成注册 / 心跳;@PreDestroy 注销
dispatcherregistryGrpcRegistryServiceImpl对外 gRPC:register / heartbeat / unregister
dispatcherstoreInstanceStore线程安全 Map + Δ 队列;幂等 upsert()
dispatcherstoreRevisionGenerator单调递增版本号 (Lamport Clock)
dispatchercluster/gossipClusterGossipSender500 ms 批量 drain Δ → 推送给 peers
dispatchercluster/syncClusterSyncServiceImpl收到 Δ / FullSync → store.upsert()
dispatcherschedulerExpireScanner每 5 s 剔除心跳超时实例 (TTL)

peer = 与本节点处在同一集群、需要相互同步数据的 其他 Dispatcher


一次「上线-下线」全过程

#时间线详细步骤
1Executor 启动ApplicationReadyEvent → 组装 RegisterRequest(instanceKey, executorId…) → gRPC 调用 Dispatcher-A
2Dispatcher-AGrpcRegistryServiceImpl.register()InstanceStore.upsert(dto) 更新本地快照② deltaQueue.offer(dto)gossipSender.signal() → 立即 ACK Success
3Δ 扩散ClusterGossipSender 每 500 ms pushDelta()- drain 0-500 条 Δ- 组装 DeltaSync- 遍历 cluster.peerspushDelta()
4Peers (B,C…)ClusterSyncServiceImpl.pushDelta() → 再次 InstanceStore.upsert() → 本节点 signal()(波浪式扩散)
5心跳Executor 每 5 s heartbeat(instanceKey) → 同样走 Δ 机制
6宕机 / 退出@PreDestroyunregister(instanceKey) → 创建 tombstone Δ → 扩散
7失联剔除ExpireScanner.scan() 每 5 s 检查 now-lastBeat > ttl → tombstone Δ
8分区自愈分区恢复后,节点之间互推 Δ;revision 冲突解决:大版本号胜

Δ 队列:写-读解耦、可靠缓冲、低带宽

价值点体现
写-读解耦注册/心跳只写内存 & 入队,立即返回;网络 I/O 留给定时线程
可靠缓冲peer 网络异常→发送失败→Δ 重新入队;恢复后重播,绝不丢增量
低带宽队列仅存变化 (diff)。批量打包 N 次心跳 ≈ 1 个 DeltaSync 报文

一条实例的“出生-心跳-注销”

① Executor 启动 —— 注册

// ExecutorLifecycleManager.register()
String instanceKey = name + "-" + host + "-" + port;
long executorId    = ToolObjectId.getInstance().nextId();

registryStub.register(RegisterRequest.newBuilder()
        .setInstanceKey(instanceKey)
        .setExecutorId(executorId)
        .setName(name).setHost(host).setPort(port)
        .build());

做了什么?

  1. 组合 instanceKey(逻辑主键)
  2. 生成分布式 executorId
  3. gRPC 调用 Dispatcher-A

② Dispatcher-A 落本地 + Δ 入队

// GrpcRegistryServiceImpl.register()
InstanceDTO dto = InstanceDTO.newBuilder()
    .setInstanceKey(req.getInstanceKey())
    .setExecutorId(req.getExecutorId())
    .setRevision(revGen.next())           // Lamport Clock
    .setLastBeat(System.currentTimeMillis())
    .build();

store.upsert(dto);   // ① merge Map ② deltaQueue.offer(dto)
gossip.signal();     // needPush = true 触发发送

做了什么?
O(1) 内存写完立即 ACK Executor —— 写-读分离

③ Δ 广播线程

// ClusterGossipSender.pushDelta()
if (!needPush.compareAndSet(true,false)) return;

List<InstanceDTO> batch = store.drainDelta(500);
DeltaSync ds = DeltaSync.newBuilder().addAllInstances(batch).build();

peerList.forEach(addr -> {
    try {
        stub(addr).withDeadlineAfter(300, MILLISECONDS)
                  .pushDelta(ds);         // 一次发送整批增量
    } catch (Exception ex) {
        log.debug("push fail {}", addr);
    }
});

做了什么?

将过去 ≤ 500 条 Δ 打成一个 DeltaSync 报文广播给所有 peer —— 低带宽 + 可靠缓冲

④ 其他 Dispatcher 收 Δ

// ClusterSyncServiceImpl.pushDelta()
req.getInstancesList().forEach(store::upsert); // 同样入 Map & Δ 队列

做了什么?
幂等 merge —— 如果版本号 (revision) 大,则覆盖;否则丢弃。
随后本节点也 signal(),确保继续扩散到更多 peer。

⑤ Executor 心跳

// Executor 定时线程
registryStub.heartbeat(
   HeartbeatRequest.newBuilder().setInstanceKey(instanceKey).build());

Dispatcher 更新 lastBeat → 再次 Δ 扩散,所有节点 TTL 计时同步

⑥ 过期剔除

@Scheduled(fixedDelay = 5000)
public void scan() {
    long now = System.currentTimeMillis();
    store.all().forEach(inst -> {
        if (!inst.getTombstone() &&
            now - inst.getLastBeat() > inst.getTtl()) {
            store.tombstone(inst.getInstanceKey(), revGen.next());
            gossip.signal();               // tombstone Δ 扩散
        }
    });
}

做了什么?

  • 失联 > TTL 的实例会被自动标记为 tombstone = true 并同步全网。

⑦ Executor 优雅下线

@PreDestroy
public void onShutdown() {
    registryStub.unregister(
        UnregisterRequest.newBuilder().setInstanceKey(instanceKey).build());
}

Dispatcher 侧生成 tombstone Δ → 全网立即下线。


网络分区 & 节点失联自愈

场景系统表现
Dispatcher-A 离线Executors 自动重试列表中的 B/C;注册/心跳继续成功
跨机房分区分区内各 Dispatcher 独立可写;恢复后 Δ 汇合,执行 revision 规则收敛
Executor 崩溃未发送 Unregister,但 Dispatcher 在 3×TTL 后自动 tombstone 下线

未来可优化点

方向建议
持久化InstanceStore 加写-前日志(WAL)或 RocksDB,集群全员宕机后可快速恢复
批量压缩Δ 报文体可 GZIP;或用 proto3 packed 减尺寸
异步 StubBlockingStub 改为 FutureStub 并行发送,提升吞吐
动态 peer 发现改静态列表为 K8s Headless Service or DNS‐SRV
Metrics & Alert暴露实例数、Δ 队列长度、Gossip RTT;TTL 触发率告警
灰度卸载tombstone 时带 drain=true 标记,客户端收到后优雅摘除

总结

极简原则
一台 Dispatcher 能活,就让写入成功;多台能连,就让数据最终一致。

  • Executor 关心极简接口:注册 / 心跳 / 注销
  • Dispatcher 内部通过 deltaQueue + Gossip 架起 AP 高可用桥梁
  • Δ 把「更新」从「传播」中切出来:
    • 写线程 = 低延迟
    • 读线程 = 异步容错
  • 分区、节点故障、全员重启,都能依靠 Δ + revision 自动收敛
消息盒子
# 您需要首次评论以获取消息 #
# 您需要首次评论以获取消息 #

只显示最新10条未读和已读信息