RocketMQ消息存储流程源码分析(一)

本篇主要分析消息存储的流程,也即消息写到Buffer的过程(这也是为什么RocketMQ性能高的原因),刷盘的过程我们放到下篇分析,我们从消息接收的部分开始。

Broker消息接收

SendMessageProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
//解析消息头,如果消息头为空,直接返回
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
//处理存储消息前的逻辑
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
//处理批量消息
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
//处理单条消息
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
//处理存储消息后的逻辑
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
//初始化响应
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

log.debug("receive SendMessage request command, {}", request);

//broker开始服务的时间,默认值是0,broker的一个配置参数startAcceptSendRequestTimeStamp
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
/**
* Topic校验
* 1. 检查broker是否可写
* 2. Topic是否可以发送消息,目前是Topic "TW102" 不能发送消息
* 3. 检查Topic是否存在,若不存在则创建,此流程在自动创建Topic时会执行
* 4. 检查队列编号是否正确
*/
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
return response;
}
//获取消息body
final byte[] body = request.getBody();

int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

//如果队列小于0,则从可用队列中随机选择一个
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}

//构建Broker使用的Message对象MessageExtBrokerInner
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);

//如果是消费重试的Topic,如果重试次数达到上线,就会进入死信队列
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
return response;
}

msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
//处理事务消息,这里不展开分析
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
//处理非事务消息,调用DefaultMessageStore#putMessage方法进行处理
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
//处理消息结果
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}
}

Broker消息存储

DefaultMessageStore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class DefaultMessageStore implements MessageStore {
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

// Broker角色为slave,不进行后续处理
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}

return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

// Broker是否可写
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}

return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}

// 校验Topic长度是否过长
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}

// 校验Message Properties长度是否过长
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}

//
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

long beginTime = this.getSystemClock().now();
// 调用commitLog的putMessage方法存储消息
PutMessageResult result = this.commitLog.putMessage(msg);

long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}

return result;
}
}

CommitLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
public class CommitLog {
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

String topic = msg.getTopic();
int queueId = msg.getQueueId();

// 定时消息,此处暂不分析
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
//获取最后一个映射文件,参看MappedFileQueue
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

// 默认使用自旋锁,一直循环直到putMessageLock释放锁
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
// 如果映射文件为空(如写第一条消息)或者是映射文件已经写满了
if (null == mappedFile || mappedFile.isFull()) {
// 创建映射文件,一次创建两个,会提交到AllocateMappedFileService进行创建
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
// 如果创建失败了,直接返回
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
/**
* 将消息写入MappedFile,调用DefaultAppendMessageCallback的doAppend方法
* 1. 如果是同步刷盘,则写入MappedByteBuffer;
* 2. 如果是异步刷盘,分两种情况:
* 2.1 如果配置了transientStorePoolEnable为true,即开启堆外内存,则先写入堆外内存,也即ByteBuffer.allocateDirect()分配的直接内存
* 2.2 如果未配置transientStorePoolEnable,则写入MappedByteBuffer
*/
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
//如果文件写满了,重新创建MappedFile,重新进行消息写入
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}

if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

//统计信息
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

//刷盘逻辑,分为同步刷盘与异步刷盘
handleDiskFlush(result, putMessageResult, msg);
//主从同步
handleHA(result, putMessageResult, msg);

return putMessageResult;
}

class DefaultAppendMessageCallback implements AppendMessageCallback {
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

//消息在commitlog的偏移量
long wroteOffset = fileFromOffset + byteBuffer.position();

this.resetByteBuffer(hostHolder, 8);
// 生成消息唯一ID
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

// Record ConsumeQueue information,消息在消息消费队列的偏移量,写完消息后会进行自增
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}

// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}

/**
* Serialize message
*/
//计算properties的长度
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}

//计算topic的长度
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;

//计算body的长度
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

//计算整个消息长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

// Determines whether there is sufficient free space
//如果文件不够存储这条消息,则往文件里写当前文件的声誉空间 + 魔数BLANK_MAGIC_CODE,共8个字节,也就是Commitlog文件最少会空闲8个字节
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
//msgStoreItemMemory是ByteBuffer.allocate()分配的堆内内存,首先将消息写入堆内内存,然后将消息写入MappedByteBuffer或者是堆外直接内存
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

//queueOffset进行自增
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
}
}

以上内容是将消息写入mappedFile,消息的存储格式如下:

第几位 字段 说明 数据类型 字节数
1 MsgLen 消息总长度 Int 4
2 MagicCode 消息魔数,固定值为0xdaa320a7 Int 4
3 BodyCRC 消息内容CRC校验码 Int 4
4 QueueId 消息消费队列编号 Int 4
5 Flag 消息Flag,RocketMQ不做处理 Int 4
6 QueueOffset 消息在消费队列的偏移量 Long 8
7 PhysicalOffset 消息在 CommitLog的文件中的偏移量 Long 8
8 SysFlag 消息系统Flag,入是否压缩,是否事物消息 Int 4
9 BornTimestamp 生产者生成消息时间戳 Long 8
10 BornHost 生产者的IP和端口号 Long 8
11 StoreTimestamp 存储消息时间戳 Long 8
12 StoreHost Broker服务器的IP和端口号 Long 8
13 ReconsumeTimes 消费重试次数 Int 4
14 PreparedTransationOffset 事物消息物理偏移量 Long 8
15 BodyLength 消息体长度 Int 4
16 Body 消息体,长度为BodyLength存的值 Bytes
17 TopicLength Topic长度 Byte 1
18 Topic Topic内容,长度为TopicLength存的值 Bytes
19 PropertiesLength 消息属性长度 Short 2
20 Properties 消息属性,长度为PropertiesLength存的值 Bytes

接下来分析MappedFile的创建流程,涉及到MappedFileQueue、AllocateMappedFileService、MappedFile

MappedFileQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class MappedFileQueue {
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

private final AllocateMappedFileService allocateMappedFileService;

/**
* 创建映射文件,一次创建两个,会提交到AllocateMappedFileService进行创建
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
long createOffset = -1;

//获取最后一个映射文件
MappedFile mappedFileLast = getLastMappedFile();

//如果为空,则代表第一次创建,则createOffset = 0
if (mappedFileLast == null) {
createOffset = startOffset - (startOffset % this.mappedFileSize);
}

//如果最后一个映射文件写满了,则createOffset = 最后一个映射文件的偏移量 + mappedFileSize(默认为1G)
if (mappedFileLast != null && mappedFileLast.isFull()) {
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
}

if (createOffset != -1 && needCreate) {
//创建两个MappedFile
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;

//allocateMappedFileService在DefaultMessageStore里进行初始化
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}

//将mappedFile加入到mappedFiles中
if (mappedFile != null) {
if (this.mappedFiles.isEmpty()) {
mappedFile.setFirstCreateInQueue(true);
}
this.mappedFiles.add(mappedFile);
}

return mappedFile;
}

return mappedFileLast;
}
}

AllocateMappedFileService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
public class AllocateMappedFileService extends ServiceThread {
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;
//开启了堆外内存
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
}
}

/**
* 这里为什么会创建两个AllocateRequest,下面分析一下,设计思想还是非常不错的
* 1. 如果是第一次创建,假如mappedFileSize为1000,则这里要创建的是00000000000000000000 和 0000000000000001000
* 然后在mmapOperation异步去创建这两个文件,00000000000000000000文件创建好了之后然后返回,mappedFileQueue会将00000000000000000000加入到mappedFiles,且是最后一个mappedFile
* 2. 如果是第二次创建,0文件已经写满了,且这个时候从MappedFileQueue获取最后一个文件还是00000000000000000000,所以这一次要创建的是0000000000000001000 和 0000000000000002000
* 这时从requestTable里就能拿出上一步创建0000000000000001000的AllocateRequest,如果已经创建好了,则直接返回,同时也会去创建0000000000000002000
* 总的来说就是预先把下一个要用的MappedFile创建好,当前的MappedFile写满之后直接能拿到下一个MappedFile
*/
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}

AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}

if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}

AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
//等待mappedFile创建完成
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
//mappedFile创建完成后将nextFilePath:AllocateRequest从requestTable移除,并将mappedFile返回
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}

return null;
}

public void run() {
log.info(this.getServiceName() + " service started");

//循环创建mappedFile
while (!this.isStopped() && this.mmapOperation()) {

}
log.info(this.getServiceName() + " service end");
}

/**
* Only interrupted by the external thread, will return false
*/
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}

if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
//如果是异步刷盘,且开启了堆外内存,默认会走这个逻辑,创建mappedFile
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
//如果是同步刷盘,或者是异步刷盘(未开启堆外内存),走这个逻辑创建mappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

//计算耗时
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}

//TODO 文件预热,Page Cache性能提升相关,此处暂不做分析
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}

req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
//创建失败后,会将AllocateRequest加入到requestQueue,等待下一次继续创建
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
}

MappedFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class MappedFile extends ReferenceResource {
//当前该文件的写指针,从0开始
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
//当前该文件的提交指针,如果开启transientStorePoolEnable(且为异步刷盘),数据会首先存储在transientStorePool中,然后提交到ByteBuffer中,再刷盘到磁盘
protected final AtomicInteger committedPosition = new AtomicInteger(0);
//刷写到磁盘的指针,该指针之前的数据持久化到磁盘中
private final AtomicInteger flushedPosition = new AtomicInteger(0);

//堆外内存池
protected TransientStorePool transientStorePool = null;
//物理内存对应的内存映射
private MappedByteBuffer mappedByteBuffer;

//两种构造方式
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}

public MappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}

public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
//从堆外内存池获取ByteBuffer
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
if (this.writeBuffer != null) {
System.out.println("invoke....");
}
}

private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;

ensureDirOK(this.file.getParent());

try {
//创建文件映射,java nio
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
}

这个时候我们已经将消息写入内存中了,但是怎么落盘呢,接下来我们分析同步刷盘与异步刷盘。

同步刷盘

CommitLog
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//同步刷盘处理逻辑
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
//构造GroupCommitRequest,并将GroupCommitRequest提交到GroupCommitService进行处理
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush 异步刷盘
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// FlushRealTimeService
flushCommitLogService.wakeup();
} else {
// CommitRealTimeService
commitLogService.wakeup();
}
}
}

GroupCommitService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
class GroupCommitService extends FlushCommitLogService {
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
//有任务添加进来,当hasNotified为false时,将hasNotified置为true,且唤醒GroupCommitService线程
if (hasNotified.compareAndSet(false, true)) {
System.out.println("invoke GroupCommitService..." + Thread.currentThread());
waitPoint.countDown(); // notify
}
}

//交换读写队列,将写队列的数据复制到读队列,并将写队列清空,消息put到写队列,刷盘是从读队列中取,读写分离
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}

private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
//循环读队列
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
//是否满足刷盘条件
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
// 唤醒commitLog中等待的线程,通知刷盘完毕
req.wakeupCustomer(flushOK);
}

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
//将读队列清空
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}

public void run() {
CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
/**
* 如果没有消息put进来,也就是说没有调用putRequest,则会挂起等待10毫秒,执行后续的doCommit方法
* 一旦有消息put进来,则会立马被唤醒
*/
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

// Under normal circumstances shutdown, wait for the arrival of the
// request, and then flush
try {
Thread.sleep(10);
} catch (InterruptedException e) {
CommitLog.log.warn("GroupCommitService Exception, ", e);
}

synchronized (this) {
this.swapRequests();
}

this.doCommit();

CommitLog.log.info(this.getServiceName() + " service end");
}

@Override
protected void onWaitEnd() {
this.swapRequests();
}

@Override
public String getServiceName() {
return GroupCommitService.class.getSimpleName();
}

@Override
public long getJointime() {
return 1000 * 60 * 5;
}
}

对于同步刷盘,有两点设计比较好,第一就是用了读写队列,具体参看GroupCommitService,消息全部写到requestsWrite,真正刷盘的时候从requestsRead里面拿。第二是ServiceThread的设计,也就是说刷盘的逻辑是一个单独的线程处理的,如果有任务过来,它就会立马被唤醒去处理,如果没有任务,则会挂起等待10ms后自动唤醒去处理任务。所以说同步刷盘也不是真正的同步,而是用了一个异步线程去刷,可能一次会刷多条消息。

异步刷盘

异步刷盘有两种方式,一种是开启了堆外内存,首先会把写到堆外内存的消息commit到FileChannel,然后再flush到磁盘,另外一种是未开启堆外内存,直接从mappedByteBuffer flush到磁盘

第一种方式的实现在FlushRealTimeService中,第二种的实现方式在CommitRealTimeService,这里我就不具体分析了。

后面我们将继续分析CosumerQueue和IndexQueue的构建