This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new c92b625c99d Pipe/Load: Assign distinct progress indexes for loading 
tsfiles in time partitions to reduce pipe reprocessing after restart & Decrease 
pipe heartbeat interval (#15583) (#15608)
c92b625c99d is described below

commit c92b625c99d445180b462b12f1f223817d6c5ab7
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu May 29 18:20:12 2025 +0800

    Pipe/Load: Assign distinct progress indexes for loading tsfiles in time 
partitions to reduce pipe reprocessing after restart & Decrease pipe heartbeat 
interval (#15583) (#15608)
    
    (cherry picked from commit 9b6b32344ec8095bd75c9c48a43ddb3b745fe0e3)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 13 ++-----
 .../impl/DataNodeInternalRPCServiceImpl.java       | 21 +++++++----
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 44 +++++++++++++---------
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 39 ++++++++++++++-----
 .../iotdb/db/storageengine/StorageEngine.java      |  5 ++-
 .../db/storageengine/load/LoadTsFileManager.java   | 22 +++++++++--
 .../apache/iotdb/commons/conf/CommonConfig.java    |  2 +-
 .../src/main/thrift/datanode.thrift                |  2 +-
 8 files changed, 95 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 753075f3b20..96772535634 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -517,17 +517,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     }
 
     if (startIndex instanceof StateProgressIndex) {
-      // Some different tsFiles may share the same max progressIndex, thus 
tsFiles with an
-      // "equals" max progressIndex must be transmitted to avoid data loss
-      final ProgressIndex innerProgressIndex =
-          ((StateProgressIndex) startIndex).getInnerProgressIndex();
-      return 
!innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-          && 
!innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+      startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
     }
-
-    // Some different tsFiles may share the same max progressIndex, thus 
tsFiles with an
-    // "equals" max progressIndex must be transmitted to avoid data loss
-    return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+    return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+        && !startIndex.equals(resource.getMaxProgressIndexAfterClose());
   }
 
   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource 
resource) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 186be9835fa..7732814c109 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.common.rpc.thrift.TSettleReq;
 import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -459,14 +460,18 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-    final ProgressIndex progressIndex;
-    if (req.isSetProgressIndex()) {
-      progressIndex = 
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
+    final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap 
= new HashMap<>();
+    if (req.isSetTimePartition2ProgressIndex()) {
+      for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+          req.getTimePartition2ProgressIndex().entrySet()) {
+        timePartitionProgressIndexMap.put(
+            entry.getKey(), 
ProgressIndexType.deserializeFrom(entry.getValue()));
+      }
     } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = 
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.", 
progressIndex, req.uuid);
+      final TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+      status.setMessage("Load command requires time partition to progress 
index map");
+      return createTLoadResp(status);
     }
 
     return createTLoadResp(
@@ -475,7 +480,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
                 LoadTsFileScheduler.LoadCommand.values()[req.commandType],
                 req.uuid,
                 req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
-                progressIndex));
+                timePartitionProgressIndexMap));
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 5227f230007..f50533bec47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -56,8 +57,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -219,7 +222,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   public Future<FragInstanceDispatchResult> dispatchCommand(
-      TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
+      TLoadCommandReq originalLoadCommandReq, Set<TRegionReplicaSet> 
replicaSets) {
     Set<TEndPoint> allEndPoint = new HashSet<>();
     for (TRegionReplicaSet replicaSet : replicaSets) {
       for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
@@ -228,23 +231,27 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
     }
 
     for (TEndPoint endPoint : allEndPoint) {
+      // duplicate for progress index binary serialization
+      final TLoadCommandReq duplicatedLoadCommandReq = 
originalLoadCommandReq.deepCopy();
       try (SetThreadName threadName =
           new SetThreadName(
               "load-dispatcher"
                   + "-"
-                  + 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
+                  + 
LoadTsFileScheduler.LoadCommand.values()[duplicatedLoadCommandReq.commandType]
                   + "-"
-                  + loadCommandReq.uuid)) {
+                  + duplicatedLoadCommandReq.uuid)) {
         if (isDispatchedToLocal(endPoint)) {
-          dispatchLocally(loadCommandReq);
+          dispatchLocally(duplicatedLoadCommandReq);
         } else {
-          dispatchRemote(loadCommandReq, endPoint);
+          dispatchRemote(duplicatedLoadCommandReq, endPoint);
         }
       } catch (FragmentInstanceDispatchException e) {
-        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", 
loadCommandReq, e);
+        LOGGER.warn(
+            "Cannot dispatch LoadCommand for load operation {}", 
duplicatedLoadCommandReq, e);
         return immediateFuture(new 
FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Exception t) {
-        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", 
loadCommandReq, t);
+        LOGGER.warn(
+            "Cannot dispatch LoadCommand for load operation {}", 
duplicatedLoadCommandReq, t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
@@ -256,17 +263,18 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
 
   private void dispatchLocally(TLoadCommandReq loadCommandReq)
       throws FragmentInstanceDispatchException {
-    final ProgressIndex progressIndex;
-    if (loadCommandReq.isSetProgressIndex()) {
-      progressIndex =
-          
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap 
= new HashMap<>();
+    if (loadCommandReq.isSetTimePartition2ProgressIndex()) {
+      for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+          loadCommandReq.getTimePartition2ProgressIndex().entrySet()) {
+        timePartitionProgressIndexMap.put(
+            entry.getKey(), 
ProgressIndexType.deserializeFrom(entry.getValue()));
+      }
     } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = 
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.",
-          progressIndex,
-          loadCommandReq.uuid);
+      final TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+      status.setMessage("Load command requires time partition to progress 
index map");
+      throw new FragmentInstanceDispatchException(status);
     }
 
     final TSStatus resultStatus =
@@ -275,7 +283,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
                 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
                 loadCommandReq.uuid,
                 loadCommandReq.isSetIsGeneratedByPipe() && 
loadCommandReq.isGeneratedByPipe,
-                progressIndex);
+                timePartitionProgressIndexMap);
     if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
       throw new FragmentInstanceDispatchException(resultStatus);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 49a9fc9e3be..43465d7ed74 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -135,6 +136,7 @@ public class LoadTsFileScheduler implements IScheduler {
   private final PlanFragmentId fragmentId;
   private final Set<TRegionReplicaSet> allReplicaSets;
   private final boolean isGeneratedByPipe;
+  private final Map<TTimePartitionSlot, ProgressIndex> 
timePartitionSlotToProgressIndex;
   private final LoadTsFileDataCacheMemoryBlock block;
 
   public LoadTsFileScheduler(
@@ -153,6 +155,7 @@ public class LoadTsFileScheduler implements IScheduler {
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
     this.isGeneratedByPipe = isGeneratedByPipe;
+    this.timePartitionSlotToProgressIndex = new HashMap<>();
     this.block = 
LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
 
     for (FragmentInstance fragmentInstance : 
distributedQueryPlan.getInstances()) {
@@ -397,7 +400,26 @@ public class LoadTsFileScheduler implements IScheduler {
 
     try {
       loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
-      loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
+      loadCommandReq.setTimePartition2ProgressIndex(
+          timePartitionSlotToProgressIndex.entrySet().stream()
+              .collect(
+                  Collectors.toMap(
+                      Map.Entry::getKey,
+                      entry -> {
+                        try (final PublicBAOS byteArrayOutputStream = new 
PublicBAOS();
+                            final DataOutputStream dataOutputStream =
+                                new DataOutputStream(byteArrayOutputStream)) {
+                          entry.getValue().serialize(dataOutputStream);
+                          return ByteBuffer.wrap(
+                              byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+                        } catch (final IOException e) {
+                          throw new RuntimeException(
+                              String.format(
+                                  "Serialize Progress Index error, 
isFirstPhaseSuccess: %s, uuid: %s, tsFile: %s",
+                                  isFirstPhaseSuccess, uuid, 
tsFile.getAbsolutePath()),
+                              e);
+                        }
+                      })));
       Future<FragInstanceDispatchResult> dispatchResultFuture =
           dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
 
@@ -420,14 +442,6 @@ public class LoadTsFileScheduler implements IScheduler {
         stateMachine.transitionToFailed(status);
         return false;
       }
-    } catch (IOException e) {
-      LOGGER.warn(
-          "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, 
tsFile: {}",
-          isFirstPhaseSuccess,
-          uuid,
-          tsFile.getAbsolutePath());
-      stateMachine.transitionToFailed(e);
-      return false;
     } catch (InterruptedException | ExecutionException e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
@@ -601,6 +615,12 @@ public class LoadTsFileScheduler implements IScheduler {
     return null;
   }
 
+  private void computeTimePartitionSlotToProgressIndexIfAbsent(
+      final TTimePartitionSlot timePartitionSlot) {
+    timePartitionSlotToProgressIndex.putIfAbsent(
+        timePartitionSlot, 
PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad());
+  }
+
   public enum LoadCommand {
     EXECUTE,
     ROLLBACK
@@ -642,6 +662,7 @@ public class LoadTsFileScheduler implements IScheduler {
       nonDirectionalChunkData.add(chunkData);
       dataSize += chunkData.getDataSize();
       block.addMemoryUsage(chunkData.getDataSize());
+      
scheduler.computeTimePartitionSlotToProgressIndexIfAbsent(chunkData.getTimePartitionSlot());
 
       if (!isMemoryEnough()) {
         routeChunkData();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 18d43931ba4..f923fb2f341 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -972,13 +973,13 @@ public class StorageEngine implements IService {
       LoadTsFileScheduler.LoadCommand loadCommand,
       String uuid,
       boolean isGeneratedByPipe,
-      ProgressIndex progressIndex) {
+      Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap) {
     TSStatus status = new TSStatus();
 
     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, 
progressIndex)) {
+          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, 
timePartitionProgressIndexMap)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index fe8a5806113..4ab5acae15f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -264,7 +265,10 @@ public class LoadTsFileManager {
     return FOLDER_MANAGER.get().getNextFolder();
   }
 
-  public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex 
progressIndex)
+  public boolean loadAll(
+      String uuid,
+      boolean isGeneratedByPipe,
+      Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
       throws IOException, LoadFileException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
@@ -273,7 +277,7 @@ public class LoadTsFileManager {
     final Optional<CleanupTask> cleanupTask = 
Optional.of(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
-      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, 
timePartitionProgressIndexMap);
     } finally {
       cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
     }
@@ -472,7 +476,9 @@ public class LoadTsFileManager {
       }
     }
 
-    private void loadAll(boolean isGeneratedByPipe, ProgressIndex 
progressIndex)
+    private void loadAll(
+        boolean isGeneratedByPipe,
+        Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
         throws IOException, LoadFileException {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
@@ -491,7 +497,11 @@ public class LoadTsFileManager {
 
         final DataRegion dataRegion = entry.getKey().getDataRegion();
         final TsFileResource tsFileResource = 
dataPartition2Resource.get(entry.getKey());
-        endTsFileResource(writer, tsFileResource, progressIndex);
+        endTsFileResource(
+            writer,
+            tsFileResource,
+            timePartitionProgressIndexMap.getOrDefault(
+                entry.getKey().getTimePartitionSlot(), 
MinimumProgressIndex.INSTANCE));
         dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
 
         // Metrics
@@ -656,6 +666,10 @@ public class LoadTsFileManager {
       return dataRegion;
     }
 
+    public TTimePartitionSlot getTimePartitionSlot() {
+      return timePartitionSlot;
+    }
+
     @Override
     public String toString() {
       return String.join(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b55ef0347f1..c3f2e0f4a11 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -253,7 +253,7 @@ public class CommonConfig {
       (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
 
   private boolean isSeperatedPipeHeartbeatEnabled = true;
-  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
+  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 2445396508f..d58e1eb1591 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -345,7 +345,7 @@ struct TLoadCommandReq {
     1: required i32 commandType
     2: required string uuid
     3: optional bool isGeneratedByPipe
-    4: optional binary progressIndex
+    4: optional map<common.TTimePartitionSlot, binary> 
timePartition2ProgressIndex
 }
 
 struct TLoadResp {

Reply via email to