[ROCKETMQ-160] SendHeartBeat may not be logged in the expected period. Closes apache/incubator-rocketmq#86
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/04c8925d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/04c8925d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/04c8925d Branch: refs/heads/master Commit: 04c8925d6da77a41cef746a9c6478a407c4c9edd Parents: adae162 Author: Jaskey <linjunjie1...@gmail.com> Authored: Sat May 27 12:38:00 2017 +0800 Committer: dongeforever <zhendongli...@yeah.net> Committed: Sat May 27 12:38:00 2017 +0800 ---------------------------------------------------------------------- .../client/impl/factory/MQClientInstance.java | 66 ++++++++++---------- 1 file changed, 34 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/04c8925d/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index a8c65b2..1b075ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -112,7 +112,7 @@ public class MQClientInstance { private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; private final ConsumerStatsManager consumerStatsManager; - private final AtomicLong storeTimesTotal = new AtomicLong(0); + private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0); private ServiceState serviceState = ServiceState.CREATE_JUST; private DatagramSocket datagramSocket; private Random random = new Random(); @@ -517,38 +517,40 @@ public class MQClientInstance { return; 
} - long times = this.storeTimesTotal.getAndIncrement(); - Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator(); - while (it.hasNext()) { - Entry<String, HashMap<Long, String>> entry = it.next(); - String brokerName = entry.getKey(); - HashMap<Long, String> oneTable = entry.getValue(); - if (oneTable != null) { - for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) { - Long id = entry1.getKey(); - String addr = entry1.getValue(); - if (addr != null) { - if (consumerEmpty) { - if (id != MixAll.MASTER_ID) - continue; - } - - try { - int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); - if (!this.brokerVersionTable.containsKey(brokerName)) { - this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4)); - } - this.brokerVersionTable.get(brokerName).put(addr, version); - if (times % 20 == 0) { - log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); - log.info(heartbeatData.toString()); + if (!this.brokerAddrTable.isEmpty()) { + long times = this.sendHeartbeatTimesTotal.getAndIncrement(); + Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, HashMap<Long, String>> entry = it.next(); + String brokerName = entry.getKey(); + HashMap<Long, String> oneTable = entry.getValue(); + if (oneTable != null) { + for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) { + Long id = entry1.getKey(); + String addr = entry1.getValue(); + if (addr != null) { + if (consumerEmpty) { + if (id != MixAll.MASTER_ID) + continue; } - } catch (Exception e) { - if (this.isBrokerInNameServer(addr)) { - log.error("send heart beat to broker exception", e); - } else { - log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, - id, addr); + + try { + int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); + if 
(!this.brokerVersionTable.containsKey(brokerName)) { + this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4)); + } + this.brokerVersionTable.get(brokerName).put(addr, version); + if (times % 20 == 0) { + log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); + log.info(heartbeatData.toString()); + } + } catch (Exception e) { + if (this.isBrokerInNameServer(addr)) { + log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr); + } else { + log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, + id, addr); + } } } }