This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fedfa10fc8d1ca922873c9f3319a6987afc7f72a
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 19:24:46 2025 +0800

    [To dev/1.3] Pipe: Reduced the progress index report interval & Added some 
logs (#15905) (#15908)
    
    * partial
    
    * Changed default
    
    * Update PipeDataNodeTaskAgent.java
    
    * Next
---
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   | 20 ++++++------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 37 ++++++++++++----------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  8 ++---
 .../iotdb/commons/pipe/config/PipeConfig.java      |  2 +-
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  2 +-
 .../commons/pipe/resource/log/PipeLogManager.java  |  2 +-
 .../commons/pipe/resource/log/PipeLogStatus.java   |  4 +--
 7 files changed, 38 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 911909d104c..b79fd8da94f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -207,21 +207,19 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
     if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
       return;
     }
-
-    LOGGER.info("Received pipe heartbeat request {} from config coordinator.", 
req.heartbeatId);
+    final Optional<Logger> logger =
+        PipeConfigNodeResourceManager.log()
+            .schedule(
+                PipeConfigNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
+    LOGGER.debug("Received pipe heartbeat request {} from config 
coordinator.", req.heartbeatId);
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeConfigNodeResourceManager.log()
-              .schedule(
-                  PipeConfigNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
-
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -244,7 +242,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
                     remainingEventCount,
                     estimatedRemainingTime));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException e) {
       throw new TException(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 15e20347393..f800c15c3f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -381,6 +381,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     if (PipeDataNodeAgent.runtime().isShutdown()) {
       return;
     }
+    final Optional<Logger> logger =
+        PipeDataNodeResourceManager.log()
+            .schedule(
+                PipeDataNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
 
     final Set<Integer> dataRegionIds =
         StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -392,13 +399,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeDataNodeResourceManager.log()
-              .schedule(
-                  PipeDataNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -445,7 +445,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                     remainingEventAndTime.getLeft(),
                     remainingEventAndTime.getRight()));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
@@ -460,10 +460,18 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   protected void collectPipeMetaListInternal(
       final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
+    // If the heartbeatId == Long.MIN_VALUE then it's shutdown report and 
shall not be skipped
     if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != 
Long.MIN_VALUE) {
       return;
     }
-    LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
+    final Optional<Logger> logger =
+        PipeDataNodeResourceManager.log()
+            .schedule(
+                PipeDataNodeTaskAgent.class,
+                PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
+                
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
+                pipeMetaKeeper.getPipeMetaCount());
+    LOGGER.debug("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
 
     final Set<Integer> dataRegionIds =
         StorageEngine.getInstance().getAllDataRegionIds().stream()
@@ -475,13 +483,6 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     final List<Long> pipeRemainingEventCountList = new ArrayList<>();
     final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
-      final Optional<Logger> logger =
-          PipeDataNodeResourceManager.log()
-              .schedule(
-                  PipeDataNodeTaskAgent.class,
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogNumPerRound(),
-                  
PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(),
-                  pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
@@ -528,7 +529,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                     remainingEventAndTime.getLeft(),
                     remainingEventAndTime.getRight()));
       }
-      LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
+      logger.ifPresent(l -> l.info("Reported {} pipe metas.", 
pipeMetaBinaryList.size()));
     } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
@@ -613,6 +614,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
               IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
         LOGGER.warn("Failed to persist progress index to configNode, status: 
{}", result);
+      } else {
+        LOGGER.info("Successfully persisted all pipe's info to configNode.");
       }
     } catch (final Exception e) {
       LOGGER.warn(e.getMessage());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index dfcbba78cff..3ac297e1c69 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -284,7 +284,7 @@ public class CommonConfig {
       (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
-  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
+  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 3;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
@@ -303,8 +303,8 @@ public class CommonConfig {
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 
Integer.MAX_VALUE; // Deprecated
   private long pipeMaxAllowedLinkedTsFileCount = Long.MAX_VALUE; // Deprecated
 
-  private int pipeMetaReportMaxLogNumPerRound = 10;
-  private int pipeMetaReportMaxLogIntervalRounds = 36;
+  private double pipeMetaReportMaxLogNumPerRound = 0.1;
+  private int pipeMetaReportMaxLogIntervalRounds = 360;
   private int pipeTsFilePinMaxLogNumPerRound = 10;
   private int pipeTsFilePinMaxLogIntervalRounds = 90;
 
@@ -1581,7 +1581,7 @@ public class CommonConfig {
     return pipeMetaReportMaxLogNumPerRound;
   }
 
-  public void setPipeMetaReportMaxLogNumPerRound(int 
pipeMetaReportMaxLogNumPerRound) {
+  public void setPipeMetaReportMaxLogNumPerRound(double 
pipeMetaReportMaxLogNumPerRound) {
     if (this.pipeMetaReportMaxLogNumPerRound == 
pipeMetaReportMaxLogNumPerRound) {
       return;
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4ce2fd19093..25fe88d72c9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -347,7 +347,7 @@ public class PipeConfig {
 
   /////////////////////////////// Logger ///////////////////////////////
 
-  public int getPipeMetaReportMaxLogNumPerRound() {
+  public double getPipeMetaReportMaxLogNumPerRound() {
     return COMMON_CONFIG.getPipeMetaReportMaxLogNumPerRound();
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index d6bad9d18f6..9e13bb04ade 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -118,7 +118,7 @@ public class PipeDescriptor {
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
 
     config.setPipeMetaReportMaxLogNumPerRound(
-        Integer.parseInt(
+        Double.parseDouble(
             properties.getProperty(
                 "pipe_meta_report_max_log_num_per_round",
                 String.valueOf(config.getPipeMetaReportMaxLogNumPerRound()))));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
index 49699fdf878..69d8b5294db 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogManager.java
@@ -34,7 +34,7 @@ public class PipeLogManager {
 
   public Optional<Logger> schedule(
       final Class<?> logClass,
-      final int maxAverageScale,
+      final double maxAverageScale,
       final int maxLogInterval,
       final int scale) {
     return logClass2LogStatusMap
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
index 3b4db49d065..a30b12ab5eb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogStatus.java
@@ -29,11 +29,11 @@ class PipeLogStatus {
 
   private final Logger logger;
 
-  private final int maxAverageScale;
+  private final double maxAverageScale;
   private final int maxLogInterval;
   private final AtomicLong currentRounds = new AtomicLong(0);
 
-  PipeLogStatus(final Class<?> logClass, final int maxAverageScale, final int 
maxLogInterval) {
+  PipeLogStatus(final Class<?> logClass, final double maxAverageScale, final 
int maxLogInterval) {
     logger = LoggerFactory.getLogger(logClass);
 
     this.maxAverageScale = maxAverageScale;

Reply via email to