This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 3c264c8 Polish push message process in client(add broker name)
3c264c8 is described below
commit 3c264c8f2cf9c9672a26e5b9967dcd10f2f23fba
Author: ShannonDing <[email protected]>
AuthorDate: Tue Feb 19 17:30:24 2019 +0800
Polish push message process in client(add broker name)
---
.../client/impl/ClientRemotingProcessor.java | 2 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 36 +++++++++++++---------
.../client/impl/factory/MQClientInstance.java | 6 +++-
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index c8c3919..f3cfff7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -218,12 +218,12 @@ public class ClientRemotingProcessor implements RequestProcessor {
final PushMessageHeader requestHeader =
(PushMessageHeader) request
.decodeCommandCustomHeader(PushMessageHeader.class);
-
final MessageExt msg =
MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
boolean result =
this.mqClientFactory.processSnodePushMessage(msg,
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
+ requestHeader.getEnodeName(),
requestHeader.getQueueId(),
requestHeader.getQueueOffset());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 000d569..f5251f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -308,12 +308,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//Update local offset according remote offset
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey =
genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
AtomicLong localOffset =
localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
- localConsumerOffset.put(localOffsetKey, new
AtomicLong(-1));
+ localConsumerOffset.putIfAbsent(localOffsetKey, new
AtomicLong(-1));
}
localConsumerOffset.get(localOffsetKey).set(pullResult.getNextBeginOffset());
@@ -474,9 +475,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
private void executePullRequestLater(final PullRequest pullRequest, final
long timeDelay) {
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey =
genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null &&
pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
@@ -498,9 +500,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
- String localOffsetKey = pullRequest.getConsumerGroup()
- + "@" + pullRequest.getMessageQueue().getTopic()
- + "@" + pullRequest.getMessageQueue().getQueueId();
+ String localOffsetKey =
genLocalOffsetKey(pullRequest.getConsumerGroup(),
+ pullRequest.getMessageQueue().getTopic(),
+ pullRequest.getMessageQueue().getBrokerName(),
+ pullRequest.getMessageQueue().getQueueId());
if (pullStopped.get(localOffsetKey) != null &&
pullStopped.get(localOffsetKey).get()) {
//Stop pull request
log.info("Stop pull request, {}", localOffsetKey);
@@ -1197,21 +1200,23 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public boolean processPushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
+ final String brokerName,
final int queueID,
final long offset) {
- String localOffsetKey = consumerGroup + "@" + topic + "@" + queueID;
+ String localOffsetKey = genLocalOffsetKey(consumerGroup, topic,
brokerName, queueID);
AtomicLong localOffset = this.localConsumerOffset.get(localOffsetKey);
if (localOffset == null) {
log.info("Current Local offset have not set, initiallized to -1.");
this.localConsumerOffset.put(localOffsetKey, new AtomicLong(-1));
return false;
}
- if (localOffset.get() < offset) {
+ if (localOffset.get() + 1 < offset) {
//should start pull message process
log.debug("Current Local key:{} and offset:{} and push
offset:{}", localOffsetKey, localOffset.get(), offset);
return false;
} else {
//Stop pull request
+ log.debug("Process Push : Current Local key:{} and offset:{} and
push offset:{}", localOffsetKey, localOffset.get(), offset);
AtomicBoolean pullStop = this.pullStopped.get(localOffsetKey);
if (pullStop == null) {
this.pullStopped.put(localOffsetKey, new AtomicBoolean(true));
@@ -1233,10 +1238,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
processQueue = processQueues.get(localOffsetKey);
}
processQueue.putMessage(messageExtList);
- MessageQueue messageQueue = new MessageQueue(topic, "", queueID);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName,
queueID);
this.consumeMessageService.submitConsumeRequest(messageExtList,
processQueue, messageQueue, true);
- log.info(".......submitConsumeRequest:{},Offset:{}...",
localOffsetKey, offset);
}
return true;
}
+
+ private String genLocalOffsetKey(String consumerGroup, String topic,
String brokerName, int queueID) {
+ return consumerGroup + "@" + topic + "@" + brokerName + "@" + queueID;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 4a81c3b..0953fdc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1376,15 +1376,19 @@ public class MQClientInstance {
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
+
public boolean processSnodePushMessage(final MessageExt msg,
final String consumerGroup,
final String topic,
+ final String brokerName,
final int queueID,
final long offset) {
+ log.debug("Recieve:processSnodePushMessage :{}-{}-{}-{}-{}",
+ consumerGroup, topic, brokerName, queueID, offset);
MQConsumerInner mqConsumerInner =
this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl)
mqConsumerInner;
-
consumer.processPushMessage(msg,consumerGroup,topic,queueID,offset);
+ consumer.processPushMessage(msg, consumerGroup, topic, brokerName,
queueID, offset);
return true;
}