浅析Consul集群的管理
作为一名后端开发,难免在开发过程中用到各种中间件,对于所用到的东西,我们需要对其有所理解,并且在理解中学习。Consul作为一个服务发现与注册中心已经被广泛运用到分布式项目生产中,而本身带有分布式属性的Consul对作为后端开发的我具有很好的学习意义,了解Consul的架构及其原理有助于提升自身技术水平。
consul: https://github.com/hashicorp/consul/tree/release/1.14.3
sref: https://github.com/hashicorp/serf/tree/master
memberlist: https://github.com/hashicorp/memberlist/tree/master
1 了解Consul
Consul是一个基于Golang编写的,支持服务注册、服务发现、健康检查等功能的服务发现与注册中心。Consul支持单机、集群、跨集群部署,且提供的KV存储功能还可将Consul作为配置中心,开箱即用方便快捷。
Consul is a service networking solution that enables teams to manage secure network connectivity between services and across multi-cloud environments and runtimes. Consul offers service discovery, identity-based authorization, L7 traffic management, and service-to-service encryption.
这是Consul官方文档对Consul的描述,虽然Consul也提供了KV存储功能,但与Etcd(分布式KV数据库)相比,Consul官方把Consul定义为一个服务注册与发现中心。
Consul 使用 gossip 协议来同步集群节点信息。Gossip 协议是一种去中心化的协议,可以让节点在彼此之间以点对点的方式交换信息。Gossip 协议基于以下原则:
- 每个节点都维护一个关于集群中其他节点的视图;
- 每个节点以一定的频率将视图中的一部分信息发送给一些随机选择的邻居节点;
- 每个节点在接收到邻居节点发送来的信息时,会用该信息更新自己维护的视图。
Consul有两种启动模式,分别是server和agent(官方把两种模式统称为agent),我们可以把agent看作是一个负载均衡的proxy,把server看作真正的Consul服务。在Consul的设计中,agent是无状态的,server是有状态的,所有的数据只能存在server上,同时,Consul也支持跨数据中心异地集群,server通过WAN在集群间进行通信以进行数据同步。在Consul中用了多种一致性协议来同步不同类型的数据,对于Consul本身的集群与节点信息使用gossip协议达到最终一致性,而对于KV数据Consul则使用了raft协议来达到强一直性。
2 Gossip——最终一致性协议
与Raft这种变更即刻感知的一致性协议不同,Gossip是一个最终的一致性协议,意思就是当数据发生了改变后,客户端立刻对数据发起请求得到的结果可能并不是上次操作的结果,等到一定时间后再次请求才会得到正确的结果,但无论如何,这个份数据最终还是准确的。
当Gossip节点数据发生改变时,节点会随机选择其他几个节点发送此次数据变更,其他节点收到后又会随机选择几个节点发送这个数据变更,如此往复直到集群中所有的节点都收到了该消息为止,由于消息同步到每个节点需要多轮的传播,所以某一时刻的数据一致性是不能被保证的,但在正常情况下每个节点最终都会收到这个消息,所以是最终一致性的。这有点像农村里面妇人们聊的八卦,一传十十传百,最终到达每个人的耳朵里面,区别是这些八卦并不会随着传播而出现不同的版本。
3 Serf——Consul中基于gossip管理集群接口的封装
Serf是Consul使用的基础协议,用于管理集群中成员之间的通信和状态。在Consul源码中,Consul使用了由Consul同一个公司所实现的集群节点管理库——Serf,这个库基于另外一个库实现的Gossip协议对集群中节点状态数据进行管理。
3.1 server中Serf的初始化
在Consul的NewServer函数(agent/consul/server.go:398)中会进行Serf的创建,SerfLAN默认会被创建的,这个Serf被用于集群内节点信息的同步,而SerfWAN则是通过判断config.SerfWANConfig != nil来创建的,这个Serf则是被用于跨集群(跨数据中心)的节点信息同步,这也对应了前面的Consul跨集群架构图。
// consul:agent/consul/server.go:398
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
...
if config.SerfWANConfig != nil {
s.serfWAN, s.serfWANConfig, err = s.setupSerf(setupSerfOptions{
Config: config.SerfWANConfig,
EventCh: s.eventChWAN,
SnapshotPath: serfWANSnapshot,
WAN: true,
Listener: s.Listener,
})
...
}
...
// Initialize the LAN Serf for the default network segment.
if err := s.setupSerfLAN(config); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
...
go s.lanEventHandler()
...
}
Server结构体(agent/consul/server.go:147)的setupSerfLAN函数(agent/consul/enterprise_server_oss.go:95)与NewServer函数中创建SerfWAN做的事情一样,本质上都是调用setupSerf函数(agent/consul/server_serf.go:52),最终调用serf库的Create函数创建一个节点。
// consul:agent/consul/server_serf.go:52
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, *serf.Config, error) {
conf, err := s.setupSerfConfig(opts)
if err != nil {
return nil, nil, err
}
cluster, err := serf.Create(conf)
if err != nil {
return nil, nil, err
}
return cluster, conf, nil
}
需要注意的是setupSerf函数的入参数setupSerfOptions结构体(agent/consul/server_serf.go:37),其中的EventCh则是在LAN/WAN上发生的事件的广播。通过继续阅读源码发现,NewServer中在SerfLAN和SerfWAN都初始化后,开启协程监听这两个channel,并处理对应事件。
// consul:agent/consul/server_serf.go:37
type setupSerfOptions struct {
Config *serf.Config
EventCh chan serf.Event
SnapshotPath string
Listener net.Listener
// WAN only
WAN bool
// LAN only
Segment string
Partition string
}
// consul:agent/consul/server.go:398
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Server, error) {
...
go s.lanEventHandler()
...
// Add a "static route" to the WAN Serf and hook it up to Serf events.
if s.serfWAN != nil {
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN, s.wanMembershipNotifyCh)
// Fire up the LAN <-> WAN join flooder.
addrFn := func(s *metadata.Server) (string, error) {
if s.WanJoinPort == 0 {
return "", fmt.Errorf("no wan join port for server: %s", s.Addr.String())
}
addr, _, err := net.SplitHostPort(s.Addr.String())
if err != nil {
return "", err
}
return fmt.Sprintf("%s:%d", addr, s.WanJoinPort), nil
}
go s.Flood(addrFn, s.serfWAN)
}
...
}
在Server结构体的lanEventHandler函数中(agent/consul/server_serf.go:265),不断监听着eventChLAN的消息,并分类型处理。如果启用了跨集群(跨数据中心)的话,还会调用HandleSerfEvents函数(agent/router/serf_adapter.go:58)监听来自其他数据中心的集群节点状态的变更,并且分类处理变更。LAN与WAN的handler函数总的来说就是监听消息然后分类处理,两者实际上极为相似。
// consul:agent/consul/server_serf.go:265
func (s *Server) lanEventHandler() {
for {
select {
case e := <-s.eventChLAN:
switch e.EventType() {
case serf.EventMemberJoin:
s.lanNodeJoin(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed, serf.EventMemberReap:
s.lanNodeFailed(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventUser:
s.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate:
s.lanNodeUpdate(e.(serf.MemberEvent))
s.localMemberEvent(e.(serf.MemberEvent))
case serf.EventQuery: // Ignore
default:
s.logger.Warn("Unhandled LAN Serf Event", "event", e)
}
case <-s.shutdownCh:
return
}
}
}
// consul:agent/router/serf_adapter.go:58
func HandleSerfEvents(
logger hclog.Logger,
router *Router,
areaID types.AreaID,
shutdownCh <-chan struct{},
eventCh <-chan serf.Event,
membershipNotifyCh chan<- struct{},
) {
for {
select {
case <-shutdownCh:
return
case e := <-eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberLeave, serf.EventMemberReap:
handleMemberEvent(logger, router.RemoveServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberUpdate:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
// All of these event types are ignored.
case serf.EventUser:
case serf.EventQuery:
default:
logger.Warn("Unhandled Serf Event", "event", e)
}
}
}
}
在LAN的handler中,以nodeJoin函数(agent/consul/server_serf.go:349)为例,serverLookup结构体(agent/consul/server.go:313)的AddServer函数(agent/consul/server_lookup.go:25)往内存中存节点信息。
// consul:agent/consul/server.go:313
func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m)
if !ok || serverMeta.Segment != "" {
continue
}
s.logger.Info("Adding LAN server", "server", serverMeta.String())
// Update server lookup
s.serverLookup.AddServer(serverMeta)
// If we're still expecting to bootstrap, may need to handle this.
if s.config.BootstrapExpect != 0 {
s.maybeBootstrap()
}
// Kick the join flooders.
s.FloodNotify()
}
}
// consul:agent/consul/server_lookup.go:25
func (sl *ServerLookup) AddServer(server *metadata.Server) {
sl.lock.Lock()
defer sl.lock.Unlock()
sl.addressToServer[raft.ServerAddress(server.Addr.String())] = server
sl.idToServer[raft.ServerID(server.ID)] = server
}
在WAN的handler中,以成员加入事件为例(agent/router/serf_adapter.go:74),在过程中执行Router结构体(agent/router/router.go:22)的addServer函数(agent/router/router.go:279),先取得manager并调用AddServer,然后通过rpc发送至其他server中。
3.2 client(agent)中Serf的初始化
相比server的初始化,agent(client)的初始化就简单很多了,NewClient函数(agent/consul/client.go:94)其最主要的就是创建Serf。
// consul:agent/consul/client.go:94
func NewClient(config *Config, deps Deps) (*Client, error) {
if err := config.CheckProtocolVersion(); err != nil {
return nil, err
}
if config.DataDir == "" {
return nil, fmt.Errorf("Config must provide a DataDir")
}
if err := config.CheckACL(); err != nil {
return nil, err
}
c := &Client{
config: config,
connPool: deps.ConnPool,
eventCh: make(chan serf.Event, serfEventBacklog),
logger: deps.Logger.NamedIntercept(logging.ConsulClient),
shutdownCh: make(chan struct{}),
tlsConfigurator: deps.TLSConfigurator,
}
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
if err := c.initEnterprise(deps); err != nil {
c.Shutdown()
return nil, err
}
aclConfig := ACLResolverConfig{
Config: config.ACLResolverSettings,
Backend: &clientACLResolverBackend{Client: c},
Logger: c.logger,
DisableDuration: aclClientDisabledTTL,
CacheConfig: clientACLCacheConfig,
ACLConfig: newACLConfig(&partitionInfoNoop{}, c.logger),
Tokens: deps.Tokens,
}
var err error
if c.ACLResolver, err = NewACLResolver(&aclConfig); err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to create ACL resolver: %v", err)
}
// Initialize the LAN Serf
c.serf, err = c.setupSerf(config.SerfLANConfig, c.eventCh, serfLANSnapshot)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
}
if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
c.Shutdown()
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
}
c.router = deps.Router
// Start LAN event handlers after the router is complete since the event
// handlers depend on the router and the router depends on Serf.
go c.lanEventHandler()
return c, nil
}
4 有趣的的channel过滤
了解了Serf在Consul的基本创建流程后,我们需要了解Serf中Gossip是如何作用的,以NewClient中创建的Serf为例。NewClient调用setupSerf函数传入了一个c.eventCh,在setupSerf函数(agent/consul/client_serf.go:19)中conf.EventCh = ch然后Create一个Sref,这个channel就是Consul中用来传递事件信息的。
// consul:agent/consul/client_serf.go:19
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
...
conf.EventCh = ch
...
return serf.Create(conf)
}
在进一步阅读Serf的源码中发现一个有意思的东西,下面有一个demo,通过channel的传递,解耦出不同类型的消息对应不同的处理函数,每个函数只负责处理自己要处理的事件而与自己无关的则继续传递下去。在前面的setupSerf函数中,我们将channel赋值给了config.EventCh,这个channel在Serf库的Create函数(serf/serf.go:249)中不断变换,最后Consul的NewClient使用一个go c.lanEventHandler()来进行监听以及处理对应事件。这个设计就像一个过滤器,所有的信息都往一个地方写然后每个过滤器过滤自己需要处理的信息,如果不是自己需要处理的则继续往下传,最后到lanEventHandler函数中处理。
func main() {
ch := make(chan string, 1024)
ch = A(ch)
ch = B(aCh)
ff := []string{"AFunc", "BFunc"}
go func() {
for {
n := ff[rand.Int()%2]
fmt.Println("main: send ", n)
ch <- n
time.Sleep(time.Second * 5)
}
}()
time.Sleep(time.Second * 60)
}
func A(ch chan string) chan string {
newCh := make(chan string, 1024)
go func() {
for {
select {
case e := <-newCh:
if strings.Contains(e, "AFunc") {
fmt.Println("A func do something")
} else {
ch <- e
}
}
}
}()
return newCh
}
func B(ch chan string) chan string {
newCh := make(chan string, 1024)
go func() {
for {
select {
case e := <-newCh:
if strings.Contains(e, "BFunc") {
fmt.Println("B func do something")
} else {
ch <- e
}
}
}
}()
return newCh
}
// serf:serf/serf.go:249
func Create(conf *Config) (*Serf, error) {
...
// Check if serf member event coalescing is enabled
if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
c := &memberEventCoalescer{
lastEvents: make(map[string]EventType),
latestEvents: make(map[string]coalesceEvent),
}
conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
conf.CoalescePeriod, conf.QuiescentPeriod, c)
}
// Check if user event coalescing is enabled
if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
c := &userEventCoalescer{
events: make(map[string]*latestUserEvents),
}
conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
}
outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh)
if err != nil {
return nil, fmt.Errorf("Failed to setup serf query handler: %v", err)
}
conf.EventCh = outCh
...
}
5 由Consul到Serf再到Memberlist
搭建Consul集群时,在Consul启动后可以执行consul join xxx加入集群,实际上执行这个命令就是向ConsulAPI的/v1/agent/join/发送了一个请求,如果加入一个LAN/WAN节点主要是由Serf提供的,Agent结构体(agent/agent.go:209)的JoinWAN函数(agent/agent.go:1724)调用Server结构体(agent/consul/server.go:147)的JoinWAN函数(agent/consul/server.go:1320),最后调用的是Serf中的Join函数(serf/serf.go:641)。在Serf中的Join函数中又调用了s.broadcast,然后又在其中调用s.broadcasts.QueueBroadcast,而s.broadcasts就是Memberlist中的TransmitLimitedQueue,根据代码中的注解可知TransmitLimitedQueue用于将消息排队以广播到集群。
// serf:serf/serf.go:641
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
...
num, err := s.memberlist.Join(existing)
// If we joined any nodes, broadcast the join message
if num > 0 {
// Start broadcasting the update
if err := s.broadcastJoin(s.clock.Time()); err != nil {
return num, err
}
}
return num, err
}
在Join函数中,会执行s.memberlist.Join函数,这个Memberlist实际上才是真正实现gossip的地方,serf作为condul的一个库被consul引用,同样memberlist作为serf中一个库被serf引用。在Consul调用Serf的Create的时候,也会创建出memberlist。当Memberlist被创建的时候,还不会连接任何节点,只会开启监听并允许其他节点加入,并在newMemberlist中开启对信息的监听与处理的协程。
// serf:serf/serf.go:249
func Create(conf *Config) (*Serf, error) {
...
serf.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: serf.NumNodes,
RetransmitMult: conf.MemberlistConfig.RetransmitMult,
}
...
memberlist, err := memberlist.Create(conf.MemberlistConfig)
if err != nil {
return nil, fmt.Errorf("Failed to create memberlist: %v", err)
}
serf.memberlist = memberlist
...
}
streamListen方法是用来监听并处理tcp网络连接的,packetListen是用来监听udp连接的。前文提及Serf LAN用于数据中心内gossip交换数据,SerfWAN用于跨数据中心gossip交换数据用,这两种serf都可以使用tcp或udp传输数据,这以特性可以在memberlist初始化网络的源码中找到(net_transport.go:66)。通过阅读newMemberlist开启的几个协程的源码可知udp主要用于心跳检测,tcp主要用于交换节点数据也可以用于处理ping。
// memberlist:memberlist.go:97
func newMemberlist(conf *Config) (*Memberlist, error) {
...
go m.streamListen()
go m.packetListen()
go m.packetHandler()
go m.checkBroadcastQueueDepth()
...
}
创建memberlist的时候会调用schedule函数用来处理一些后台任务,比如gossip,schedule里会定时向其他节点发送消息以同步集群节点信息。
// memberlist:memberlist.go:245
func Create(conf *Config) (*Memberlist, error) {
m, err := newMemberlist(conf)
if err != nil {
return nil, err
}
if err := m.setAlive(); err != nil {
m.Shutdown()
return nil, err
}
m.schedule()
return m, nil
}
// memberlist: state.go:122
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// If we already have tickers, then don't do anything, since we're
// scheduled
if len(m.tickers) > 0 {
return
}
// Create the stop tick channel, a blocking channel. We close this
// when we should stop the tickers.
stopCh := make(chan struct{})
// Create a new probeTicker
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
m.tickers = append(m.tickers, t)
}
// Create a push pull ticker if needed
if m.config.PushPullInterval > 0 {
go m.pushPullTrigger(stopCh)
}
// Create a gossip ticker if needed
if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
m.tickers = append(m.tickers, t)
}
// If we made any tickers, then record the stopTick channel for
// later.
if len(m.tickers) > 0 {
m.stopTick = stopCh
}
}
Memberlist结构体(memberlist.go:38)的Join函数(memberlist.go:267)中对加入的节点进行遍历然后执行pushPullNode方法,与远程服务器建立连接并发送数据,然后对数据进行合并处理,此时节点已经初步注册进consul集群了。
// memberlist:memberlist.go:267
func (m *Memberlist) Join(existing []string) (int, error) {
numSuccess := 0
var errs error
for _, exist := range existing {
addrs, err := m.resolveAddr(exist)
if err != nil {
err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[WARN] memberlist: %v", err)
continue
}
for _, addr := range addrs {
hp := joinHostPort(addr.ip.String(), addr.port)
a := Address{Addr: hp, Name: addr.nodeName}
if err := m.pushPullNode(a, true); err != nil {
err = fmt.Errorf("Failed to join %s: %v", a.Addr, err)
errs = multierror.Append(errs, err)
m.logger.Printf("[DEBUG] memberlist: %v", err)
continue
}
numSuccess++
}
}
if numSuccess > 0 {
errs = nil
}
return numSuccess, errs
}
// memberlist:state.go:670
func (m *Memberlist) pushPullNode(a Address, join bool) error {
defer metrics.MeasureSinceWithLabels([]string{"memberlist", "pushPullNode"}, time.Now(), m.metricLabels)
// Attempt to send and receive with the node
remote, userState, err := m.sendAndReceiveState(a, join)
if err != nil {
return err
}
if err := m.mergeRemoteState(join, remote, userState); err != nil {
return err
}
return nil
}
到此,我们已经向集群中那个ip为xxx的节点发送了我们的join请求并交换了节点信息。节点初步加入consul集群后,我们需要广播我们的加入信息,调用broadcastJoin函数(serf/serf.go:678)广播加入信息。