RocketMQ消息发送流程源码分析

Producer发送消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//1.创建DefaultMQProducer,在DefaultMQProducer构造方法中会创建DefaultMQProducerImpl
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
//2.设置nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3.启动producer
producer.start();

for (int i = 0; i < 10; i++) {
try {
//4.构建消息,包括Topic Tag Message body三个部分
Message msg = new Message("TopicTest_1" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
//5.发送消息,这里是发送消息的入口,最终会调用到DefaultMQProducerImpl#sendDefaultImpl方法
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//6.关闭producer
producer.shutdown();
}
}

消息发送的入口

​ 在上面说到了消息发送最终会调用到DefaultMQProducerImpl#sendDefaultImpl,那我们直接来看代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//验证DefaultMQProducerImpl的状态是否是RUNNING状态,Producer启动的时候会设置为RUNNING状态
this.makeSureStateOK();
//验证Topic以及消息体
Validators.checkMessage(msg, this.defaultMQProducer);
//目前看来只在日志打印中使用,是否存在性能问题?
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
/*
* 获取路由信息
* 1 首先从缓存中获取
* 2 如果没有的话,从nameserver中获取。
* 2.1 如果从nameserver中获取成功,则会更新路由缓存信息,具体实现在MQClientInstance#updateTopicRouteInfoFromNameServer方法中
* 2.2 如果该Topic在nameserver中没有的话,则会根据默认Topic “TBW102” 获取路由信息。(Topic “TBW102”这个Topic是在Broker启动创建的,可以参考TopicConfigManager的构造方法,然后向nameserver中注册)
*/
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
//for循环进行重试
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//选择一个MessageQueue,后面会单独分析
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//如果以上步骤超时了,则重新选取MessageQueue进行发送
if (timeout < costTime) {
callTimeout = true;
break;
}
//真正发送消息的逻辑
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//发送成功后,更新Broker延迟容错信息,具体见后面分析
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
//三种发送策略,同步,异步,单向(ONEWAY,只管不送,不接收结果)
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}

return sendResult;
default:
break;
}
//抛异常后,也会更新Broker延迟容错信息,具体见后面分析
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
}
// 省略异常代码
} else {
break;
}
}

if (sendResult != null) {
return sendResult;
}
//省略代码
}

由上面的流程我们来梳理一下,RocketMQ发送同步消息的基本是:消息校验 — 获取Topic路由信息 — 选择消息队列— 发送消息 — 返回结果。下面我们分别就这几方面来分析。

消息校验

校验的代码我就不贴出来了,比较简单,具体实现在makeSureStateOK()以及Validators.checkMessage()中,首先是校验生产者DefaultMQProducerImpl是否处于运行状态,其次校验消息体不能为空,且默认不能超过最大长度4M,以及Topic要符合相应的规范(具体实现参见Validators.checkTopic()方法)。

获取Topic路由信息

DefaultMQProducerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//1. 首先从缓存中查询该Topic的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//2. 如果没有查到,则向NameServer查询该Topic的路由信息
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

//3. 前两步如果有任何一步查到了,就返回路由信息
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
//4. 如果前两步都没有查到,会根据默认Topic "TBW102"获取路由信息
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

MQClientInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
//如果isDefault为true的话且defaultMQProducer不为空
if (isDefault && defaultMQProducer != null) {
//查询默认主题"TBW102"的路由信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
//如果查到了,则替换路由信息中读写队列的个数为生产者默认的队列个数4(因为默认主题"TBW102"的读写队列个数为16)
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//查询Topic的路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
//判断路由信息是否有变更,和本地缓存中的信息进行对比
boolean changed = topicRouteDataIsChange(old, topicRouteData);
//如果路由信息未变更,再次判断是否有新增的和Topic相关的Producer和Consumer,如果有,则需要更新路由信息
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}

if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

//更新broker地址缓存信息
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

// 将TopicRouteData转换成TopicPublishInfo,也就是将TopicRouteData中的List<QueueData>转换成TopicPublishInfo中的List<MessageQuue>,然后更新所有和Topic有关的消息发送的路由信息
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

// 将TopicRouteData中的List<QueueData>转换成Set<MessageQueue>,更新订阅信息
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {

} finally {
this.lockNamesrv.unlock();
}
} else {

}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}

选择消息队列

选择消息队列有两种方式,一种是默认机制,即不开启broker故障延迟机制,一种是开启broker故障延迟机制

不开启broker故障延迟机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//如果是第一次选择,则轮询选择一个MessageQueue
if (lastBrokerName == null) {
return selectOneMessageQueue();
//如果是重试,则轮询选择一个不属于上一次broker的MessageQueue
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}

开启broker故障延迟机制

开启broker故障延迟机制有比较两个重要的类,即MQFaultStrategy、LatencyFaultToleranceImpl

MQFaultStrategy的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class MQFaultStrategy {
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//如果打开broker故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
//自增取值,这里用的是ThreadLocal
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
//第一步:对MessageQueue的数量取模,其实也就是轮询
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//如果当前broker是可用的,怎么判断broker是可用的,参见LatencyFaultToleranceImpl的isAvailable方法
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
//如果是第一次进行选择或者是和上一次选择的broker不一样,这里的代码逻辑应该是有问题的,应该是!mq.getBrokerName().equals(lastBrokerName)
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
//第二步:如果上面的过程没有选择到合适的broker,则在不可用的队列中按照可用性进行排序(是否可用 > 延迟时间 > 开始时间),从前半数中轮询选取一个broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//第三步:如果前两步都不没有选取到,则轮询选取
return tpInfo.selectOneMessageQueue();
}
//默认情况:不开启broker故障延迟机制时,则轮询选取一个不属于上一次发送的broker的队列
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
//如果打开broker故障延迟机制
if (this.sendLatencyFaultEnable) {
/*计算该broker的不可用时间。
* 1. 如果发送成功了,则isolation为false,且currentLatency为消息发送的耗时
* 1.1 根据computeNotAvailableDuration的计算规则,发送耗时会相应的对应broker不可用的时间,比如550ms 则broker 30s不可用
* 2. 如果抛异常了,则isolation为true,则会根据30000ms进行计算,那broker 10分钟不可用
*/
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
//调用LatencyFaultTolerance的updateFaultItem方法进行更新,参看LatencyFaultToleranceImpl类
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}

/**
* 计算broker的不可用时间,根据latencyMax和notAvailableDuration两个数组的对应关系来计算
*/
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}

return 0;
}
}
LatencyFaultToleranceImpl的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
//从faultItemTable取FaultItem
FaultItem old = this.faultItemTable.get(name);
//如果未取到,则新创建一个并put到faultItemTable中,所以startTimestamp就是当前时间 + broker的不可用时间
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
// startTimestamp的时间(也即broker的可用时间)为 当前时间 + 不可用时间,在判断broker isAvailable会用到这个值
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
//TODO 为什么这样做,暂时没看出来
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
//如果取到了,则进行更新
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}

@Override
public boolean isAvailable(final String name) {
//从FaultItemTable中获取FaultItem,并判断FaultItem的可用性
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}

@Override
public void remove(final String name) {
this.faultItemTable.remove(name);
}
//第二步:如果上面的过程没有选择到合适的broker,则在不可用的broker中按照可用性进行排序,从前半数中轮询选取一个broker
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
List<FaultItem> tmpList = new LinkedList<FaultItem>();
while (elements.hasMoreElements()) {
final FaultItem faultItem = elements.nextElement();
tmpList.add(faultItem);
}
if (!tmpList.isEmpty()) {
Collections.shuffle(tmpList);
Collections.sort(tmpList);
final int half = tmpList.size() / 2;
if (half <= 0) {
return tmpList.get(0).getName();
} else {
final int i = this.whichItemWorst.getAndIncrement() % half;
return tmpList.get(i).getName();
}
}

return null;
}

class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;

public FaultItem(final String name) {
this.name = name;
}

//会根据这个排序规则排序,可用性,延迟时间,broker的可用时间
@Override
public int compareTo(final FaultItem other) {
if (this.isAvailable() != other.isAvailable()) {
if (this.isAvailable())
return -1;
if (other.isAvailable())
return 1;
}
if (this.currentLatency < other.currentLatency)
return -1;
else if (this.currentLatency > other.currentLatency) {
return 1;
}
if (this.startTimestamp < other.startTimestamp)
return -1;
else if (this.startTimestamp > other.startTimestamp) {
return 1;
}
return 0;
}

//FaultItem的isAvailable方法,这里只用当前时间 和 startTimestamp来进行比较。那startTimestamp的值是怎么来的呢?参看MQFaultStrategy的updateFaultItem方法
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
}
}

消息队列选择总结

  1. 默认情况下,不开启broker故障延迟机制,轮询选择一个不属于上一次broker的队列

  2. 开启broker故障延迟机制

    首先,轮询选择一个可用的broker中且不属于上一次broker的队列

    其次,按照是否可用、延迟时间、可用时间进行排序,从前半数中轮询选择

    最后,直接轮询选取,什么都不管。

这个开关一般不开启,适合比较大规模的集群。

消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//获取broker的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
//是否使用broker vip通道;在broker端会开启两个端口,一个是10911,一个是-2后的10909,也即vip通道
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
//设置消息唯一ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

//消息压缩,把压缩后的消息放在body里,修改sysFlag属性
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

//如果属于事物消息,设置sysFlag
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}

//消息发送前的扩展点
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}

if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}

//构建发送消息请求
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}

String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

//发送消息
SendResult sendResult = null;
switch (communicationMode) {
//异步消息
case ASYNC:
Message tmpMessage = msg;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
//同步消息,有结果返回会调用MQClientAPIImpl.processSendResponse方法处理结果并返回
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

//消息发送后的扩展点
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}

return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}