This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fedfa10fc8d1ca922873c9f3319a6987afc7f72a Author: Caideyipi <[email protected]> AuthorDate: Thu Jul 10 19:24:46 2025 +0800 [To dev/1.3] Pipe: Reduced the progress index report interval & Added some logs (#15905) (#15908) * partial * Changed default * Update PipeDataNodeTaskAgent.java * Next --- .../pipe/agent/task/PipeConfigNodeTaskAgent.java | 20 ++++++------ .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 37 ++++++++++++---------- .../apache/iotdb/commons/conf/CommonConfig.java | 8 ++--- .../iotdb/commons/pipe/config/PipeConfig.java | 2 +- .../iotdb/commons/pipe/config/PipeDescriptor.java | 2 +- .../commons/pipe/resource/log/PipeLogManager.java | 2 +- .../commons/pipe/resource/log/PipeLogStatus.java | 4 +-- 7 files changed, 38 insertions(+), 37 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java index 911909d104c..b79fd8da94f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java @@ -207,21 +207,19 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent { if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) { return; } - - LOGGER.info("Received pipe heartbeat request {} from config coordinator.", req.heartbeatId); + final Optional<Logger> logger = + PipeConfigNodeResourceManager.log() + .schedule( + PipeConfigNodeTaskAgent.class, + PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), + pipeMetaKeeper.getPipeMetaCount()); + LOGGER.debug("Received pipe heartbeat request {} from config coordinator.", req.heartbeatId); final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>(); final List<Long> pipeRemainingEventCountList = new ArrayList<>(); final List<Double> pipeRemainingTimeList = new ArrayList<>(); try { - final Optional<Logger> logger = - PipeConfigNodeResourceManager.log() - .schedule( - PipeConfigNodeTaskAgent.class, - PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), - PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), - pipeMetaKeeper.getPipeMetaCount()); - for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -244,7 +242,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent { remainingEventCount, estimatedRemainingTime)); } - LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); + logger.ifPresent(l -> l.info("Reported {} pipe metas.", pipeMetaBinaryList.size())); } catch (final IOException e) { throw new TException(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 15e20347393..f800c15c3f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -381,6 +381,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { if (PipeDataNodeAgent.runtime().isShutdown()) { return; } + final Optional<Logger> logger = + PipeDataNodeResourceManager.log() + .schedule( + PipeDataNodeTaskAgent.class, + PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), + pipeMetaKeeper.getPipeMetaCount()); final Set<Integer> dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds().stream() @@ -392,13 +399,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final List<Long> pipeRemainingEventCountList = new ArrayList<>(); final List<Double> pipeRemainingTimeList = new ArrayList<>(); try { - final Optional<Logger> logger = - PipeDataNodeResourceManager.log() - .schedule( - PipeDataNodeTaskAgent.class, - PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), - PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), - pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -445,7 +445,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { remainingEventAndTime.getLeft(), remainingEventAndTime.getRight())); } - LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); + logger.ifPresent(l -> l.info("Reported {} pipe metas.", pipeMetaBinaryList.size())); } catch (final IOException | IllegalPathException e) { throw new TException(e); } @@ -460,10 +460,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { protected void collectPipeMetaListInternal( final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { // Do nothing if data node is removing or removed, or request does not need pipe meta list + // If the heartbeatId == Long.MIN_VALUE then it's shutdown report and shall not be skipped if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) { return; } - LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId); + final Optional<Logger> logger = + PipeDataNodeResourceManager.log() + .schedule( + PipeDataNodeTaskAgent.class, + PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), + PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), + pipeMetaKeeper.getPipeMetaCount()); + LOGGER.debug("Received pipe heartbeat request {} from config node.", req.heartbeatId); final Set<Integer> dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds().stream() @@ -475,13 +483,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final List<Long> pipeRemainingEventCountList = new ArrayList<>(); final List<Double> pipeRemainingTimeList = new ArrayList<>(); try { - final Optional<Logger> logger = - PipeDataNodeResourceManager.log() - .schedule( - PipeDataNodeTaskAgent.class, - PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(), - PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), - pipeMetaKeeper.getPipeMetaCount()); for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { pipeMetaBinaryList.add(pipeMeta.serialize()); @@ -528,7 +529,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { remainingEventAndTime.getLeft(), remainingEventAndTime.getRight())); } - LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size()); + logger.ifPresent(l -> l.info("Reported {} pipe metas.", pipeMetaBinaryList.size())); } catch (final IOException | IllegalPathException e) { throw new TException(e); } @@ -613,6 +614,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) { LOGGER.warn("Failed to persist progress index to configNode, status: {}", result); + } else { + LOGGER.info("Successfully persisted all pipe's info to configNode."); } } catch (final Exception e) { LOGGER.warn(e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index dfcbba78cff..3ac297e1c69 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -284,7 +284,7 @@ public class CommonConfig { (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8); private boolean isSeperatedPipeHeartbeatEnabled = true; - private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30; + private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3; private long pipeMetaSyncerInitialSyncDelayMinutes = 3; private long pipeMetaSyncerSyncIntervalMinutes = 3; private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1; @@ -303,8 +303,8 @@ public class CommonConfig { private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated - private int pipeMetaReportMaxLogNumPerRound = 10; - private int pipeMetaReportMaxLogIntervalRounds = 36; + private double pipeMetaReportMaxLogNumPerRound = 0.1; + private int pipeMetaReportMaxLogIntervalRounds = 360; private int pipeTsFilePinMaxLogNumPerRound = 10; private int pipeTsFilePinMaxLogIntervalRounds = 90; @@ -1581,7 +1581,7 @@ public class CommonConfig { return pipeMetaReportMaxLogNumPerRound; } - public void setPipeMetaReportMaxLogNumPerRound(int pipeMetaReportMaxLogNumPerRound) { + public void setPipeMetaReportMaxLogNumPerRound(double pipeMetaReportMaxLogNumPerRound) { if (this.pipeMetaReportMaxLogNumPerRound == pipeMetaReportMaxLogNumPerRound) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index 4ce2fd19093..25fe88d72c9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -347,7 +347,7 @@ public class PipeConfig { /////////////////////////////// Logger /////////////////////////////// - public int getPipeMetaReportMaxLogNumPerRound() { + public double getPipeMetaReportMaxLogNumPerRound() { return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index d6bad9d18f6..9e13bb04ade 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -118,7 +118,7 @@ public class PipeDescriptor { Integer.toString(config.getPipeAirGapReceiverPort())))); config.setPipeMetaReportMaxLogNumPerRound( - Integer.parseInt( + Double.parseDouble( properties.getProperty( "pipe_meta_report_max_log_num_per_round", String.valueOf(config.getPipeMetaReportMaxLogNumPerRound())))); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java index 49699fdf878..69d8b5294db 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java @@ -34,7 +34,7 @@ public class PipeLogManager { public Optional<Logger> schedule( final Class<?> logClass, - final int maxAverageScale, + final double maxAverageScale, final int maxLogInterval, final int scale) { return logClass2LogStatusMap diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java index 3b4db49d065..a30b12ab5eb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java @@ -29,11 +29,11 @@ class PipeLogStatus { private final Logger logger; - private final int maxAverageScale; + private final double maxAverageScale; private final int maxLogInterval; private final AtomicLong currentRounds = new AtomicLong(0); - PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int maxLogInterval) { + PipeLogStatus(final Class<?> logClass, final double maxAverageScale, final int maxLogInterval) { logger = LoggerFactory.getLogger(logClass); this.maxAverageScale = maxAverageScale;
