RocketMQ消息消费流程源码分析(四)

本篇我们主要关注事务消息的流程。

事务消息示例

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Example producer for transactional messages.
 *
 * <p>It wires a {@link TransactionListenerImpl} and a dedicated executor into a
 * {@code TransactionMQProducer}, sends ten messages through
 * {@code sendMessageInTransaction}, and then keeps the JVM alive for a long time
 * before shutting the producer down.
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener listener = new TransactionListenerImpl();
        // Create the transactional producer
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

        // Executor handed to the producer; every thread it creates carries the same fixed name
        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2000);
        ThreadFactory namedThreadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable task) {
                Thread worker = new Thread(task);
                worker.setName("client-transaction-msg-check-thread");
                return worker;
            }
        };
        ExecutorService checkExecutor =
            new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, workQueue, namedThreadFactory);
        producer.setExecutorService(checkExecutor);

        // The listener bundles two actions: run the local transaction, and report its state on check
        producer.setTransactionListener(listener);
        // Start the producer
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int n = 0; n < 10; n++) {
            try {
                String tag = tags[n % tags.length];
                byte[] body = ("Hello RocketMQ " + n).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message message = new Message("TopicTest1234", tag, "KEY" + n, body);
                SendResult result = producer.sendMessageInTransaction(message, null);
                System.out.printf("%s%n", result);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        // Keep the process running (100000 sleeps of one second each) before shutting down
        int remainingSleeps = 100000;
        while (remainingSleeps > 0) {
            Thread.sleep(1000);
            remainingSleeps--;
        }
        producer.shutdown();
    }
}

TransactionListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Example {@link TransactionListener} that never resolves a transaction up front.
 *
 * <p>{@link #executeLocalTransaction} records a status of 0, 1 or 2 per transaction id
 * (a running counter modulo 3) and always answers {@code UNKNOW}. The recorded status
 * is only turned into a state when {@link #checkLocalTransaction} is called.
 */
public class TransactionListenerImpl implements TransactionListener {
    private final AtomicInteger transactionIndex = new AtomicInteger(0);

    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Runs the "local transaction": stores the next counter value modulo 3 under the
     * message's transaction id.
     *
     * @return always {@code LocalTransactionState.UNKNOW}
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int status = transactionIndex.getAndIncrement() % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Looks up the status recorded for the message's transaction id and maps it to a state.
     *
     * @return {@code UNKNOW} for status 0, {@code ROLLBACK_MESSAGE} for status 2, and
     *         {@code COMMIT_MESSAGE} for status 1, any other value, or no recorded status
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer recorded = localTrans.get(msg.getTransactionId());
        if (recorded == null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        int status = recorded;
        if (status == 0) {
            return LocalTransactionState.UNKNOW;
        }
        if (status == 2) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

实现原理

producer发送prepared消息到broker,broker会把prepared消息的topic转换成RMQ_SYS_TRANS_HALF_TOPIC进行存储;随后producer端执行本地事务,并根据执行结果向broker发送commit或者rollback消息。如果是commit消息,broker会恢复真实的topic,重新存储并构建consumequeue,让消费方进行消费;如果是rollback消息,则会删除RMQ_SYS_TRANS_HALF_TOPIC中的prepared消息。如果broker没有收到commit或者rollback消息,则broker每隔1分钟会定时回查producer该条消息的事务执行状态。

Producer端实现

DefaultMQProducerImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class DefaultMQProducerImpl implements MQProducerInner {
    /**
     * Sends {@code msg} as a prepared (half) message, runs the local transaction, and then
     * reports the outcome to the broker via {@code endTransaction}.
     *
     * @param msg                      the message to send; it is tagged with the
     *                                 transaction-prepared flag and the producer group
     * @param localTransactionExecuter legacy executor for the local transaction; when
     *                                 {@code null} the registered check listener is used
     * @param arg                      opaque argument forwarded to the local transaction
     * @return the send result enriched with the local transaction state
     * @throws MQClientException if neither an executer nor a listener is available, if
     *                           message validation fails, or if the send itself fails
     */
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        final TransactionListener checkListener = getCheckListener();
        if (null == localTransactionExecuter && null == checkListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // Validate the message (topic, body, ...)
        Validators.checkMessage(msg, this.defaultMQProducer);

        // Flag the message as transaction-prepared and record the producer group
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

        final SendResult halfSendResult;
        try {
            // Send through the regular send(msg) path
            halfSendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localState = LocalTransactionState.UNKNOW;
        Throwable localError = null;
        switch (halfSendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (halfSendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", halfSendResult.getTransactionId());
                    }
                    // Use the client-side unique message id as the transaction id
                    String clientTxId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (clientTxId != null && !clientTxId.isEmpty()) {
                        msg.setTransactionId(clientTxId);
                    }
                    if (null != localTransactionExecuter) {
                        localState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else {
                        // The guard at the top guarantees the listener is non-null here.
                        // Run the local transaction through the listener (the key step).
                        log.debug("Used new transaction API");
                        localState = checkListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localState) {
                        localState = LocalTransactionState.UNKNOW;
                    }

                    if (localState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    // The failure is not rethrown; it is passed on to endTransaction below
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localError = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The local transaction is not run for these statuses; report a rollback
                localState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // Tell the broker how the transaction ended
            this.endTransaction(halfSendResult, localState, localError);
        } catch (Exception e) {
            log.warn("local transaction execute " + localState + ", but end broker transaction failed", e);
        }

        TransactionSendResult txResult = new TransactionSendResult();
        txResult.setSendStatus(halfSendResult.getSendStatus());
        txResult.setMessageQueue(halfSendResult.getMessageQueue());
        txResult.setMsgId(halfSendResult.getMsgId());
        txResult.setQueueOffset(halfSendResult.getQueueOffset());
        txResult.setTransactionId(halfSendResult.getTransactionId());
        txResult.setLocalTransactionState(localState);
        return txResult;
    }
}

Broker端实现

SendMessageProcessor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Excerpt of the broker-side send processor, showing only where an incoming message is
 * routed to either the transactional message service or the regular message store.
 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
/**
 * Handles a single send request. Only the transactional/non-transactional branching is
 * shown; the code that sets up {@code oriProps}, {@code response}, {@code msgInner} and
 * {@code putMessageResult}, and the code that builds the final response, is omitted.
 */
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// ... code omitted in this excerpt ...
// Transactional path: taken when the transaction-prepared property parses as true
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
// A broker configured to reject transactional messages answers NO_PERMISSION
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
// Store the prepared message through the transactional message service.
// NOTE(review): presumably this ends up in TransactionalMessageBridge.putHalfMessage — confirm.
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
// Non-transactional path: store the message directly in the message store
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

}
}
TransactionalMessageBridge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TransactionalMessageBridge {
    /**
     * Rewrites the message into its half-message form and stores it.
     *
     * @param messageInner the incoming message; it is modified in place
     * @return the result reported by the underlying store
     */
    public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
        MessageExtBrokerInner halfMessage = parseHalfMessageInner(messageInner);
        return store.putMessage(halfMessage);
    }

    /**
     * Converts {@code msgInner} into a half message in place: the original topic and queue
     * id are saved as properties, the transaction bits of the sys flag are reset to
     * {@code TRANSACTION_NOT_TYPE}, and the message is redirected to the half topic, queue 0.
     *
     * @return the same {@code msgInner} instance
     */
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // Remember where the message was really headed
        String realTopic = msgInner.getTopic();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, realTopic);
        String realQueueId = String.valueOf(msgInner.getQueueId());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, realQueueId);

        int halfSysFlag =
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE);
        msgInner.setSysFlag(halfSysFlag);

        // Redirect to the half topic, queue 0
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);

        // Re-serialize the properties so the two added entries are included
        String serializedProps = MessageDecoder.messageProperties2String(msgInner.getProperties());
        msgInner.setPropertiesString(serializedProps);
        return msgInner;
    }
}
EndTransactionProcessor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class EndTransactionProcessor implements NettyRequestProcessor {
    /**
     * Handles an end-transaction request: commits or rolls back a previously stored
     * prepared message.
     *
     * @return {@code null} when the request carries {@code TRANSACTION_NOT_TYPE} or an
     *         unrecognised type; otherwise a response command describing the outcome
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.info("Transaction request:{}", requestHeader);

        // A broker running as SLAVE refuses end-transaction requests
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }

        if (requestHeader.getFromTransactionCheck()) {
            // The request is the producer's answer to a transaction state check
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            // The request was sent directly after the local transaction ran
            switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }

        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // Commit: look up the prepared message
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand verification = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (verification.getCode() != ResponseCode.SUCCESS) {
                    return verification;
                }
                // Build the final message from the prepared one and stamp it with the
                // commit flag, the offsets from the request, and the original store time
                MessageExtBrokerInner finalMsg = endMessageTransaction(result.getPrepareMessage());
                finalMsg.setSysFlag(MessageSysFlag.resetTransactionValue(finalMsg.getSysFlag(), requestHeader.getCommitOrRollback()));
                finalMsg.setQueueOffset(requestHeader.getTranStateTableOffset());
                finalMsg.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                finalMsg.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                // Store the final message
                RemotingCommand storeResult = sendFinalMessage(finalMsg);
                // Delete the prepared message only once the final message has been stored
                if (storeResult.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return storeResult;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // Rollback: look up the prepared message and delete it once it is verified
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand verification = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (verification.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return verification;
            }
        }

        // The lookup did not succeed: relay the operation's code and remark
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
}

消息回查

当执行本地事务后返回的本地事务状态为UNKNOW时,提交到服务器不会做任何操作,消息仍然保留在RMQ_SYS_TRANS_HALF_TOPIC主题中,并不会被消费者消费。这些消息最终会如何处理呢?RocketMQ依赖Broker的定时任务对这些消息进行状态回查,最终执行Commit或者Rollback。如下,在BrokerController中会调用TransactionalMessageCheckService的start()方法。

TransactionalMessageCheckService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Background service thread that periodically asks the transactional message service
 * to check prepared messages.
 */
public class TransactionalMessageCheckService extends ServiceThread {
    /**
     * Repeatedly waits for the check interval taken from the broker configuration until
     * the service is stopped.
     */
    @Override
    public void run() {
        log.info("Start transaction check service thread!");
        final long waitInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
        }
        log.info("End transaction check service thread!");
    }

    /**
     * Runs one check pass using the transaction timeout and maximum check count from the
     * broker configuration, and logs how long the pass took.
     */
    @Override
    protected void onWaitEnd() {
        long transactionTimeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int maxCheckTimes = brokerController.getBrokerConfig().getTransactionCheckMax();
        long startedAt = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", startedAt);
        // Delegate the actual check to the transactional message service
        this.brokerController.getTransactionalMessageService().check(
            transactionTimeout,
            maxCheckTimes,
            this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - startedAt);
    }
}
TransactionalMessageServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    /**
     * Scans every queue of the half-message topic and, for each prepared message that has
     * neither been committed nor rolled back, decides whether to discard it, leave it for
     * a later pass, or ask the producer for its transaction state.
     *
     * <p>Any exception raised during the scan is logged and ends the pass; it is not
     * propagated to the caller.
     *
     * @param transactionTimeout  baseline age a message must reach before it is checked
     *                            (compared against millisecond timestamps below)
     * @param transactionCheckMax limit passed to {@code needDiscard} to decide whether a
     *                            message should be given up on
     * @param listener            callback that handles discarded messages and sends the
     *                            state check for messages that need one
     */
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            // Fetch all queues of the half-message (prepared) topic
            String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.info("Check topic={}, queues={}", topic, msgQueues);
            for (MessageQueue messageQueue : msgQueues) {
                long startTime = System.currentTimeMillis();
                // Operation queue that corresponds to this half-message queue
                MessageQueue opQueue = getOpQueue(messageQueue);
                // Consume offsets of both queues; a negative value on either skips this queue
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }

                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // fillOpRemoveMap populates removeMap and doneOpOffset from the operation queue.
                // removeMap is keyed by half-queue offsets that were already committed or rolled
                // back (see the containsKey check below); doneOpOffset feeds calculateOpOffset.
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // single thread
                int getMessageNullCount = 1;
                long newOffset = halfOffset;
                long i = halfOffset;
                while (true) {
                    // Bound the time spent on a single queue
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // Already committed / rolled back: nothing to check, move on
                    if (removeMap.containsKey(i)) {
                        log.info("Half offset {} has been committed/rolled back", i);
                        removeMap.remove(i);
                    } else {
                        // Read the half message at offset i
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        MessageExt msgExt = getResult.getMsg();
                        // No message at this offset:
                        //  - stop once the null-read counter exceeds MAX_RETRY_COUNT_WHEN_HALF_NULL
                        //  - stop if the pull status is NO_NEW_MSG
                        //  - otherwise jump to the next begin offset reported by the pull and retry
                        if (msgExt == null) {
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        // Hand the message to the listener as discarded when needDiscard or
                        // needSkip says so, then advance to the next offset
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        // Stored at or after the start of this pass: leave it for a later pass
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        // Age of the message, measured from its born timestamp
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        long checkImmunityTime = transactionTimeout;
                        // A per-message immunity time property, when present, replaces the timeout
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        if (null != checkImmunityTimeStr) {
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                // Still immune: skip ahead when checkPrepareQueueOffset returns true
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            // No per-message immunity time: a message younger than the timeout
                            // ends the scan of this queue
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // A state check is needed when any of the following holds:
                        //  1. no operation messages were pulled and the message is older than
                        //     checkImmunityTime
                        //  2. operation messages were pulled and the last one's born timestamp
                        //     minus startTime exceeds transactionTimeout
                        //  3. the computed age is -1 or less
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);

                        if (isNeedCheck) {
                            // Put the half message back first; if that fails, retry the same offset
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // Let the listener resolve the half message, i.e. trigger the
                            // transaction state check for it
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            // No check needed yet: pull more of the operation queue and
                            // re-evaluate the same offset
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    // Advance to the next half-queue offset
                    newOffset = i + 1;
                    i++;
                }
                // Persist the new consume offset of the half-message queue
                if (newOffset != halfOffset) {
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                // Persist the new consume offset of the operation queue
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Exception e) {
            // Report through the logger only; the exception carries the stack trace
            log.error("Check error", e);
        }
    }
}

到这里事务消息的主流程就结束了。