Raft

  1. raft基本原理演示请参考 http://thesecretlivesofdata.com/raft/
  2. go语言的raft实现 https://github.com/goraft/raft, 但是该项目早已不再维护,原作者将raft融入到etcd和influxdb(现在貌似不是了)里了,这里摘出etcd的raft实现进行研究

这是etcd/raft包的文档文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raft sends and receives messages in the Protocol Buffer format
defined in the raftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.

A simple example application, _raftexample_, is also available to help illustrate
how to use this package in practice:
https://github.com/coreos/etcd/tree/master/contrib/raftexample

Usage

# raft里面最的基础对象是Node
The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using raft.RestartNode.

To start a node from scratch:

storage := raft.NewMemoryStorage()
c := &Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

storage := raft.NewMemoryStorage()

// recover the in-memory storage from persistent
// snapshot, state and entries.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)

c := &Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}

// restart raft without peer information.
// peer information is already included in the storage.
n := raft.RestartNode(c)

# 现在你掌握了一个Node,你有些任务需要完成
Now that you are holding onto a Node you have a few responsibilities:

# 第一,你必须从 Node.Ready()通道读取数据,并要对其包含的更新进行操作,这步操作一般
是并发的,除非遇到步骤二
First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in step
2.

# 1. 将硬件状态,Entries(一个个数据实体)和Snapshot(快照)写入到持久化数据库,如果这
些东西不是空的。注意,如果写入一个Entry的index是i,那之前持久的index大于i的Entries
都将被忽略掉。
1. Write HardState, Entries, and Snapshot to persistent storage if they are
not empty. Note that when writing an Entry with Index i, any
previously-persisted entries with Index >= i must be discarded.

# 2. 将所有的消息发送给字段to里面的节点。这里关键的点是,在硬件状态同步到数据库之前,
并且所有的Entries已经写入到之前准备好的batch中,消息将无法发送。为了减少I/O延迟,可以
通过并发操作,将leader的数据同步给follwers。如果某条消息的类型是MsgSnap,在将数据发
送出去之后需要调用 Node.ReportSnapshot
2. Send all Messages to the nodes named in the To field. It is important that
no messages be sent until the latest HardState has been persisted to disk,
and all Entries written by any previous Ready batch (Messages may be sent while
entries from the same batch are being persisted). To reduce the I/O latency, an
optimization can be applied to make leader write to disk in parallel with its
followers (as explained at section 10.2.1 in Raft thesis). If any Message has type
MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be
large).

# 注意: 编码消息不是并发安全的,所以很重要的一点是你要保证在消息编码的时候没有Entries
持久化。最简单的处理方式就死直接在你的raft主循环里序列化消息
Note: Marshalling messages is not thread-safe; it is important that you
make sure that no new entries are persisted while marshalling.
The easiest way to achieve this is to serialise the messages directly inside
your main raft loop.

# 3. 将Snapshot和已提交的Entries应用到状态机。如果已提交的Entry是类型EntryConfChange,
调用Node.ApplyConfChange()来将它应用到节点。如果在调用ApplyConfChange之前将NodeID设置
成0,那么配置的更改将被取消(但是ApplyConfChange必须通过某种方式调用,并且取消配置变更只有
状态机能做,即使是健康检查也不行)
3. Apply Snapshot (if any) and CommittedEntries to the state machine.
If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this point
by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision to cancel
must be based solely on the state machine and not external information such as
the observed health of the node).

# 4.调用 Node.Advance()标志下一个batch已经准备好发送。这在步骤1之后的任何时候都有可
能完成,不过所有的更新必须在按照Ready返回的顺序执行。
4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be processed
in the order they were returned by Ready.

# 第二,持久化的实现需要保证所有的日志实体是可读取的的,比如提供的MemoryStorage,或者
你可以自己实现
Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage
type can be used for this (if you repopulate its state upon a
restart), or you can supply your own disk-backed implementation.

# 第三, 当你从其他节点中收到消息的时候,将消息传递给 Node.Step:
Third, when you receive a message from another node, pass it to Node.Step:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}

# 最后,你需要定时调用 Node.Tick()。Raft有两种超时: 心跳超时和选举超时。他们在raft
包被抽象成同一个
Finally, you need to call Node.Tick() at regular intervals (probably
via a time.Ticker). Raft has two important timeouts: heartbeat and the
election timeout. However, internally to the raft package time is
represented by an abstract "tick".

# 综上,状态机的处理循环就像下面这样:
The total state machine handling loop will look something like this:

for {
select {
case <-s.Ticker:
n.Tick()
case rd := <-s.Node.Ready():
saveToStorage(rd.State, rd.Entries, rd.Snapshot)
send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
processSnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
s.Node.ApplyConfChange(cc)
}
}
s.Node.Advance()
case <-s.done:
return
}
}

# 要向状态机发起议案,需要将你的节点应用数据序列化成byte数组,然后调用n.Propose(ctx, data)
To propose changes to the state machine from your node take your application
data, serialize it into a byte slice and call:

n.Propose(ctx, data)

# 如果完成提交,数据将变成具有EntryNormal类型的已提交数据。一个提议命令如果已经提交是
没有过期时间的,你必须在超时之后重新发起
If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal. There is no guarantee that a proposed command will be
committed; you may have to re-propose after a timeout.

# 添加或移除节点,构建配置然后调用ProposeConfChange
To add or remove node in a cluster, build ConfChange struct 'cc' and call:

n.ProposeConfChange(ctx, cc)

# 在配置更改提交之后,一些有着EntryConfChange类型的Entries会被返回,解析后通过
ApplyConfChange应用它们
After config change is committed, some committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to node through:

var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)

# 注意,节点ID必须总是唯一
Note: An ID represents a unique node in a cluster for all time. A
given ID MUST be used only once even if the old node has been removed.
This means that for example IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.

# 实现笔记
Implementation notes

# 实现基本是跟着Raft thesis 同步更新的,但是还是与第4章节描述的有所差异。关键的差异
是,保留在节点memberships一次改变时,我们的实现时在entry被应用的时候,而不是它被加
到log中的时候(因此,entry在提交后还是原来的成员而不是新的)。这是为了安全性
This implementation is up to date with the final Raft thesis
(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
membership change takes effect when its entry is applied, not when it
is added to the log (so the entry is committed under the old
membership instead of the new). This is equivalent in terms of safety,
since the old and new configurations are guaranteed to overlap.

# 为了确保不会一次性提交两次成员变化,我们在leader节点没有提交完成的时候,不允许提起成
员变化
To ensure that we do not attempt to commit two membership changes at
once by matching log positions (which would be unsafe since they
should have different quorum requirements), we simply disallow any
proposed membership change while any uncommitted change appears in
the leader's log.

# 这里介绍了这样一个问题: 当只有两个节点的时候,如果一个成员在另一个节点收到配置变化
提交之前死亡,那么集群将会完全失效,因此强烈建议构建三个以上节点的集群
This approach introduces a problem when you try to remove a member
from a two-member cluster: If one of the members dies before the
other one receives the commit of the confchange entry, then the member
cannot be removed any more since the cluster cannot make progress.
For this reason it is highly recommended to use three or more nodes in
every cluster.

# 消息类型
MessageType

Package raft sends and receives message in Protocol Buffer format (defined
in raftpb package). Each state (follower, candidate, leader) implements its
own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when
advancing with the given raftpb.Message. Each step is determined by its
raftpb.MessageType. Note that every step is checked by one common method
'Step' that safety-checks the terms of node and incoming message to prevent
stale log entries:

# MsgHup用于选举过程。如果在一个tickElection中,节点没有收到心跳,就发送MsgHup
给Step,并成为候选者
'MsgHup' is used for election. If a node is a follower or candidate, the
'tick' function in 'raft' struct is set as 'tickElection'. If a follower or
candidate has not received any heartbeat before the election timeout, it
passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
start a new election.

# MsgBeat 是节点内部的心跳消息类型,由主节点定时向各follew发起
'MsgBeat' is an internal type that signals the leader to send a heartbeat of
the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
send periodic 'MsgHeartbeat' messages to its followers.

# MsgProp 发起提议,用来将议案转发给主节点。这里send方法用硬件状态的Term重写了
Message的Term以避免直接将本地的term附加给MsgProp。当MsgProp传递给主节点,主节
点立即将消息append到日志队列,然后调用bcastAppend将这些entried广播给各个节点。
如果发送给了候选者,会被候选者丢掉,当发给跟随者,跟随者将会通过send发放将消息存
入mailbox(消息)。消息存储的时候会带上发送者的ID然后通过raftHttp发送给leader。
'MsgProp' proposes to append data to its log entries. This is a special
type to redirect proposals to leader. Therefore, send method overwrites
raftpb.Message's term with its HardState's term to avoid attaching its
local term to 'MsgProp'. When 'MsgProp' is passed to the leader's 'Step'
method, the leader first calls the 'appendEntry' method to append entries
to its log, and then calls 'bcastAppend' method to send those entries to
its peers. When passed to candidate, 'MsgProp' is dropped. When passed to
follower, 'MsgProp' is stored in follower's mailbox(msgs) by the send
method. It is stored with sender's ID and later forwarded to leader by
rafthttp package.

# MsgApp 包含了要复制的日志,主节点bcastAppend的时候会通过sendAppend发送MsgApp
消息,意味着消息需要尽快发送出去。当MsgApp发送给候选者,候选者会重新变回跟随者,因为
这意味着这是合法leader发过来的消息,就不需要再选举了。候选者和跟随者会回
MsgAppResp消息
'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
'MsgAppResp' type.

# MsgAppResp,日志复制回复
'MsgAppResp' is response to log replication request('MsgApp'). When
'MsgApp' is passed to candidate or follower's Step method, it responds by
calling 'handleAppendEntries' method, which sends 'MsgAppResp' to raft
mailbox.

# MsgVote 是用来请求选票的。当跟随者或者候选人发送MsgHup消息给它的Step方法之后,
节点会调用campaign方法参加竞选,使自己成为候选人。一旦campaign调用,节点及腰发
送MsgVote给集群的每个成员要求选票。当消息被传递给leader或者候选人的Step方法,并且
消息的term比他们自己term小,选举会被拒绝(MsgVoteResp回复附带拒绝标志)。如果
leader或者候选者收到的选举消息附带的Term更大,他们会转化成follower。当消息传递
给follower,只有当发送者当term和lastCommitted index大于等于接收节点的时候,才
会投票
'MsgVote' requests votes for election. When a node is a follower or
candidate and 'MsgHup' is passed to its Step method, then the node calls
'campaign' method to campaign itself to become a leader. Once 'campaign'
method is called, the node becomes candidate and sends 'MsgVote' to peers
in cluster to request votes. When passed to leader or candidate's Step
method and the message's Term is lower than leader's or candidate's,
'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true).
If leader or candidate receives 'MsgVote' with higher term, it will revert
back to follower. When 'MsgVote' is passed to follower, it votes for the
sender only when sender's last term is greater than MsgVote's term or
sender's last term is equal to MsgVote's term but sender's last committed
index is greater than or equal to follower's.

# 选票结果,候选人接收到后统计票数,超过半数则成为leader,broadcast广告给所有节点。
否则从新变成follower
'MsgVoteResp' contains responses from voting request. When 'MsgVoteResp' is
passed to candidate, the candidate calculates how many votes it has won. If
it's more than majority (quorum), it becomes leader and calls 'bcastAppend'.
If candidate receives majority of votes of denials, it reverts back to
follower.

# MsgPreVote 和 MsgPreVoteResp 用于二阶段选举。当配置的PreVote是true的时
候,二阶段选举才会派上用场。并且节点不会增加term,除非pre-election已经告知了竞选
节点胜出。这样减少当分区节点重新加入时候的混乱。
'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
protocol. When Config.PreVote is true, a pre-election is carried out first
(using the same rules as a regular election), and no node increases its term
number unless the pre-election indicates that the campaigining node would win.
This minimizes disruption when a partitioned node rejoins the cluster.

# MsgSnap 要求加载消息。当一个节点刚成为leader的时候,或者leader收到MsgProp的
时候,它调用bcastAppend方法给个节点同步消息的时候,如果获取不到term或者
entries,leader就会发送MsgSnap获取快照。
'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
'bcastAppend' method, which then calls 'sendAppend' method to each
follower. In 'sendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.

# MsgSnapStatus 告知快照结果。当follower拒绝返回,就意味着主节点与之网络连接
失败,主节点就会考虑对该节点进行网络探测。当返回了,意味着foller接收了快照并恢复了
日志复制
'MsgSnapStatus' tells the result of snapshot install message. When a
follower rejected 'MsgSnap', it indicates the snapshot request with
'MsgSnap' had failed from network issues which causes the network layer
to fail to send out snapshots to its followers. Then leader considers
follower's progress as probe. When 'MsgSnap' were not rejected, it
indicates that the snapshot succeeded and the leader sets follower's
progress to probe and resumes its log replication.

# MsgHeartbeat leader给follwer发心跳。当心跳发送给候选人,如果term高于候选
人,候选人重新成为follower并更新提交索引。如果发送给follower,候选人更新
leaderId
'MsgHeartbeat' sends heartbeat from leader. When 'MsgHeartbeat' is passed
to candidate and message's term is higher than candidate's, the candidate
reverts back to follower and updates its committed index from the one in
this heartbeat. And it sends the message to its mailbox. When
'MsgHeartbeat' is passed to follower's Step method and message's term is
higher than follower's, the follower updates its leaderID with the ID
from the message.

# MsgHeartbeatResp, 心跳返回,附带follower的日志id,点那个日志id比leader大的
时候,发送append
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
follower's Match index, the leader runs 'sendAppend` method.

# MsgUnreachable 表示消息发送失败。当主节点收到该消息的时候,主节点会发现发送该消
息的节点不可达,基本意味着MsgApp消息丢失。
'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
that the follower that sent this 'MsgUnreachable' is not reachable, often
indicating 'MsgApp' is lost. When follower's progress state is replicate,
the leader sets it back to probe.

*/
package raft