This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6405a06885fd5c420650b2b89c4c7773e29d790f
Author: 陈 哲涵 <[email protected]>
AuthorDate: Tue Jul 15 10:55:27 2025 +0000

    [TIMECHODB] Continue to revert the local persisting logic
---
 .../runtime/heartbeat/PipeHeartbeatParser.java     |   2 -
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   7 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  10 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  14 +-
 .../confignode/service/ConfigNodeShutdownHook.java |   3 -
 .../request/ConfigPhysicalPlanSerDeTest.java       |  12 +-
 .../consensus/response/pipe/PipeTableRespTest.java |   6 +-
 .../agent/PipeConfigNodeSubtaskExecutorTest.java   |   3 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   4 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   5 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     |   3 -
 .../agent/runtime/PipePeriodicalJobExecutor.java   |  33 -----
 .../commons/pipe/agent/task/PipeTaskAgent.java     |   3 -
 .../commons/pipe/agent/task/meta/PipeMeta.java     |   6 +-
 .../pipe/agent/task/meta/PipeRuntimeMeta.java      |  36 ++----
 .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 141 +--------------------
 .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java |  22 +---
 17 files changed, 42 insertions(+), 268 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 6f6c0b2d443..e2303fecdea 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -244,7 +244,6 @@ public class PipeHeartbeatParser {
                 .equals(PipeStatus.STOPPED)) {
               PipeRuntimeMeta runtimeMeta = 
pipeMetaFromCoordinator.getRuntimeMeta();
               runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-              runtimeMeta.onSetPipeDroppedOrStopped();
               runtimeMeta.setIsStoppedByRuntimeException(true);
 
               needWriteConsensusOnConfigNodes.set(true);
@@ -274,7 +273,6 @@ public class PipeHeartbeatParser {
                               exceptionMap.put(nodeId, exception);
                             }
                             runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-                            runtimeMeta.onSetPipeDroppedOrStopped();
                             runtimeMeta.setIsStoppedByRuntimeException(true);
 
                             needWriteConsensusOnConfigNodes.set(true);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 880271f8d01..372209bfd74 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -616,11 +616,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             if (newLeader != -1) {
                               consensusGroupIdToTaskMetaMap.put(
                                   consensusGroupId.getId(),
-                                  new PipeTaskMeta(
-                                      MinimumProgressIndex.INSTANCE,
-                                      newLeader,
-                                      consensusGroupId.getId(),
-                                      false));
+                                  new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader));
                             }
                             // else:
                             // "The pipe task meta does not contain the data 
region group {} or
@@ -794,7 +790,6 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
                   // Mark the status of the pipe with exception as stopped
                   runtimeMeta.getStatus().set(PipeStatus.STOPPED);
-                  runtimeMeta.onSetPipeDroppedOrStopped();
                   runtimeMeta.setIsStoppedByRuntimeException(true);
 
                   final Map<Integer, PipeRuntimeException> exceptionMap =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index b17795afa43..b11c74408a6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -155,11 +155,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                 // Pipe only collect user's data, filter metric database here.
                 updatedConsensusGroupIdToTaskMetaMap.put(
                     regionGroupId.getId(),
-                    new PipeTaskMeta(
-                        currentPipeTaskMeta.getProgressIndex(),
-                        regionLeaderNodeId,
-                        regionGroupId.getId(),
-                        false));
+                    new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), 
regionLeaderNodeId));
               }
             });
 
@@ -174,9 +170,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           new PipeTaskMeta(
               configRegionTaskMeta.getProgressIndex(),
               // The leader of the config region is the config node itself
-              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-              Integer.MIN_VALUE,
-              false));
+              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
     }
 
     updatedPipeRuntimeMeta = new 
PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 3b91aa07802..813d4ebe69e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -171,9 +171,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           groupId.getId(),
           new PipeTaskMeta(
               new RecoverProgressIndex(senderDataNodeId, new 
SimpleProgressIndex(0, 0)),
-              senderDataNodeId,
-              groupId.getId(),
-              false));
+              senderDataNodeId));
     } else {
       // data regions & schema regions
       env.getConfigManager()
@@ -189,11 +187,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
                   // Pipe only collect user's data, filter out metric database 
here.
                   consensusGroupIdToTaskMetaMap.put(
                       regionGroupId.getId(),
-                      new PipeTaskMeta(
-                          MinimumProgressIndex.INSTANCE,
-                          regionLeaderNodeId,
-                          regionGroupId.getId(),
-                          false));
+                      new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 
regionLeaderNodeId));
                 }
               });
 
@@ -206,9 +200,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           new PipeTaskMeta(
               MinimumProgressIndex.INSTANCE,
               // The leader of the config region is the config node itself
-              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-              Integer.MIN_VALUE,
-              false));
+              ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId()));
     }
 
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
index bd12adbd804..5c3ec5af063 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -88,8 +87,6 @@ public class ConfigNodeShutdownHook extends Thread {
             "Reporting ConfigNode shutdown failed. The cluster will still take 
the current ConfigNode as Running for a few seconds.");
       }
     }
-    // Shutdown pipe progressIndex background service
-    PipePeriodicalJobExecutor.shutdownBackgroundService();
 
     if (LOGGER.isInfoEnabled()) {
       LOGGER.info(
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index c09dafb1d29..de2a350571d 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -849,7 +849,7 @@ public class ConfigPhysicalPlanSerDeTest {
     extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -874,7 +874,7 @@ public class ConfigPhysicalPlanSerDeTest {
     extractorAttributes.put("pattern", "root.db");
     processorAttributes.put("processor", "do-nothing-processor");
     connectorAttributes.put("batch.enable", "false");
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -912,7 +912,7 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void OperateMultiplePipesPlanV2Test() throws IOException {
-    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    final PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new 
ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -925,7 +925,7 @@ public class ConfigPhysicalPlanSerDeTest {
     PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
     CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, 
pipeRuntimeMeta);
 
-    final PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false);
+    final PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2);
     final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new 
ConcurrentHashMap<>();
     pipeTasks.put(2, pipeTaskMeta1);
     PipeStaticMeta pipeStaticMeta1 =
@@ -1024,8 +1024,8 @@ public class ConfigPhysicalPlanSerDeTest {
         new PipeRuntimeMeta(
             new ConcurrentHashMap<Integer, PipeTaskMeta>() {
               {
-                put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1, 
false));
-                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789, 
1, false));
+                put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987));
+                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789));
               }
             });
     pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index 04dea675015..94189a19d99 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -54,7 +54,7 @@ public class PipeTableRespTest {
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -74,7 +74,7 @@ public class PipeTableRespTest {
     connectorAttributes1.put("host", "127.0.0.1");
     connectorAttributes1.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta1 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new 
ConcurrentHashMap<>();
     pipeTasks1.put(1, pipeTaskMeta1);
     PipeStaticMeta pipeStaticMeta1 =
@@ -94,7 +94,7 @@ public class PipeTableRespTest {
     connectorAttributes2.put("host", "172.30.30.30");
     connectorAttributes2.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta2 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta2 = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new 
ConcurrentHashMap<>();
     pipeTasks2.put(1, pipeTaskMeta2);
     PipeStaticMeta pipeStaticMeta2 =
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
index a43a87b120e..f2fa5b0205a 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java
@@ -62,8 +62,7 @@ public class PipeConfigNodeSubtaskExecutorTest {
                         
BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName());
                   }
                 },
-                new PipeTaskMeta(
-                    MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE, 
Integer.MIN_VALUE, false)));
+                new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 
Integer.MIN_VALUE)));
   }
 
   @After
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 815d5c1757e..c3e7916108f 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -86,7 +86,7 @@ public class PipeInfoTest {
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -121,7 +121,7 @@ public class PipeInfoTest {
     extractorAttributes.put("extractor", 
"org.apache.iotdb.pipe.extractor.DefaultExtractor");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocol.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false);
+    PipeTaskMeta pipeTaskMeta = new 
PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
     ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
     pipeTasks.put(1, pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 41cc715eb52..c1c4893ae8a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -261,10 +261,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     pipeName = environment.getPipeName();
     creationTime = environment.getCreationTime();
     pipeTaskMeta = environment.getPipeTaskMeta();
-
-    // progressIndex is immutable in 
`updateToMinimumEqualOrIsAfterProgressIndex`, so data
-    // consistency in `environment.getPipeTaskMeta().getProgressIndex()` is 
ensured.
-    startIndex = environment.getPipeTaskMeta().restoreProgressIndex();
+    startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
     pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 8dd3d39c92c..5b62a0d614c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -123,8 +122,6 @@ public class DataNodeShutdownHook extends Thread {
     }
     // Persist progress index before shutdown to accurate recovery after 
restart
     PipeDataNodeAgent.task().persistAllProgressIndex();
-    // Shutdown pipe progressIndex background service
-    PipePeriodicalJobExecutor.shutdownBackgroundService();
 
     // Actually stop all services started by the DataNode.
     // If we don't call this, services like the RestService are not stopped 
and I can't re-start
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index 33ac03c5c96..3226b3947f0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,16 +21,8 @@ package org.apache.iotdb.commons.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 /**
  * The shortest scheduling cycle for these jobs is {@link
  * PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()}, 
suitable for jobs that are
@@ -38,31 +30,6 @@ import java.util.concurrent.TimeUnit;
  */
 public class PipePeriodicalJobExecutor extends 
AbstractPipePeriodicalJobExecutor {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
-  // This background service is used to execute jobs that need to be cancelled 
and released.
-  private static final ScheduledExecutorService backgroundService =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName());
-
-  public static Future<?> submitBackgroundJob(
-      Runnable job, long initialDelayInMs, long periodInMs) {
-    return ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        backgroundService, job, initialDelayInMs, periodInMs, 
TimeUnit.MILLISECONDS);
-  }
-
-  public static void shutdownBackgroundService() {
-    backgroundService.shutdownNow();
-    try {
-      if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) {
-        LOGGER.warn("Pipe progressIndex background service did not terminate 
within {}s", 30);
-      }
-    } catch (InterruptedException e) {
-      LOGGER.warn(
-          "Pipe progressIndex background service is interrupted while waiting 
for termination");
-      Thread.currentThread().interrupt();
-    }
-  }
-
   public PipePeriodicalJobExecutor() {
     super(
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 55aa6c5f17a..d1ea8807252 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -544,7 +544,6 @@ public abstract class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when 
executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
 
     // Drop pipe tasks
     final Map<Integer, PipeTask> pipeTasks =
@@ -586,7 +585,6 @@ public abstract class PipeTaskAgent {
     // but the pipe task meta has not been cleaned up (in case of failure when 
executing
     // dropPipeTaskByConsensusGroup).
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
 
     // Drop pipe tasks
     final Map<Integer, PipeTask> pipeTasks =
@@ -679,7 +677,6 @@ public abstract class PipeTaskAgent {
 
     // Set pipe meta status to STOPPED
     existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
-    existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped();
   }
 
   ////////////////////////// Checker //////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index c71156a234b..997278010e9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-
 import org.apache.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
@@ -83,9 +81,7 @@ public class PipeMeta {
 
   public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
     final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
-    final PipeRuntimeMeta runtimeMeta =
-        PipeRuntimeMeta.deserialize(
-            byteBuffer, 
PipeConfig.getInstance().isPipeProgressIndexPersistEnabled());
+    final PipeRuntimeMeta runtimeMeta = 
PipeRuntimeMeta.deserialize(byteBuffer);
     return new PipeMeta(
         staticMeta,
         runtimeMeta,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
index 752edae0cf0..8c22a7bfd55 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java
@@ -110,10 +110,6 @@ public class PipeRuntimeMeta {
     return status;
   }
 
-  public void onSetPipeDroppedOrStopped() {
-    
consensusGroupId2TaskMetaMap.values().forEach(PipeTaskMeta::cancelPersistProgressIndexFuture);
-  }
-
   public ConcurrentMap<Integer, PipeTaskMeta> 
getConsensusGroupId2TaskMetaMap() {
     return consensusGroupId2TaskMetaMap;
   }
@@ -200,9 +196,7 @@ public class PipeRuntimeMeta {
     for (int i = 0; i < size; ++i) {
       final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_1, inputStream, taskIndex, 
false));
+          taskIndex, 
PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, inputStream));
     }
 
     return pipeRuntimeMeta;
@@ -217,9 +211,7 @@ public class PipeRuntimeMeta {
     for (int i = 0; i < size; ++i) {
       final int taskIndex = ReadWriteIOUtils.readInt(inputStream);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_2, inputStream, taskIndex, 
false));
+          taskIndex, 
PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, inputStream));
     }
 
     size = ReadWriteIOUtils.readInt(inputStream);
@@ -235,20 +227,15 @@ public class PipeRuntimeMeta {
     return pipeRuntimeMeta;
   }
 
-  public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) {
-    return deserialize(byteBuffer, false);
-  }
-
-  public static PipeRuntimeMeta deserialize(
-      final ByteBuffer byteBuffer, final boolean needPersist) {
+  public static PipeRuntimeMeta deserialize(final ByteBuffer byteBuffer) {
     final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer);
     final PipeRuntimeMetaVersion pipeRuntimeMetaVersion =
         PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte);
     switch (pipeRuntimeMetaVersion) {
       case VERSION_1:
-        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte, 
needPersist);
+        return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte);
       case VERSION_2:
-        return deserializeVersion2(byteBuffer, needPersist);
+        return deserializeVersion2(byteBuffer);
       default:
         throw new UnsupportedOperationException(
             "Unknown pipe runtime meta version: " + 
pipeRuntimeMetaVersion.getVersion());
@@ -256,7 +243,7 @@ public class PipeRuntimeMeta {
   }
 
   private static PipeRuntimeMeta deserializeVersion1(
-      ByteBuffer byteBuffer, byte pipeRuntimeVersionByte, final boolean 
needPersist) {
+      ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte));
@@ -265,16 +252,13 @@ public class PipeRuntimeMeta {
     for (int i = 0; i < size; ++i) {
       final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_1, byteBuffer, taskIndex, 
needPersist));
+          taskIndex, 
PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, byteBuffer));
     }
 
     return pipeRuntimeMeta;
   }
 
-  public static PipeRuntimeMeta deserializeVersion2(
-      ByteBuffer byteBuffer, final boolean needPersist) {
+  public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) {
     final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta();
 
     
pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer)));
@@ -283,9 +267,7 @@ public class PipeRuntimeMeta {
     for (int i = 0; i < size; ++i) {
       final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer);
       pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put(
-          taskIndex,
-          PipeTaskMeta.deserialize(
-              PipeRuntimeMetaVersion.VERSION_2, byteBuffer, taskIndex, 
needPersist));
+          taskIndex, 
PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, byteBuffer));
     }
 
     size = ReadWriteIOUtils.readInt(byteBuffer);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
index 6a4ab25db7e..638d879bfa7 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,51 +26,30 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
-import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTaskMeta {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskMeta.class);
-  private static final String PREFIX = "__progressIndex_";
 
   private final AtomicReference<ProgressIndex> progressIndex = new 
AtomicReference<>();
   private final AtomicInteger leaderNodeId = new AtomicInteger(0);
 
-  private final AtomicLong updateCount = new AtomicLong(0);
-  private final AtomicLong lastPersistCount = new AtomicLong(0);
-  private final long checkPointGap =
-      PipeConfig.getInstance().getPipeProgressIndexPersistCheckPointGap();
-  private File progressIndexPersistFile;
-  private final AtomicBoolean isRegisterPersistTask = new AtomicBoolean(false);
-  private Future<?> persistProgressIndexFuture;
-
   /**
    * Stores the exceptions encountered during run time of each pipe task.
    *
@@ -84,26 +62,9 @@ public class PipeTaskMeta {
   private final Set<PipeRuntimeException> exceptionMessages =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-  public PipeTaskMeta(
-      /* @NotNull */ final ProgressIndex progressIndex,
-      final int leaderNodeId,
-      final int taskIndex,
-      final boolean needPersistProgressIndex) {
+  public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final 
int leaderNodeId) {
     this.progressIndex.set(progressIndex);
     this.leaderNodeId.set(leaderNodeId);
-    // PipeTaskMeta created in configNode doesn't need to persist progress 
index.
-    if (needPersistProgressIndex) {
-      this.progressIndexPersistFile =
-          new File(
-              IoTDBConstant.DN_DEFAULT_DATA_DIR
-                  + File.separator
-                  + IoTDBConstant.SYSTEM_FOLDER_NAME
-                  + File.separator
-                  + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
-                  + File.separator
-                  + 
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
-              PREFIX + taskIndex);
-    }
   }
 
   public ProgressIndex getProgressIndex() {
@@ -111,89 +72,8 @@ public class PipeTaskMeta {
   }
 
   public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
-    // only pipeTaskMeta that need to updateProgressIndex will persist 
progress index
-    // isRegisterPersistTask is used to avoid multiple threads registering 
persist task concurrently
-    if (Objects.nonNull(progressIndexPersistFile)
-        && !isRegisterPersistTask.getAndSet(true)
-        && this.persistProgressIndexFuture == null
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-      this.persistProgressIndexFuture =
-          PipePeriodicalJobExecutor.submitBackgroundJob(
-              () -> {
-                if 
(PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-                  persistProgressIndex();
-                }
-              },
-              0,
-              PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
-    }
-
-    progressIndex.updateAndGet(
+    return progressIndex.updateAndGet(
         index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
-    if (Objects.nonNull(progressIndexPersistFile)
-        && updateCount.incrementAndGet() - lastPersistCount.get() > 
checkPointGap
-        && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
-      persistProgressIndex();
-    }
-    return progressIndex.get();
-  }
-
-  private synchronized void persistProgressIndex() {
-    if (lastPersistCount.get() == updateCount.get()) {
-      // in case of multiple threads calling updateProgressIndex at the same 
time
-      return;
-    }
-
-    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
-        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
-      progressIndex.get().serialize(outputStream);
-      // append is false by default.
-      FileUtils.writeByteArrayToFile(
-          progressIndexPersistFile,
-          byteArrayOutputStream.getBuf(),
-          0,
-          byteArrayOutputStream.size());
-      lastPersistCount.set(updateCount.get());
-    } catch (IOException e) {
-      LOGGER.warn("Failed to persist progress index {} for {}", 
progressIndex.get(), this, e);
-    }
-  }
-
-  public ProgressIndex restoreProgressIndex() {
-    if (!progressIndexPersistFile.exists() || 
progressIndexPersistFile.length() == 0) {
-      return progressIndex.get();
-    }
-
-    try {
-      final byte[] fileData = 
Files.readAllBytes(progressIndexPersistFile.toPath());
-
-      try (final ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(fileData);
-          final DataInputStream inputStream = new 
DataInputStream(byteArrayInputStream)) {
-        final ProgressIndex restoredIndex = 
ProgressIndexType.deserializeFrom(inputStream);
-        
progressIndex.get().updateToMinimumEqualOrIsAfterProgressIndex(restoredIndex);
-        LOGGER.info(
-            "{} successfully restored progress index from [{}], current index: 
{}",
-            this,
-            progressIndexPersistFile.getAbsolutePath(),
-            progressIndex.get());
-      }
-    } catch (final IOException e) {
-      LOGGER.warn(
-          "{} failed to restore progress index from [{}].",
-          this,
-          progressIndexPersistFile.getAbsolutePath(),
-          e);
-    }
-    return progressIndex.get();
-  }
-
-  public void cancelPersistProgressIndexFuture() {
-    if (Objects.nonNull(progressIndexPersistFile)
-        && isRegisterPersistTask.getAndSet(false)
-        && persistProgressIndexFuture != null) {
-      persistProgressIndexFuture.cancel(false);
-      persistProgressIndexFuture = null;
-    }
   }
 
   public int getLeaderNodeId() {
@@ -245,16 +125,12 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(
-      final PipeRuntimeMetaVersion version,
-      final ByteBuffer byteBuffer,
-      final int taskIndex,
-      final boolean needPersist) {
+      final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) {
     final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(byteBuffer);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer);
 
-    final PipeTaskMeta pipeTaskMeta =
-        new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+    final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderNodeId);
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
@@ -265,17 +141,12 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(
-      final PipeRuntimeMetaVersion version,
-      final InputStream inputStream,
-      final int taskIndex,
-      final boolean needPersist)
-      throws IOException {
+      final PipeRuntimeMetaVersion version, final InputStream inputStream) 
throws IOException {
     final ProgressIndex progressIndex = 
ProgressIndexType.deserializeFrom(inputStream);
 
     final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream);
 
-    final PipeTaskMeta pipeTaskMeta =
-        new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist);
+    final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, 
leaderNodeId);
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final PipeRuntimeException pipeRuntimeException =
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index d4e6008547c..8e04baf7cc0 100644
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -97,27 +97,20 @@ public class PipeMetaDeSerTest {
         new PipeRuntimeMeta(
             new ConcurrentHashMap<Integer, PipeTaskMeta>() {
               {
-                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987, 
123, false));
-                put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789, 
234, false));
-                put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789, 
345, false));
-                put(456, new PipeTaskMeta(finalHybridProgressIndex, 789, 456, 
false));
+                put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987));
+                put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789));
+                put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789));
+                put(456, new PipeTaskMeta(finalHybridProgressIndex, 789));
                 put(
                     567,
                     new PipeTaskMeta(
-                        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 
9)),
-                        123,
-                        567,
-                        false));
+                        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 
9)), 123));
                 put(
                     678,
                     new PipeTaskMeta(
                         new 
TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap),
-                        789,
-                        678,
-                        false));
-                put(
-                    Integer.MIN_VALUE,
-                    new PipeTaskMeta(new MetaProgressIndex(987), 0, 
Integer.MIN_VALUE, false));
+                        789));
+                put(Integer.MIN_VALUE, new PipeTaskMeta(new 
MetaProgressIndex(987), 0));
               }
             });
     ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
@@ -136,7 +129,6 @@ public class PipeMetaDeSerTest {
     Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
 
     pipeRuntimeMeta.getStatus().set(PipeStatus.DROPPED);
-    pipeRuntimeMeta.onSetPipeDroppedOrStopped();
     pipeRuntimeMeta.setIsStoppedByRuntimeException(true);
     pipeRuntimeMeta.setExceptionsClearTime(0);
     pipeRuntimeMeta


Reply via email to