This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b00ad30b7f186b29d401f8e9007663fb076b07b1 Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 18 14:46:48 2025 +0800 Pipe: Fixed the insert node EMA logic for degrading (#15741) (#15758) --- .../PipeDataNodeRemainingEventAndTimeOperator.java | 30 ++++++++++--------- .../apache/iotdb/commons/conf/CommonConfig.java | 34 +++++----------------- .../iotdb/commons/pipe/config/PipeConfig.java | 13 ++------- .../iotdb/commons/pipe/config/PipeDescriptor.java | 14 +++------ 4 files changed, 31 insertions(+), 60 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java index 47dc0ff18b9..86368acf353 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java @@ -59,9 +59,7 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { private Timer insertNodeTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER; private Timer tsfileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER; - private volatile long lastInsertNodeEventCountSmoothingTime = Long.MIN_VALUE; - private final Meter insertNodeEventCountMeter = - new Meter(new ExponentialMovingAverages(), Clock.defaultClock()); + private final InsertNodeEMA insertNodeEventCountEMA = new InsertNodeEMA(); private double lastDataRegionCommitSmoothingValue = Long.MAX_VALUE; private double lastSchemaRegionCommitSmoothingValue = Long.MAX_VALUE; @@ -105,17 +103,8 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { } double getRemainingInsertEventSmoothingCount() { - if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() == PipeRateAverage.NONE) { - return insertNodeEventCount.get(); - } - if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime - >= PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds()) { - insertNodeEventCountMeter.mark(insertNodeEventCount.get()); - lastInsertNodeEventCountSmoothingTime = System.currentTimeMillis(); - } - return PipeConfig.getInstance() - .getPipeRemainingInsertNodeCountAverage() - .getMeterRate(insertNodeEventCountMeter); + insertNodeEventCountEMA.update(insertNodeEventCount.get()); + return insertNodeEventCountEMA.insertNodeEMAValue; } long getRemainingEvents() { @@ -277,4 +266,17 @@ class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator { dataRegionCommitMeter.set(null); schemaRegionCommitMeter.set(null); } + + private static class InsertNodeEMA { + private double insertNodeEMAValue; + + public void update(final double newValue) { + final double alpha = PipeConfig.getInstance().getPipeRemainingInsertNodeCountEMAAlpha(); + if (insertNodeEMAValue == 0) { + insertNodeEMAValue = newValue; + } else { + insertNodeEMAValue = alpha * newValue + (1 - alpha) * insertNodeEMAValue; + } + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index e82cc2fb1dd..59d348abe5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -289,7 +289,6 @@ public class CommonConfig { private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE; private int pipeMaxAllowedRemainingInsertEventCountPerPipe = 10000; private int pipeMaxAllowedTotalRemainingInsertEventCount = 50000; - private int pipeRemainingEventCountSmoothingIntervalSeconds = 10; private int pipeMetaReportMaxLogNumPerRound = 10; private int pipeMetaReportMaxLogIntervalRounds = 36; @@ -311,7 +310,7 @@ public class CommonConfig { private int pipeSnapshotExecutionMaxBatchSize = 1000; private long pipeRemainingTimeCommitRateAutoSwitchSeconds = 30; private PipeRateAverage pipeRemainingTimeCommitRateAverageTime = PipeRateAverage.FIVE_MINUTES; - private PipeRateAverage pipeRemainingInsertNodeCountAverage = PipeRateAverage.ONE_MINUTE; + private double pipeRemainingInsertNodeCountEMAAlpha = 0.1; private double pipeTsFileScanParsingThreshold = 0.05; private double pipeDynamicMemoryHistoryWeight = 0.5; private double pipeDynamicMemoryAdjustmentThreshold = 0.05; @@ -1603,23 +1602,6 @@ public class CommonConfig { pipeMaxAllowedTotalRemainingInsertEventCount); } - public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() { - return pipeRemainingEventCountSmoothingIntervalSeconds; - } - - public void setPipeRemainingInsertEventCountSmoothingIntervalSeconds( - int pipeRemainingEventCountSmoothingIntervalSeconds) { - if (this.pipeRemainingEventCountSmoothingIntervalSeconds - == pipeRemainingEventCountSmoothingIntervalSeconds) { - return; - } - this.pipeRemainingEventCountSmoothingIntervalSeconds = - pipeRemainingEventCountSmoothingIntervalSeconds; - logger.info( - "pipeRemainingEventCountSmoothingIntervalSeconds is set to {}", - pipeRemainingEventCountSmoothingIntervalSeconds); - } - public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) { if (this.pipeStuckRestartIntervalSeconds == pipeStuckRestartIntervalSeconds) { return; @@ -1935,19 +1917,19 @@ public class CommonConfig { pipeRemainingTimeCommitRateAverageTime); } - public PipeRateAverage getPipeRemainingInsertNodeCountAverage() { - return pipeRemainingInsertNodeCountAverage; + public double getPipeRemainingInsertNodeCountEMAAlpha() { + return pipeRemainingInsertNodeCountEMAAlpha; } - public void setPipeRemainingInsertNodeCountAverage( - PipeRateAverage pipeRemainingInsertNodeCountAverage) { + public void setPipeRemainingInsertNodeCountEMAAlpha( + final double pipeRemainingInsertNodeCountEMAAlpha) { if (Objects.equals( - this.pipeRemainingInsertNodeCountAverage, pipeRemainingInsertNodeCountAverage)) { + this.pipeRemainingInsertNodeCountEMAAlpha, pipeRemainingInsertNodeCountEMAAlpha)) { return; } - this.pipeRemainingInsertNodeCountAverage = pipeRemainingInsertNodeCountAverage; + this.pipeRemainingInsertNodeCountEMAAlpha = pipeRemainingInsertNodeCountEMAAlpha; logger.info( - "pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountAverage); + "pipeRemainingInsertEventCountAverage is set to {}", pipeRemainingInsertNodeCountEMAAlpha); } public double getPipeTsFileScanParsingThreshold() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index a616d80b64e..be0c70d7f42 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -251,8 +251,8 @@ public class PipeConfig { return COMMON_CONFIG.getPipeRemainingTimeCommitRateAverageTime(); } - public PipeRateAverage getPipeRemainingInsertNodeCountAverage() { - return COMMON_CONFIG.getPipeRemainingInsertNodeCountAverage(); + public double getPipeRemainingInsertNodeCountEMAAlpha() { + return COMMON_CONFIG.getPipeRemainingInsertNodeCountEMAAlpha(); } public double getPipeTsFileScanParsingThreshold() { @@ -383,10 +383,6 @@ public class PipeConfig { return COMMON_CONFIG.getPipeMaxAllowedTotalRemainingInsertEventCount(); } - public int getPipeRemainingInsertEventCountSmoothingIntervalSeconds() { - return COMMON_CONFIG.getPipeRemainingInsertEventCountSmoothingIntervalSeconds(); - } - /////////////////////////////// Logger /////////////////////////////// public int getPipeMetaReportMaxLogNumPerRound() { @@ -551,7 +547,7 @@ public class PipeConfig { LOGGER.info( "PipeRemainingTimeCommitRateAverageTime: {}", getPipeRemainingTimeCommitRateAverageTime()); LOGGER.info( - "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountAverage()); + "PipePipeRemainingInsertEventCountAverage: {}", getPipeRemainingInsertNodeCountEMAAlpha()); LOGGER.info("PipeTsFileScanParsingThreshold(): {}", getPipeTsFileScanParsingThreshold()); LOGGER.info("PipeTransferTsFileSync: {}", isTransferTsFileSync()); @@ -645,9 +641,6 @@ public class PipeConfig { LOGGER.info( "PipeMaxAllowedTotalRemainingInsertEventCount: {}", getPipeMaxAllowedTotalRemainingInsertEventCount()); - LOGGER.info( - "PipeRemainingInsertEventCountSmoothingIntervalSeconds: {}", - getPipeRemainingInsertEventCountSmoothingIntervalSeconds()); LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound()); LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 1849209d651..7086ff731bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -208,12 +208,12 @@ public class PipeDescriptor { "pipe_remaining_time_commit_rate_average_time", String.valueOf(config.getPipeRemainingTimeCommitRateAverageTime())) .trim())); - config.setPipeRemainingInsertNodeCountAverage( - PipeRateAverage.valueOf( + config.setPipeRemainingInsertNodeCountEMAAlpha( + Double.parseDouble( properties .getProperty( - "pipe_remaining_insert_node_count_average", - String.valueOf(config.getPipeRemainingInsertNodeCountAverage())) + "pipe_remaining_insert_node_count_ema_alpha", + String.valueOf(config.getPipeRemainingInsertNodeCountEMAAlpha())) .trim())); } @@ -476,12 +476,6 @@ public class PipeDescriptor { properties.getProperty( "pipe_max_allowed_total_remaining_insert_event_count", String.valueOf(config.getPipeMaxAllowedTotalRemainingInsertEventCount())))); - config.setPipeRemainingInsertEventCountSmoothingIntervalSeconds( - Integer.parseInt( - properties.getProperty( - "pipe_remaining_insert_event_count_smoothing_interval_seconds", - String.valueOf( - config.getPipeRemainingInsertEventCountSmoothingIntervalSeconds())))); config.setPipeStuckRestartMinIntervalMs( Long.parseLong( properties.getProperty(
