This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b00ad30b7f186b29d401f8e9007663fb076b07b1
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 18 14:46:48 2025 +0800

    Pipe: Fixed the insert node EMA logic for degrading (#15741) (#15758)
---
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 30 ++++++++++---------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 34 +++++-----------------
 .../iotdb/commons/pipe/config/PipeConfig.java      | 13 ++-------
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 14 +++------
 4 files changed, 31 insertions(+), 60 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 47dc0ff18b9..86368acf353 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -59,9 +59,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   private Timer insertNodeTransferTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer tsfileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
-  private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE;
-  private final Meter insertNodeEventCountMeter =
-      new Meter(new ExponentialMovingAverages(), Clock.defaultClock());
+  private final InsertNodeEMA insertNodeEventCountEMA = new InsertNodeEMA();
 
   private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE;
   private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE;
@@ -105,17 +103,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   }
 
   double getRemainingInsertEventSmoothingCount() {
-    if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() == 
PipeRateAverage.NONE) {
-      return insertNodeEventCount.get();
-    }
-    if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
-        >= 
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
 {
-      insertNodeEventCountMeter.mark(insertNodeEventCount.get());
-      lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis();
-    }
-    return PipeConfig.getInstance()
-        .getPipeRemainingInsertNodeCountAverage()
-        .getMeterRate(insertNodeEventCountMeter);
+    insertNodeEventCountEMA.update(insertNodeEventCount.get());
+    return insertNodeEventCountEMA.insertNodeEMAValue;
   }
 
   long getRemainingEvents() {
@@ -277,4 +266,17 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     dataRegionCommitMeter.set(null);
     schemaRegionCommitMeter.set(null);
   }
+
+  /**
+   * Exponential moving average (EMA) of the remaining insert node event count.
+   *
+   * <p>The first sample seeds the average. Every later sample is blended in as
+   * {@code alpha * newValue + (1 - alpha) * previous}, where {@code alpha} is re-read from
+   * {@link PipeConfig#getPipeRemainingInsertNodeCountEMAAlpha()} on each update.
+   *
+   * <p>NOTE(review): this class does no synchronization of its own; confirm that it is never
+   * updated concurrently.
+   */
+  private static class InsertNodeEMA {
+    private double insertNodeEMAValue;
+
+    // True once the first sample has been recorded. A dedicated flag is used instead of comparing
+    // the average with 0, because 0 is a legitimate average (no remaining events): with a zero
+    // sentinel, the first burst after an idle period would replace the average outright instead
+    // of being smoothed into it.
+    private boolean seeded;
+
+    /**
+     * Folds a new sample into the moving average.
+     *
+     * @param newValue the latest observed value
+     */
+    public void update(final double newValue) {
+      if (!seeded) {
+        // Nothing to blend with yet: start the average at the first observation.
+        insertNodeEMAValue = newValue;
+        seeded = true;
+        return;
+      }
+      final double alpha = PipeConfig.getInstance().getPipeRemainingInsertNodeCountEMAAlpha();
+      insertNodeEMAValue = alpha * newValue + (1 - alpha) * insertNodeEMAValue;
+    }
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e82cc2fb1dd..59d348abe5c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -289,7 +289,6 @@ public class CommonConfig {
   private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
   private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000;
   private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000;
-  private int pipeRemainingEventCountSmoothingIntervalSeconds = 10;
 
   private int pipeMetaReportMaxLogNumPerRound = 10;
   private int pipeMetaReportMaxLogIntervalRounds = 36;
@@ -311,7 +310,7 @@ public class CommonConfig {
   private int pipeSnapshotExecutionMaxBatchSize = 1000;
   private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30;
   private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = 
PipeRateAverage.FIVE_MINUTES;
-  private PipeRateAverage pipeRemainingInsertNodeCountAverage = 
PipeRateAverage.ONE_MINUTE;
+  private double pipeRemainingInsertNodeCountEMAAlpha = 0.1;
   private double pipeTsFileScanParsingThreshold = 0.05;
   private double pipeDynamicMemoryHistoryWeight = 0.5;
   private double pipeDynamicMemoryAdjustmentThreshold = 0.05;
@@ -1603,23 +1602,6 @@ public class CommonConfig {
         pipeMaxAllowedTotalRemainingInsertEventCount);
   }
 
-  public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
-    return pipeRemainingEventCountSmoothingIntervalSeconds;
-  }
-
-  public void setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
-      int pipeRemainingEventCountSmoothingIntervalSeconds) {
-    if (this.pipeRemainingEventCountSmoothingIntervalSeconds
-        == pipeRemainingEventCountSmoothingIntervalSeconds) {
-      return;
-    }
-    this.pipeRemainingEventCountSmoothingIntervalSeconds =
-        pipeRemainingEventCountSmoothingIntervalSeconds;
-    logger.info(
-        "pipeRemainingEventCountSmoothingIntervalSeconds is set to {}",
-        pipeRemainingEventCountSmoothingIntervalSeconds);
-  }
-
   public void setPipeStuckRestartIntervalSeconds(long 
pipeStuckRestartIntervalSeconds) {
     if (this.pipeStuckRestartIntervalSeconds == 
pipeStuckRestartIntervalSeconds) {
       return;
@@ -1935,19 +1917,19 @@ public class CommonConfig {
         pipeRemainingTimeCommitRateAverageTime);
   }
 
-  public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
-    return pipeRemainingInsertNodeCountAverage;
+  public double getPipeRemainingInsertNodeCountEMAAlpha() {
+    return pipeRemainingInsertNodeCountEMAAlpha;
   }
 
-  public void setPipeRemainingInsertNodeCountAverage(
-      PipeRateAverage pipeRemainingInsertNodeCountAverage) {
+  /**
+   * Updates the smoothing factor (alpha) used for the remaining insert node count EMA and logs
+   * the new value. Passing the value that is already in effect is a no-op and logs nothing.
+   *
+   * <p>NOTE(review): the value is stored as given, without a range check; presumably it is
+   * expected to lie in (0, 1] - confirm against the configuration documentation.
+   *
+   * @param pipeRemainingInsertNodeCountEMAAlpha the new smoothing factor
+   */
+  public void setPipeRemainingInsertNodeCountEMAAlpha(
+      final double pipeRemainingInsertNodeCountEMAAlpha) {
     if (Objects.equals(
-        this.pipeRemainingInsertNodeCountAverage, 
pipeRemainingInsertNodeCountAverage)) {
+        this.pipeRemainingInsertNodeCountEMAAlpha, 
pipeRemainingInsertNodeCountEMAAlpha)) {
       return;
     }
-    this.pipeRemainingInsertNodeCountAverage = 
pipeRemainingInsertNodeCountAverage;
+    this.pipeRemainingInsertNodeCountEMAAlpha = 
pipeRemainingInsertNodeCountEMAAlpha;
     logger.info(
-        "pipeRemainingInsertEventCountAverage is set to {}", 
pipeRemainingInsertNodeCountAverage);
+        "pipeRemainingInsertNodeCountEMAAlpha is set to {}", pipeRemainingInsertNodeCountEMAAlpha);
   }
 
   public double getPipeTsFileScanParsingThreshold() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index a616d80b64e..be0c70d7f42 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -251,8 +251,8 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime();
   }
 
-  public PipeRateAverage getPipeRemainingInsertNodeCountAverage() {
-    return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage();
+  public double getPipeRemainingInsertNodeCountEMAAlpha() {
+    return COMMON_CONFIG.getPipeRemainingInsertNodeCountEMAAlpha();
   }
 
   public double getPipeTsFileScanParsingThreshold() {
@@ -383,10 +383,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeMaxAllowedTotalRemainingInsertEventCount();
   }
 
-  public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() {
-    return 
COMMON_CONFIG.getPipeRemainingInsertEventCountSmoothingIntervalSeconds();
-  }
-
   /////////////////////////////// Logger ///////////////////////////////
 
   public int getPipeMetaReportMaxLogNumPerRound() {
@@ -551,7 +547,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info(
-        "PipePipeRemainingInsertEventCountAverage: {}", 
getPipeRemainingInsertNodeCountAverage());
+        "PipePipeRemainingInsertEventCountAverage: {}", 
getPipeRemainingInsertNodeCountEMAAlpha());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
     LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync());
 
@@ -645,9 +641,6 @@ public class PipeConfig {
     LOGGER.info(
         "PipeMaxAllowedTotalRemainingInsertEventCount: {}",
         getPipeMaxAllowedTotalRemainingInsertEventCount());
-    LOGGER.info(
-        "PipeRemainingInsertEventCountSmoothingIntervalSeconds: {}",
-        getPipeRemainingInsertEventCountSmoothingIntervalSeconds());
 
     LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", 
getPipeMetaReportMaxLogNumPerRound());
     LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", 
getPipeMetaReportMaxLogIntervalRounds());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 1849209d651..7086ff731bf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -208,12 +208,12 @@ public class PipeDescriptor {
                     "pipe_remaining_time_commit_rate_average_time",
                     
String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime()))
                 .trim()));
-    config.setPipeRemainingInsertNodeCountAverage(
-        PipeRateAverage.valueOf(
+    config.setPipeRemainingInsertNodeCountEMAAlpha(
+        Double.parseDouble(
             properties
                 .getProperty(
-                    "pipe_remaining_insert_node_count_average",
-                    
String.valueOf(config.getPipeRemainingInsertNodeCountAverage()))
+                    "pipe_remaining_insert_node_count_ema_alpha",
+                    
String.valueOf(config.getPipeRemainingInsertNodeCountEMAAlpha()))
                 .trim()));
   }
 
@@ -476,12 +476,6 @@ public class PipeDescriptor {
             properties.getProperty(
                 "pipe_max_allowed_total_remaining_insert_event_count",
                 
String.valueOf(config.getPipeMaxAllowedTotalRemainingInsertEventCount()))));
-    config.setPipeRemainingInsertEventCountSmoothingIntervalSeconds(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_remaining_insert_event_count_smoothing_interval_seconds",
-                String.valueOf(
-                    
config.getPipeRemainingInsertEventCountSmoothingIntervalSeconds()))));
     config.setPipeStuckRestartMinIntervalMs(
         Long.parseLong(
             properties.getProperty(

Reply via email to