分布式系统:6.Consensus⭐⭐⭐

本文最后更新于:2022年3月15日 中午

6. consensus

6.1 consensus

如果leader宕机了,需要重新选举一个新的leader

共识问题:需要所有节点同意一个事情

Paxos:基于一个值的共识

Muti-Paxos:提供total order broadcast

Raft,Zab:默认total order broadcast

Consensus system models

Paxos,Raft假设一个部分同步,崩溃恢复的模型

FLP result:不可能在完全异步的系统中构建共识算法

因此必须使用时钟来计算超时,检测崩溃,超时时间影响系统的活性(liveness)

leader election

  • 用时钟检测leader是否crash
  • 选举新leader
  • 防止同时有两个leader(脑裂)
  • 每次任期(term)最多只能选出一个leader
  • 一个节点在一次任期内只能选举一次
  • 需要法定人数(quorum)的节点来选举

为了保证只有一个leader:在选出leader之后,发信息之前leader需要再确认一次

6.2 Raft(伪代码)

raft中的状态机

Untitled

1/9 初始化

初始化、崩溃恢复、开始选举

log的存储如下:

Untitled

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
on initialisation do
currentTerm := 0; votedFor := null
//term在每次选举之后+1,
log := <>; commitLength := 0
//log是entry的序列,每一条entry包含我们要发送的信息和当前的term
//log末尾增加,并通过raft将其复制到所有节点,当log的条目被复制到法定人数则确认
//确认的log无法再更改,所有在commit length之前的log都是固定了的
//上面的变量存储在磁盘中,为了错误恢复
//每当有更新,必须先写入磁盘,再做其他的事情
currentRole := follower; currentLeader := null
votesReceived := {}; sentLength := <>; ackedLength := <>
//这些变量可以在易失性存储器上,如RAM
end on

on recovery from crash do
currentRole := follower; currentLeader := null
votesReceived := {}; sentLength := hi; ackedLength := hi
//从崩溃中恢复,恢复上面易失的变量
end on

on node nodeId suspects leader has failed, or on election timeout do
//当节点怀疑leader崩溃,或者到达任期之后
currentTerm := currentTerm + 1; currentRole := candidate
//节点任期 + 1,成为候选人
votedFor := nodeId; votesReceived := {nodeId}; lastTerm := 0
//节点为自己投票
if log.length > 0
then lastTerm := log[log.length − 1].term
end if
//lastTerm变量设置为log中最后一条记录的任期
msg := (VoteRequest, nodeId, currentTerm, log.length, lastTerm)
//msg类型为voteRequest,节点的ID,节点的任期,log的长度,lastTerm
for each node ∈ nodes:
send msg to node
//发送给每一个节点
start election timer
//开启计时器,如果没有收到足够的选票,重启选举
end on

2/9 选举新的leader

主要在于确认候选人是否logOK和termOK

$$
logOK := (cLogTerm > myLogTerm)\vee\(cLogTerm = myLogTerm \wedge cLogLength >= log.length)\
$$

$$
termOK := (cTerm > currentTerm)\vee\(cTerm = currentTerm\wedge votedFor in {cId,null})
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
on receiving (VoteRequest, cId, cTerm, cLogLength, cLogTerm) at node nodeId do
//收到的是voteRequest,C代表的是candidate候选人
//cLogTrem对应发送时的lastTerm
myLogTerm := log[log.length - 1].term
logOK := (cLogTerm > myLogTerm)||(cLogTerm = myLogTerm && cLogLength >= log.length)
//首先确认候选人的log是否OK,如果候选人的log中最后的任期大于自身log中最后的任期,OK
//或者候选人log中最后的任期等于节点自身log中最后的任期,并且候选人的log长度小于自身log长度,OK
termOK := (cTerm > currentTerm)||(cTerm = currentTerm && votedFor ∈ {cId,null})
//确认候选人的任期是否OK,如候选人任期更大,OK
//或者候选人任期与节点自身任期相等,并且(节点已经投票过给候选人或节点还未投票),OK
if logOK && termOK then
//都OK则为候选人投票
currentTerm := cTerm
currentRole := follower
votedFor := cId
send (VoteResponse, nodeId, currentTerm,true) to node cId
//修改任期和候选人一致,修改当前的角色,记下投给了谁,并发送投票
else
send (VoteResponse, nodeId, currentTerm, false) to node cId
//不符合投票要求,投反对票
end if
end on

3/9 收集选票

候选人得到选票后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
on receiving (VoteResponse, voterId, term, granted) at nodeId do
//如果自己不是候选人则什么都不做
if currentRole = candidate && term = currentTerm && granted then
//确认选票是投给自己这一次选举,且是支持票,如果投的票term小于currentTrem直接忽略
votesReceived := votesReceived ∪ {voterId}
//加入收到的选票集合,注意是集合并操作,故具有幂等性
if |votesReceived| ≥ d(|nodes| + 1)/2e then
//每次都判断是否得到了法定人数的选票
currentRole := leader; currentLeader := nodeId
cancel election timer
//修改自身为leader,停止计时器
for each follower ∈ nodes \ {nodeId} do
sentLength[follower ] := log.length
ackedLength[follower ] := 0
//sL以及aL将每一个节点Id映射到一个整数上,分别代表多少msg被发送给了节点,
//以及节点到现在确认了多少msg,这些都是假设有可能会出错
ReplicateLog(nodeId, follower)
//告诉所有节点自己现在是leader
end for
end if
else if term > currentTerm then
//如果得到的选票的任期大于候选人自己的任期,让出选举权,变成follower
currentTerm := term
currentRole := follower
votedFor := null
cancel election timer
end if
end on

4/9 广播消息

分为得到消息广播以及定期广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
on request to broadcast msg at node nodeId do
//leader广播的操作,首先判断是不是leader
if currentRole = leader then
append the record (msg : msg, term : currentTerm) to log
//将msg以及当前任期加入log
ackedLength[nodeId] := log.length
//leader自己确认log
for each follower ∈ nodes \ {nodeId} do
//出了leader的每个节点
ReplicateLog(nodeId, follower )
//复制新的entry到follower
end for
else
//如果不是leader,将消息通过FIFO链路转发到leader
forward the request to currentLeader via a FIFO link
end if
end on

periodically at node nodeId do
//leader每隔一段时间调用复制log的函数,复制log到每个follower
//1.作为心跳告诉follower自己还活着
//2.防止由于网络原因丢失消息,故定期重传
if currentRole = leader then
for each follower ∈ nodes \ {nodeId} do
ReplicateLog(nodeId, follower )
end for
end if
end do

5/9 函数ReplicateLog

每当有新msg到达leader,或者定期调用

如果没有新消息,entries是空列表

logRequest消息和空entries作为心跳,让follower知道leader还活着

1
2
3
4
5
6
7
8
9
10
11
12
13
function ReplicateLog(leaderId, followerId)
prefixLen := sentLength[followerId]
//得到现在已经发送给这个follower的log长度
suffix := <log[prefixLen], log[prefixLen + 1], . . . ,log[log.length − 1]>
//得到所有现在还没有发送给follower的log
prefixTerm := 0
if prefixLen > 0 then
prefixTerm := log[prefixLen − 1].term
//发送的msg附带了之前发送给follower节点中最后一个entry的任期信息,用于一致性检查
end if
send (LogRequest, leaderId, currentTerm, prefixLen,
prefixTerm, commitLength, suffix ) to followerId
end function

6/9 follower收到msg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
on receiving (LogRequest, leaderId, term, prefixLen, prefixTerm,
leaderCommit, suffix ) at node nodeId do
//suffix为leader发来的log,leadercommit就是leader有的commit长度
if term > currentTerm then
currentTerm := term; votedFor := null
cancel election timer
currentRole := follower; currentLeader := leaderId
//如果收到的msg的term大于当前的任期,直接覆盖,忘记自己的投票,关闭选举计时器
end if
//如果term等于当前任期,认为自己是follower(有人已经选举成功了),并且认为发送者为leader
logOk := (log.length ≥ prefixLen) ∧
(prefixLen = 0 ∨ log[prefixLen − 1].term = prefixTerm)
//follower可能已经下线一段时间,跟最新的log有gap,即自身的log和leader认为已经发送给follower的log有gap
//logOK则可以向leader回应ack,代表本地log与leader前缀部分相同,不OK需要同步log
//本地比leader记录的要长舍弃即可
//如果节点本地的log长度小于leader认为的prefix,说明丢失了一些msg,leader发来的消息不够,需要额外请求那部分
//检查完log再检查term,如果prefix为0则不需要检查log(第一次)
//△△△
//否则需要检查自己前缀log中的最后一个任期和leader发送过他的相应entry是否term相同(5中提到的一致性检查)
if term = currentTerm ∧ logOk then
AppendEntries(prefixLen, leaderCommit, suffix )
ack := prefixLen + suffix .length
//在节点自己的log中加入entry,并且计算已经确认的log长度
send (LogResponse, nodeId, currentTerm, ack,true) to leaderId
//ack log给leader
else
send (LogResponse, nodeId, currentTerm, 0, false) to leaderId
//给出nak
end if
end on

7/9 更新follower的log

follower节点更新自己的log(follower’s point of view)

注意区分leader发送的prefixLen不等于leaderCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
function AppendEntries(prefixLen, leaderCommit, suffix )
//leadercommit为leader的commit长度
if suffix .length > 0 && log.length > prefixLen then
//如果leader发过来的entries长度大于0,并且节点log的长度大于leader发来的leader的发送长度
//如果term不同意味着需要舍去本地多余的记录(可能来自老leader未被确认的部分)
//△△△term相同意味着可能node的ack丢失了
index := min(log.length, prefixLen + suffix .length) − 1
//得到最小需要更新的index
if log[index ].term != suffix [index − prefixLen].term then
log := <log[0], log[1], . . . , log[prefixLen − 1]>
end if
//比较能得到的本地log最后的term和leader发过来的term
//如果不一致则丢弃本地的,用leader发来的覆盖
end if
//下面进行append操作
if prefixLen + suffix .length > log.length then
for i := log.length − prefixLen to suffix .length − 1 do
append suffix [i] to log
end for
end if
//下面进行commit操作
if leaderCommit > commitLength then
for i := commitLength to leaderCommit − 1 do
deliver log[i].msg to the application
//将已经提交的log发送给上层应用
end for
commitLength := leaderCommit
end if
end function

8/9 leader收到ack

leader’s point of view

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
on receiving (LogResponse, follower , term, ack, success) at nodeId do
if term = currentTerm ∧ currentRole = leader then
//term相等且自己是leader
if success = true ∧ ack ≥ ackedLength[follower ] then
sentLength[follower] := ack
ackedLength[follower] := ack
//一切正常,两个都置为ack
CommitLogEntries()
else if sentLength[follower] > 0 then
//follower出现了gap,需要fill
sentLength[follower] := sentLength[follower ] − 1
ReplicateLog(nodeId, follower )
//回退一个entry,suffix+1长度之后重试
end if
else if term > currentTerm then
//出现了更大的term,自己变成了follower
currentTerm := term
currentRole := follower
votedFor := null
cancel election timer
//变成follower的操作
end if
end on

9/9 leader提交log entries

任何被法定人数节点确认的log entries需要被leader提交

每当log entry被提交,msg被送给上层应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function CommitLogEntries
while commitLength < log.length do
acks := 0
for each node ∈ {nodes} do
if ackedLength[node] > commitLength then
acks := acks + 1
end if
end for
if acks ≥ [(|nodes| + 1)/2] then
deliver log[commitLength].msg to the application
//被提交的msg交给上层应用
commitLength := commitLength + 1
//commitLength同样也告诉follower节点哪些被commit,可以被提交到上层应用
else
break
end if
end while
end function

补充

每个节点的election timeout随机设定

参考资料:

Raft Consensus Algorithm

Raft (thesecretlivesofdata.com)


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!