This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c68404  [INLONG-2151][Improve] Add time and sort statistics by topic 
(#2152)
4c68404 is described below

commit 4c6840481936e4c4a071116368a45d084e7c4683
Author: gosonzhang <[email protected]>
AuthorDate: Thu Jan 13 12:14:17 2022 +0800

    [INLONG-2151][Improve] Add time and sort statistics by topic (#2152)
---
 .../tubemq/server/tools/cli/CliConsumer.java       | 38 +++++++++++++++++++---
 .../tubemq/server/tools/cli/CliProducer.java       | 11 +++++--
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
index b60cf83..8417863 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliConsumer.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.cli.CommandLine;
@@ -59,6 +60,9 @@ public class CliConsumer extends CliAbstractBase {
             LoggerFactory.getLogger(CliConsumer.class);
     // statistic data index
     private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
+    private static final ConcurrentHashMap<String, AtomicLong> TOPIC_COUNT_MAP 
=
+            new ConcurrentHashMap();
+    private long startTime = System.currentTimeMillis();
     // sent data content
     private final Map<String, TreeSet<String>> topicAndFiltersMap = new 
HashMap<>();
     private final List<MessageSessionFactory> sessionFactoryList = new 
ArrayList<>();
@@ -188,6 +192,7 @@ public class CliConsumer extends CliAbstractBase {
         consumerConfig.setRpcTimeoutMs(rpcTimeoutMs);
         consumerConfig.setPushFetchThreadCnt(fetchThreadCnt);
         consumerConfig.setConsumePosition(consumePos);
+        startTime = System.currentTimeMillis();
         // initial consumer object
         if (isPushConsume) {
             DefaultMessageListener msgListener =
@@ -203,6 +208,7 @@ public class CliConsumer extends CliAbstractBase {
                     for (Map.Entry<String, TreeSet<String>> entry
                             : topicAndFiltersMap.entrySet()) {
                         consumer1.subscribe(entry.getKey(), entry.getValue(), 
msgListener);
+                        TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
                     }
                     consumer1.completeSubscribe();
                     consumerMap.put(consumer1, null);
@@ -217,6 +223,7 @@ public class CliConsumer extends CliAbstractBase {
                     for (Map.Entry<String, TreeSet<String>> entry
                             : topicAndFiltersMap.entrySet()) {
                         consumer1.subscribe(entry.getKey(), entry.getValue(), 
msgListener);
+                        TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
                     }
                     consumer1.completeSubscribe();
                     consumerMap.put(consumer1, null);
@@ -233,6 +240,7 @@ public class CliConsumer extends CliAbstractBase {
                     for (Map.Entry<String, TreeSet<String>> entry
                             : topicAndFiltersMap.entrySet()) {
                         consumer2.subscribe(entry.getKey(), entry.getValue());
+                        TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
                     }
                     consumer2.completeSubscribe();
                     consumerMap.put(consumer2,
@@ -248,6 +256,7 @@ public class CliConsumer extends CliAbstractBase {
                     for (Map.Entry<String, TreeSet<String>> entry
                             : topicAndFiltersMap.entrySet()) {
                         consumer2.subscribe(entry.getKey(), entry.getValue());
+                        TOPIC_COUNT_MAP.put(entry.getKey(), new AtomicLong(0));
                     }
                     consumer2.completeSubscribe();
                     consumerMap.put(consumer2,
@@ -282,7 +291,6 @@ public class CliConsumer extends CliAbstractBase {
                 thread.start();
             }
         }
-
     }
 
     // for push consumer callback process
@@ -294,7 +302,11 @@ public class CliConsumer extends CliAbstractBase {
         @Override
         public void receiveMessages(PeerInfo peerInfo, List<Message> messages) 
{
             if (messages != null && !messages.isEmpty()) {
-                TOTAL_COUNTER.addAndGet(messages.size());
+                int msgCnt = messages.size();
+                Message message = messages.get(0);
+                TOTAL_COUNTER.addAndGet(msgCnt);
+                AtomicLong accCount = TOPIC_COUNT_MAP.get(message.getTopic());
+                accCount.addAndGet(msgCnt);
             }
         }
 
@@ -327,7 +339,11 @@ public class CliConsumer extends CliAbstractBase {
                     if (result.isSuccess()) {
                         List<Message> messageList = result.getMessageList();
                         if (messageList != null && !messageList.isEmpty()) {
-                            TOTAL_COUNTER.addAndGet(messageList.size());
+                            int msgCnt = messageList.size();
+                            TOTAL_COUNTER.addAndGet(msgCnt);
+                            AtomicLong accCount =
+                                    TOPIC_COUNT_MAP.get(result.getTopicName());
+                            accCount.addAndGet(msgCnt);
                         }
                         
messageConsumer.confirmConsume(result.getConfirmContext(), true);
                     } else {
@@ -365,14 +381,26 @@ public class CliConsumer extends CliAbstractBase {
             while (cliConsumer.msgCount < 0
                     || TOTAL_COUNTER.get() < cliConsumer.msgCount * 
cliConsumer.clientCount) {
                 ThreadUtils.sleep(cliConsumer.printIntervalMs);
-                System.out.println("Required received count VS received 
message count = "
+                System.out.println("Continue, cost time: "
+                        + (System.currentTimeMillis() - cliConsumer.startTime)
+                        + " ms, required count VS received count = "
                         + (cliConsumer.msgCount * cliConsumer.clientCount)
                         + " : " + TOTAL_COUNTER.get());
+                for (Map.Entry<String, AtomicLong> entry : 
TOPIC_COUNT_MAP.entrySet()) {
+                    System.out.println("Topic Name = " + entry.getKey()
+                            + ", count=" + entry.getValue().get());
+                }
             }
             cliConsumer.shutdown();
-            System.out.println("Finished, received count VS received message 
count = "
+            System.out.println("Finished, cost time: "
+                    + (System.currentTimeMillis() - cliConsumer.startTime)
+                    + " ms, required count VS received count = "
                     + (cliConsumer.msgCount * cliConsumer.clientCount)
                     + " : " + TOTAL_COUNTER.get());
+            for (Map.Entry<String, AtomicLong> entry : 
TOPIC_COUNT_MAP.entrySet()) {
+                System.out.println("Topic Name = " + entry.getKey()
+                        + ", count=" + entry.getValue().get());
+            }
         } catch (Throwable ex) {
             ex.printStackTrace();
             logger.error(ex.getMessage());
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
index 0ac63a5..1a706f1 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/cli/CliProducer.java
@@ -55,6 +55,8 @@ public class CliProducer extends CliAbstractBase {
 
     private static final Logger logger =
             LoggerFactory.getLogger(CliProducer.class);
+    // start time
+    private long startTime = System.currentTimeMillis();
     // statistic data index
     private static final AtomicLong TOTAL_COUNTER = new AtomicLong(0);
     private static final AtomicLong SENT_SUCC_COUNTER = new AtomicLong(0);
@@ -187,6 +189,7 @@ public class CliProducer extends CliAbstractBase {
         sentData = MixedUtils.buildTestData(msgDataSize);
         // initial topic send round
         topicSendRounds = 
MixedUtils.buildTopicFilterTupleList(topicAndFiltersMap);
+        startTime = System.currentTimeMillis();
         // initial send thread service
         sendExecutorService =
                 Executors.newFixedThreadPool(sendThreadCnt, new 
ThreadFactory() {
@@ -323,7 +326,9 @@ public class CliProducer extends CliAbstractBase {
             while (cliProducer.msgCount < 0
                     || TOTAL_COUNTER.get() < cliProducer.msgCount * 
cliProducer.clientCount) {
                 ThreadUtils.sleep(cliProducer.printIntervalMs);
-                System.out.println("Required send count VS sent message count 
= "
+                System.out.println("Continue, cost time: "
+                        + (System.currentTimeMillis() - cliProducer.startTime)
+                        + "ms, required count VS sent count = "
                         + (cliProducer.msgCount * cliProducer.clientCount)
                         + " : " + TOTAL_COUNTER.get()
                         + " (" + SENT_SUCC_COUNTER.get()
@@ -332,7 +337,9 @@ public class CliProducer extends CliAbstractBase {
                         + ")");
             }
             cliProducer.shutdown();
-            System.out.println("Finished, required send count VS sent message 
count = "
+            System.out.println("Finished, cost time: "
+                    + (System.currentTimeMillis() - cliProducer.startTime)
+                    + "ms, required count VS sent count = "
                     + (cliProducer.msgCount * cliProducer.clientCount)
                     + " : " + TOTAL_COUNTER.get()
                     + " (" + SENT_SUCC_COUNTER.get()

Reply via email to