This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 86aa71177f1 Pipe: Added collect invocation count in the calculation of 
data region extractor events for data node remaining time (#12799)
86aa71177f1 is described below

commit 86aa71177f1754907b97ee63306b7c945e461023
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 25 12:18:31 2024 +0800

    Pipe: Added collect invocation count in the calculation of data region 
extractor events for data node remaining time (#12799)
---
 iotdb-core/datanode/pom.xml                        |  4 +--
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  | 13 ++++++++++
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 29 +++++++++++++++-------
 .../pipe/task/connection/PipeEventCollector.java   |  4 +++
 .../subtask/processor/PipeProcessorSubtask.java    |  4 +++
 5 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index cabc80a5aa1..478e0e783b0 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -521,9 +521,7 @@
                         <!-- These are used at runtime in tests -->
                         
<usedDependency>io.jsonwebtoken:jjwt-impl</usedDependency>
                         
<usedDependency>io.jsonwebtoken:jjwt-jackson</usedDependency>
-                        <!-- We need this dependency as it provides the metric 
managers used in tests -->
-                        
<usedDependency>org.apache.iotdb:metrics-core</usedDependency>
-                        <!-- This dependency is required at runtime, when 
esnabling the rest service -->
+                        <!-- This dependency is required at runtime, when 
enabling the rest service -->
                         
<usedDependency>org.glassfish.jersey.inject:jersey-hk2</usedDependency>
                     </usedDependencies>
                 </configuration>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index cff505d444e..3daebdf0d84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -210,6 +210,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
+  public void markCollectInvocationCount(final String pipeID, final long 
collectInvocationCount) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.get(pipeID);
+    if (Objects.isNull(operator)) {
+      return;
+    }
+
+    operator.markCollectInvocationCount(collectInvocationCount);
+  }
+
   //////////////////////////// Show pipes ////////////////////////////
 
   public Pair<Long, Double> getRemainingEventAndTime(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index 8963c7019c2..e85a212b96d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.metrics.core.IoTDBMetricManager;
+import org.apache.iotdb.metrics.core.type.IoTDBHistogram;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import com.codahale.metrics.Clock;
@@ -49,6 +51,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
       Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final AtomicReference<Meter> dataRegionCommitMeter = new 
AtomicReference<>(null);
   private final AtomicReference<Meter> schemaRegionCommitMeter = new 
AtomicReference<>(null);
+  private final IoTDBHistogram collectInvocationHistogram =
+      (IoTDBHistogram) IoTDBMetricManager.getInstance().createHistogram(null);
 
   private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
   private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -85,12 +89,18 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     final PipeRemainingTimeRateAverageTime 
pipeRemainingTimeCommitRateAverageTime =
         PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
 
+    final double invocationValue = collectInvocationHistogram.getMean();
     // Do not take heartbeat event into account
-    final int totalDataRegionWriteEventCount =
-        dataRegionExtractors.stream()
-                .map(IoTDBDataRegionExtractor::getEventCount)
-                .reduce(Integer::sum)
-                .orElse(0)
+    final double totalDataRegionWriteEventCount =
+        (dataRegionExtractors.stream()
+                        .map(IoTDBDataRegionExtractor::getEventCount)
+                        .reduce(Integer::sum)
+                        .orElse(0)
+                    - dataRegionExtractors.stream()
+                        
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
+                        .reduce(Integer::sum)
+                        .orElse(0))
+                * Math.max(invocationValue, 1)
             + dataRegionProcessors.stream()
                 .map(processorSubtask -> processorSubtask.getEventCount(true))
                 .reduce(Integer::sum)
@@ -99,10 +109,6 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
                 .map(connectorSubtask -> 
connectorSubtask.getEventCount(pipeName))
                 .reduce(Integer::sum)
                 .orElse(0)
-            - dataRegionExtractors.stream()
-                .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
-                .reduce(Integer::sum)
-                .orElse(0)
             - dataRegionConnectors.stream()
                 .map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
                 .reduce(Integer::sum)
@@ -205,6 +211,11 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
         });
   }
 
+  void markCollectInvocationCount(final long collectInvocationCount) {
+    // If collectInvocationCount == 0, the event will still be committed once
+    collectInvocationHistogram.update(Math.max(collectInvocationCount, 1));
+  }
+
   //////////////////////////// Switch ////////////////////////////
 
   // Thread-safe & Idempotent
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index ddc7ee5a2d3..c97257cae61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -174,6 +174,10 @@ public class PipeEventCollector implements EventCollector {
     collectInvocationCount.set(0);
   }
 
+  public long getCollectInvocationCount() {
+    return collectInvocationCount.get();
+  }
+
   public boolean hasNoCollectInvocationAfterReset() {
     return collectInvocationCount.get() == 0;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 8ead33deb79..96e0911af3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -136,9 +136,13 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
         if (event instanceof TabletInsertionEvent) {
           pipeProcessor.process((TabletInsertionEvent) event, 
outputEventCollector);
           PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
+          PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+              .markCollectInvocationCount(taskID, 
outputEventCollector.getCollectInvocationCount());
         } else if (event instanceof TsFileInsertionEvent) {
           pipeProcessor.process((TsFileInsertionEvent) event, 
outputEventCollector);
           PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
+          PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+              .markCollectInvocationCount(taskID, 
outputEventCollector.getCollectInvocationCount());
         } else if (event instanceof PipeHeartbeatEvent) {
           pipeProcessor.process(event, outputEventCollector);
           ((PipeHeartbeatEvent) event).onProcessed();

Reply via email to