粗略地从源码中了解Etcd中的数据同步
在一个由许多机器组成的集群中运行着一个服务,在无任何错误的情况下往某一个节点写入A=1后通过网络请求给集群中的其他节点发送这一变更,其他节点收到后也把A的值赋为1,既在集群中某个节点写入一个数据并把这个数据同步给其他节点。但是在真实情况中往往要复杂一些。
在生产环境中,网络与服务器并不是百分百可靠的,网络可能会丢包、故障,程序可能会painc,服务器可能会宕机,某个节点接受了数据变更然后准备同步到其他节点的时候由于某些原因节点同步失败,导致数据写入了该节点但没有同步到其他节点中,这就有可能导致失去一致性从而影响系统正常运行,所以我们不能在节点只通过简单的网络请求进行同步数据还要通过一定的约束保证一致性。
git_repo: https://github.com/etcd-io/etcd/tree/release-3.5
code_version: release-3.5
1 了解Raft协议
首先,在Raft协议中的节点有以下三种状态:
状态 | 解释 |
---|---|
Follower | 追随者状态,接受Leader发起的数据变更,当大部分节点无法感知到Leader存在时会变成Candidate状态 |
Candidate | 参选者状态,当节点变为该状态时节点参与Leader的竞争 |
Leader | 领导者状态,负责向其他Follower节点发送数据变更,任何改变都需要通过Leader来传达 |
在启动程序的时候所有节点都在Follower状态中,此时集群中还没有Leader的存在,于是所有节点会变成Candidate状态并参与Leader的竞争,Candidate节点向其他节点发起投票,其他节点对这次投票进行同意或拒绝,当大部分节点同意此次投票时则此节点变为Leader,这个过程叫做Leader Election(Leader的选举)。
Follower在接受到数据变更请求后不会马上对自己的数据进行变更而是把变更请求转发到Leader节点由Leader节点同意调度同步。每一次数据变更都会作为一个Entry通过WAL添加进Leader的节点日志中,此时这个数据变更还在Uncommitted(未提交状态),客户端发起请求还未能看到此次数据的变更,Leader将变更同步到Follower,大部分Follower[(n/2)+1]节点写入变更后Leader便认为此次变更写入完成才提交此次变更,提交变更并把提交同步到Follower中,最后数据达成一致。
2 粗略了解Etcd中的Raft
启动Etcd对Raft初始化的时候,lead的标记位为None,然后每次启动节点时都让该节点成为一个无Leader的Follower,在newRaft函数(raft/raft.go:318)与becomeFollower函数(raft/raft.go:680)中有:
// raft/raft.go:318
func newRaft(c *Config) *raft {
...
r := &raft{
...
lead: None,
...
}
...
r.becomeFollower(r.Term, None)
...
}
// raft/raft.go:680
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
StartNode函数(raft/node.go:218)通过调用NewRawNode函数(raft/node.go:222),NewRawNode又调用newRaft函数完成一个raft节点的初始化,然后调用run函数启动这个raft节点。
// raft/node.go:218
func StartNode(c *Config, peers []Peer) Node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)
n := newNode(rn)
go n.run()
return &n
}
在node结构体(raft/node.go:254)的run函数中(raft/node.go:300),有一个for循环,循环中通过一个select接收channel,其中n.recvc就是节点间的消息,消息是通过node结构体中的stepWithWaitOption函数(raft/node.go:459)写入的,这个函数后面会提及。接收n.recvc后符合的信息最后会调用r.Setp函数(raft/raft.go:841),在正确执行流程中此函数最终会调用一个闭包函数,这个闭包会根据不同节点角色赋值不同函数,如果是Floower节点就是stepFollower函数(raft/raft.go:1415),如果是Leader节点就是stepLeader函数(raft/raft.go:985),如果是Candidate节点则是stepCandidate函数(raft/raft.go:1370)。
// raft/node.go:300
func (n *node) run() {
...
for {
...
select {
...
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
...
}
}
}
// raft/raft.go:841
func (r *raft) Step(m pb.Message) error {
...
switch m.Type {
...
default:
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}
在Follower的stepFollower函数(raft/raft.go:1415)中,会根据Message的类型决定做何种处理,可查阅MessageType文档,了解各种MessageType的作用,当类型为 pb.MsgProp 时Follower直接将消息转发给Leader
// raft/raft.go:1415
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
} else if r.disableProposalForwarding {
r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
...
}
return nil
}
在Leader的stepLeader函数(raft/raft.go:985)中,接收到的 pb.MsgProp 消息类型会将Entry添加到raft日志中,这时候就是已经接收变更但未提交的状态,然后调用r.bcastAppend()将变更同步到Follower中。具体广播同步的源码可查看raft结构体(raft/raft.go:243)的bcastAppend函数(raft/raft.go:515)。
// raft/raft.go:1013
case pb.MsgProp:
...
for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
...
}
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
Follower收到Leader的 pb.MsgApp 消息后会调用handleAppendEntries函数(raft/raft.go:1469)对变更日志进行处理,这个函数会先校验提交版本,符合条件后调用 r.raftLog.maybeAppend 写日志。处理完日志后Follower会回复一个 pb.MsgAppResp 返回最新的日志的index,Leader收到 pb.MsgAppResp 后如果没有错误就会调用 r.maybeCommit (raft/raft.go:585)进行提交
// raft/raft.go:1469
func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
// Return a hint to the leader about the maximum index and term that the
// two logs could be divergent at. Do this by searching through the
// follower's log for the maximum (index, term) pair with a term <= the
// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
// skip all indexes in the follower's uncommitted tail with terms
// greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much
// more detailed explanation of this mechanism.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)
if err != nil {
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
}
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
}
}
3 为什么要使用WAL
”WAL(Write Ahead Log)预写日志,是数据库系统中常见的一种手段,用于保证数据操作的原子性和持久性。“,简单来说wal就是追加写日志,把所有对数据的操作都顺序追加写入文件,mysql中innodb的redolog也使用了wal,与更新存储数据相比,wal不存在随机io所以可以以极高的性能写入磁盘。
在Etcd中,Leader先把所有数据变更写wal,然后同步到Follower,Follower节点把变更的数据也写入wal,此时数据变更还未应用,当数据变更应用命令下达时,由于是顺序写,当某个节点发生问题重启后程序仍可顺序读到变更日志继续执行数据变更。初始数据库里没有值,设A的值为5,wal就记录下A变更为5,此时将A的值设为1,wal又记录A为1,假设A为5成功应用但A为1已写入wal未应用,此时服务器重启,可以根据处理情况继续应用A为1的操作。假设不适用wal而是直接更新A的值,由于需要等待随机IO的完成,此时服务器重启,此操作有可能失败或无效。当Etcd启动时会判断是否存在wal,如果存在wal则继续处理存在的日志,可见NewServer函数(server/etcdserver/server.go:331)中,对wal是否存在的判断,如果存在则加载并验证snapshot,然后在server/etcdserver/server.go:494调用ss.LoadNewestAvailable(server/etcdserver/api/snap/snapshotter.go:113)获取可处理的snapshot,如果可处理的snapshot不为nil,则继续处理。
// server/etcdserver/server.go:331
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
...
haveWAL := wal.Exist(cfg.WALDir())
...
switch {
...
case haveWAL:
if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
return nil, fmt.Errorf("cannot write to member directory: %v", err)
}
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
if cfg.ShouldDiscover() {
cfg.Logger.Warn(
"discovery token is ignored since cluster already initialized; valid logs are found",
zap.String("wal-dir", cfg.WALDir()),
)
}
// Find a snapshot to start/restart a raft node
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// wal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && err != snap.ErrNoSnapshot {
return nil, err
}
if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
}
if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
cfg.Logger.Error("illegal v2store content", zap.Error(err))
return nil, err
}
cfg.Logger.Info(
"recovered v2 store from snapshot",
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
)
if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
// A snapshot db may have already been recovered, and the old db should have
// already been closed in this case, so we should set the backend again.
ci.SetBackend(be)
s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info(
"recovered v3 backend from snapshot",
zap.Int64("backend-size-bytes", s1),
zap.String("backend-size", humanize.Bytes(uint64(s1))),
zap.Int64("backend-size-in-use-bytes", s2),
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
)
} else {
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
}
if !cfg.ForceNewCluster {
id, cl, n, s, w = restartNode(cfg, snapshot)
} else {
id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
}
cl.SetStore(st)
cl.SetBackend(be)
cl.Recover(api.UpdateCapability)
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
os.RemoveAll(bepath)
return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
}
...
}
...
}
4 读的强一致性(线性一致性)
大部分节点同意数据将被提交,如果查询到未同意提交的节点不会产生数据不一致吗?
当Follower节点接受到请求查询时,会先与Leader节点同步一下数据版本,如果版本不一致则进行数据同步,同步后返回数据,保证数据一致性。可见EtcdServer的linearizableReadNotify方法与linearizableReadLoop方法,每次进行查询时Range方法(server/etcdserver/v3_server.go:99)都会判断r.Serializable,默认情况下r.Serializable为false,就会调用linearizableReadNotify。
// server/etcdserver/v3_server.go:99
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
...
if !r.Serializable {
err = s.linearizableReadNotify(ctx)
trace.Step("agreement among raft nodes before linearized reading")
if err != nil {
return nil, err
}
}
...
get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
}
return resp, err
}
linearizableReadNotify方法(server/etcdserver/v3_server.go:892)将一个struct写入readwaitc通道并等待返回,并触发linearizableReadLoop方法(server/etcdserver/v3_server.go:734),发起与Leader同步index信息,从而达到读的强一致性。
// (server/etcdserver/v3_server.go:892)
func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()
// signal linearizable loop for current notify if it hasn't been already
select {
case s.readwaitc <- struct{}{}:
default:
}
// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}
// server/etcdserver/v3_server.go:734
func (s *EtcdServer) linearizableReadLoop() {
for {
requestId := s.reqIDGen.Next()
leaderChangedNotifier := s.LeaderChangedNotify()
select {
case <-leaderChangedNotifier:
continue
case <-s.readwaitc:
case <-s.stopping:
return
}
...
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
if isStopped(err) {
return
}
if err != nil {
nr.notify(err)
continue
}
...
appliedIndex := s.getAppliedIndex()
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
if appliedIndex < confirmedIndex {
select {
case <-s.applyWait.Wait(confirmedIndex):
case <-s.stopping:
return
}
}
...
}
}
阅读源码可能需要了解的词
- | - |
---|---|
Raft | Etcd 所采用的保证分布式系统数据强一致性的算法 |
Node | 一个 Raft 状态机实例 |
Member | 一个 Etcd 实例,它管理着一个 Node,并且可以为客户端请求提供服务 |
Cluster | 由多个 Member 构成可以协同工作的 Etcd 集群 |
Peer | 对同一个 Etcd 集群中另外一个 Member 的称呼 |
Client | 向 Etcd 集群发送 HTTP 请求的客户端 |
WAL | 预写式日志,etcd 用于持久化存储的日志格式 |
Snapshot | Etcd 防止 WAL 文件过多而设置的快照,存储 Etcd 数据状态 |
Entry | Raft 算法中的日志的一个条目 |
Proxy | Etcd 的一种模式,为 Etcd 集群提供反向代理服务 |
Leader | Raft 算法中通过竞选而产生的处理所有数据提交的节点 |
Follower | Raft 算法中竞选失败的节点作为从属节点,为算法提供强一致性保证 |
Candidate | 当 Follower 超过一定时间接收不到 Leader 的心跳时(认为 Leader 发生了故障)转变为 Candidate 开始竞选 |
Term | 某个节点成为 Leader 到下一次竞选时间,称为一个 Term |
Vote | 选举时的一张投票 |
Index | 数据项编号,Raft 中通过 Term 和 Index 来定位数据 |
Commit | 一个提交,持久化数据写入到日志中 |
Propose | 一个提议,请求大部分 Node 同意数据写入 |
CommitedIndex | 经过Raft同意提交的数据index |
AppliedIndex | 已经被应用层应用的index |
Compact | 压缩历史版本数据 |
Revision | 数据版本号 |
Apply | 将提交的数据应用到db |