This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6405a06885fd5c420650b2b89c4c7773e29d790f Author: 陈 哲涵 <[email protected]> AuthorDate: Tue Jul 15 10:55:27 2025 +0000 [TIMECHODB] Continue to revert the local persisting logic --- .../runtime/heartbeat/PipeHeartbeatParser.java | 2 - .../confignode/persistence/pipe/PipeTaskInfo.java | 7 +- .../impl/pipe/task/AlterPipeProcedureV2.java | 10 +- .../impl/pipe/task/CreatePipeProcedureV2.java | 14 +- .../confignode/service/ConfigNodeShutdownHook.java | 3 - .../request/ConfigPhysicalPlanSerDeTest.java | 12 +- .../consensus/response/pipe/PipeTableRespTest.java | 6 +- .../agent/PipeConfigNodeSubtaskExecutorTest.java | 3 +- .../iotdb/confignode/persistence/PipeInfoTest.java | 4 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 5 +- .../iotdb/db/service/DataNodeShutdownHook.java | 3 - .../agent/runtime/PipePeriodicalJobExecutor.java | 33 ----- .../commons/pipe/agent/task/PipeTaskAgent.java | 3 - .../commons/pipe/agent/task/meta/PipeMeta.java | 6 +- .../pipe/agent/task/meta/PipeRuntimeMeta.java | 36 ++---- .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 141 +-------------------- .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 22 +--- 17 files changed, 42 insertions(+), 268 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 6f6c0b2d443..e2303fecdea 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -244,7 +244,6 @@ public class PipeHeartbeatParser { .equals(PipeStatus.STOPPED)) { PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.onSetPipeDroppedOrStopped(); runtimeMeta.setIsStoppedByRuntimeException(true); needWriteConsensusOnConfigNodes.set(true); @@ -274,7 +273,6 @@ public class PipeHeartbeatParser { exceptionMap.put(nodeId, exception); } runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.onSetPipeDroppedOrStopped(); runtimeMeta.setIsStoppedByRuntimeException(true); needWriteConsensusOnConfigNodes.set(true); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 880271f8d01..372209bfd74 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -616,11 +616,7 @@ public class PipeTaskInfo implements SnapshotProcessor { if (newLeader != -1) { consensusGroupIdToTaskMetaMap.put( consensusGroupId.getId(), - new PipeTaskMeta( - MinimumProgressIndex.INSTANCE, - newLeader, - consensusGroupId.getId(), - false)); + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, newLeader)); } // else: // "The pipe task meta does not contain the data region group {} or @@ -794,7 +790,6 @@ public class PipeTaskInfo implements SnapshotProcessor { // Mark the status of the pipe with exception as stopped runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.onSetPipeDroppedOrStopped(); runtimeMeta.setIsStoppedByRuntimeException(true); final Map<Integer, PipeRuntimeException> exceptionMap = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java index b17795afa43..b11c74408a6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java @@ -155,11 +155,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // Pipe only collect user's data, filter metric database here. updatedConsensusGroupIdToTaskMetaMap.put( regionGroupId.getId(), - new PipeTaskMeta( - currentPipeTaskMeta.getProgressIndex(), - regionLeaderNodeId, - regionGroupId.getId(), - false)); + new PipeTaskMeta(currentPipeTaskMeta.getProgressIndex(), regionLeaderNodeId)); } }); @@ -174,9 +170,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { new PipeTaskMeta( configRegionTaskMeta.getProgressIndex(), // The leader of the config region is the config node itself - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - Integer.MIN_VALUE, - false)); + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); } updatedPipeRuntimeMeta = new PipeRuntimeMeta(updatedConsensusGroupIdToTaskMetaMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 3b91aa07802..813d4ebe69e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -171,9 +171,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { groupId.getId(), new PipeTaskMeta( new RecoverProgressIndex(senderDataNodeId, new SimpleProgressIndex(0, 0)), - senderDataNodeId, - groupId.getId(), - false)); + senderDataNodeId)); } else { // data regions & schema regions env.getConfigManager() @@ -189,11 +187,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // Pipe only collect user's data, filter out metric database here. consensusGroupIdToTaskMetaMap.put( regionGroupId.getId(), - new PipeTaskMeta( - MinimumProgressIndex.INSTANCE, - regionLeaderNodeId, - regionGroupId.getId(), - false)); + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, regionLeaderNodeId)); } }); @@ -206,9 +200,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { new PipeTaskMeta( MinimumProgressIndex.INSTANCE, // The leader of the config region is the config node itself - ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(), - Integer.MIN_VALUE, - false)); + ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId())); } pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java index bd12adbd804..5c3ec5af063 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNodeShutdownHook.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.confignode.client.CnToCnNodeRequestType; import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; @@ -88,8 +87,6 @@ public class ConfigNodeShutdownHook extends Thread { "Reporting ConfigNode shutdown failed. The cluster will still take the current ConfigNode as Running for a few seconds."); } } - // Shutdown pipe progressIndex background service - PipePeriodicalJobExecutor.shutdownBackgroundService(); if (LOGGER.isInfoEnabled()) { LOGGER.info( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index c09dafb1d29..de2a350571d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -849,7 +849,7 @@ public class ConfigPhysicalPlanSerDeTest { extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor"); processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor"); connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocol.ThriftTransporter"); - final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = @@ -874,7 +874,7 @@ public class ConfigPhysicalPlanSerDeTest { extractorAttributes.put("pattern", "root.db"); processorAttributes.put("processor", "do-nothing-processor"); connectorAttributes.put("batch.enable", "false"); - final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = @@ -912,7 +912,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void OperateMultiplePipesPlanV2Test() throws IOException { - final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = @@ -925,7 +925,7 @@ public class ConfigPhysicalPlanSerDeTest { PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); CreatePipePlanV2 createPipePlanV2 = new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta); - final PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2, 2, false); + final PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 2); final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new ConcurrentHashMap<>(); pipeTasks.put(2, pipeTaskMeta1); PipeStaticMeta pipeStaticMeta1 = @@ -1024,8 +1024,8 @@ public class ConfigPhysicalPlanSerDeTest { new PipeRuntimeMeta( new ConcurrentHashMap<Integer, PipeTaskMeta>() { { - put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987, 1, false)); - put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789, 1, false)); + put(456, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 987)); + put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 789)); } }); pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta)); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java index 04dea675015..94189a19d99 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java @@ -54,7 +54,7 @@ public class PipeTableRespTest { connectorAttributes.put("host", "127.0.0.1"); connectorAttributes.put("port", "6667"); - PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = @@ -74,7 +74,7 @@ public class PipeTableRespTest { connectorAttributes1.put("host", "127.0.0.1"); connectorAttributes1.put("port", "6667"); - PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + PipeTaskMeta pipeTaskMeta1 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks1 = new ConcurrentHashMap<>(); pipeTasks1.put(1, pipeTaskMeta1); PipeStaticMeta pipeStaticMeta1 = @@ -94,7 +94,7 @@ public class PipeTableRespTest { connectorAttributes2.put("host", "172.30.30.30"); connectorAttributes2.put("port", "6667"); - PipeTaskMeta pipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + PipeTaskMeta pipeTaskMeta2 = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks2 = new ConcurrentHashMap<>(); pipeTasks2.put(1, pipeTaskMeta2); PipeStaticMeta pipeStaticMeta2 = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java index a43a87b120e..f2fa5b0205a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/agent/PipeConfigNodeSubtaskExecutorTest.java @@ -62,8 +62,7 @@ public class PipeConfigNodeSubtaskExecutorTest { BuiltinPipePlugin.DO_NOTHING_CONNECTOR.getPipePluginName()); } }, - new PipeTaskMeta( - MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE, Integer.MIN_VALUE, false))); + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, Integer.MIN_VALUE))); } @After diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java index 815d5c1757e..c3e7916108f 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java @@ -86,7 +86,7 @@ public class PipeInfoTest { connectorAttributes.put("host", "127.0.0.1"); connectorAttributes.put("port", "6667"); - PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = @@ -121,7 +121,7 @@ public class PipeInfoTest { extractorAttributes.put("extractor", "org.apache.iotdb.pipe.extractor.DefaultExtractor"); processorAttributes.put("processor", "org.apache.iotdb.pipe.processor.SDTFilterProcessor"); connectorAttributes.put("connector", "org.apache.iotdb.pipe.protocol.ThriftTransporter"); - PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1, 1, false); + PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); pipeTasks.put(1, pipeTaskMeta); PipeStaticMeta pipeStaticMeta = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 41cc715eb52..c1c4893ae8a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -261,10 +261,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa pipeName = environment.getPipeName(); creationTime = environment.getCreationTime(); pipeTaskMeta = environment.getPipeTaskMeta(); - - // progressIndex is immutable in `updateToMinimumEqualOrIsAfterProgressIndex`, so data - // consistency in `environment.getPipeTaskMeta().getProgressIndex()` is ensured. - startIndex = environment.getPipeTaskMeta().restoreProgressIndex(); + startIndex = environment.getPipeTaskMeta().getProgressIndex(); dataRegionId = environment.getRegionId(); pipePattern = PipePattern.parsePipePatternFromSourceParameters(parameters); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java index 8dd3d39c92c..5b62a0d614c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -123,8 +122,6 @@ public class DataNodeShutdownHook extends Thread { } // Persist progress index before shutdown to accurate recovery after restart PipeDataNodeAgent.task().persistAllProgressIndex(); - // Shutdown pipe progressIndex background service - PipePeriodicalJobExecutor.shutdownBackgroundService(); // Actually stop all services started by the DataNode. // If we don't call this, services like the RestService are not stopped and I can't re-start diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java index 33ac03c5c96..3226b3947f0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java @@ -21,16 +21,8 @@ package org.apache.iotdb.commons.pipe.agent.runtime; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - /** * The shortest scheduling cycle for these jobs is {@link * PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()}, suitable for jobs that are @@ -38,31 +30,6 @@ import java.util.concurrent.TimeUnit; */ public class PipePeriodicalJobExecutor extends AbstractPipePeriodicalJobExecutor { - private static final Logger LOGGER = LoggerFactory.getLogger(PipePeriodicalJobExecutor.class); - // This background service is used to execute jobs that need to be cancelled and released. - private static final ScheduledExecutorService backgroundService = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName()); - - public static Future<?> submitBackgroundJob( - Runnable job, long initialDelayInMs, long periodInMs) { - return ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - backgroundService, job, initialDelayInMs, periodInMs, TimeUnit.MILLISECONDS); - } - - public static void shutdownBackgroundService() { - backgroundService.shutdownNow(); - try { - if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) { - LOGGER.warn("Pipe progressIndex background service did not terminate within {}s", 30); - } - } catch (InterruptedException e) { - LOGGER.warn( - "Pipe progressIndex background service is interrupted while waiting for termination"); - Thread.currentThread().interrupt(); - } - } - public PipePeriodicalJobExecutor() { super( IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 55aa6c5f17a..d1ea8807252 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -544,7 +544,6 @@ public abstract class PipeTaskAgent { // but the pipe task meta has not been cleaned up (in case of failure when executing // dropPipeTaskByConsensusGroup). existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED); - existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped(); // Drop pipe tasks final Map<Integer, PipeTask> pipeTasks = @@ -586,7 +585,6 @@ public abstract class PipeTaskAgent { // but the pipe task meta has not been cleaned up (in case of failure when executing // dropPipeTaskByConsensusGroup). existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.DROPPED); - existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped(); // Drop pipe tasks final Map<Integer, PipeTask> pipeTasks = @@ -679,7 +677,6 @@ public abstract class PipeTaskAgent { // Set pipe meta status to STOPPED existedPipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED); - existedPipeMeta.getRuntimeMeta().onSetPipeDroppedOrStopped(); } ////////////////////////// Checker ////////////////////////// diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java index c71156a234b..997278010e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java @@ -19,8 +19,6 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; - import org.apache.tsfile.utils.PublicBAOS; import java.io.DataOutputStream; @@ -83,9 +81,7 @@ public class PipeMeta { public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) { final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer); - final PipeRuntimeMeta runtimeMeta = - PipeRuntimeMeta.deserialize( - byteBuffer, PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()); + final PipeRuntimeMeta runtimeMeta = PipeRuntimeMeta.deserialize(byteBuffer); return new PipeMeta( staticMeta, runtimeMeta, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 752edae0cf0..8c22a7bfd55 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -110,10 +110,6 @@ public class PipeRuntimeMeta { return status; } - public void onSetPipeDroppedOrStopped() { - consensusGroupId2TaskMetaMap.values().forEach(PipeTaskMeta::cancelPersistProgressIndexFuture); - } - public ConcurrentMap<Integer, PipeTaskMeta> getConsensusGroupId2TaskMetaMap() { return consensusGroupId2TaskMetaMap; } @@ -200,9 +196,7 @@ public class PipeRuntimeMeta { for (int i = 0; i < size; ++i) { final int taskIndex = ReadWriteIOUtils.readInt(inputStream); pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put( - taskIndex, - PipeTaskMeta.deserialize( - PipeRuntimeMetaVersion.VERSION_1, inputStream, taskIndex, false)); + taskIndex, PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, inputStream)); } return pipeRuntimeMeta; @@ -217,9 +211,7 @@ public class PipeRuntimeMeta { for (int i = 0; i < size; ++i) { final int taskIndex = ReadWriteIOUtils.readInt(inputStream); pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put( - taskIndex, - PipeTaskMeta.deserialize( - PipeRuntimeMetaVersion.VERSION_2, inputStream, taskIndex, false)); + taskIndex, PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, inputStream)); } size = ReadWriteIOUtils.readInt(inputStream); @@ -235,20 +227,15 @@ public class PipeRuntimeMeta { return pipeRuntimeMeta; } - public static PipeRuntimeMeta deserialize(ByteBuffer byteBuffer) { - return deserialize(byteBuffer, false); - } - - public static PipeRuntimeMeta deserialize( - final ByteBuffer byteBuffer, final boolean needPersist) { + public static PipeRuntimeMeta deserialize(final ByteBuffer byteBuffer) { final byte pipeRuntimeVersionByte = ReadWriteIOUtils.readByte(byteBuffer); final PipeRuntimeMetaVersion pipeRuntimeMetaVersion = PipeRuntimeMetaVersion.deserialize(pipeRuntimeVersionByte); switch (pipeRuntimeMetaVersion) { case VERSION_1: - return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte, needPersist); + return deserializeVersion1(byteBuffer, pipeRuntimeVersionByte); case VERSION_2: - return deserializeVersion2(byteBuffer, needPersist); + return deserializeVersion2(byteBuffer); default: throw new UnsupportedOperationException( "Unknown pipe runtime meta version: " + pipeRuntimeMetaVersion.getVersion()); @@ -256,7 +243,7 @@ public class PipeRuntimeMeta { } private static PipeRuntimeMeta deserializeVersion1( - ByteBuffer byteBuffer, byte pipeRuntimeVersionByte, final boolean needPersist) { + ByteBuffer byteBuffer, byte pipeRuntimeVersionByte) { final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(); pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(pipeRuntimeVersionByte)); @@ -265,16 +252,13 @@ public class PipeRuntimeMeta { for (int i = 0; i < size; ++i) { final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer); pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put( - taskIndex, - PipeTaskMeta.deserialize( - PipeRuntimeMetaVersion.VERSION_1, byteBuffer, taskIndex, needPersist)); + taskIndex, PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_1, byteBuffer)); } return pipeRuntimeMeta; } - public static PipeRuntimeMeta deserializeVersion2( - ByteBuffer byteBuffer, final boolean needPersist) { + public static PipeRuntimeMeta deserializeVersion2(ByteBuffer byteBuffer) { final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(); pipeRuntimeMeta.status.set(PipeStatus.getPipeStatus(ReadWriteIOUtils.readByte(byteBuffer))); @@ -283,9 +267,7 @@ public class PipeRuntimeMeta { for (int i = 0; i < size; ++i) { final int taskIndex = ReadWriteIOUtils.readInt(byteBuffer); pipeRuntimeMeta.consensusGroupId2TaskMetaMap.put( - taskIndex, - PipeTaskMeta.deserialize( - PipeRuntimeMetaVersion.VERSION_2, byteBuffer, taskIndex, needPersist)); + taskIndex, PipeTaskMeta.deserialize(PipeRuntimeMetaVersion.VERSION_2, byteBuffer)); } size = ReadWriteIOUtils.readInt(byteBuffer); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 6a4ab25db7e..638d879bfa7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -19,7 +19,6 @@ package org.apache.iotdb.commons.pipe.agent.task.meta; -import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException; @@ -27,51 +26,30 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; -import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.commons.io.FileUtils; -import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class PipeTaskMeta { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTaskMeta.class); - private static final String PREFIX = "__progressIndex_"; private final AtomicReference<ProgressIndex> progressIndex = new AtomicReference<>(); private final AtomicInteger leaderNodeId = new AtomicInteger(0); - private final AtomicLong updateCount = new AtomicLong(0); - private final AtomicLong lastPersistCount = new AtomicLong(0); - private final long checkPointGap = - PipeConfig.getInstance().getPipeProgressIndexPersistCheckPointGap(); - private File progressIndexPersistFile; - private final AtomicBoolean isRegisterPersistTask = new AtomicBoolean(false); - private Future<?> persistProgressIndexFuture; - /** * Stores the exceptions encountered during run time of each pipe task. * @@ -84,26 +62,9 @@ public class PipeTaskMeta { private final Set<PipeRuntimeException> exceptionMessages = Collections.newSetFromMap(new ConcurrentHashMap<>()); - public PipeTaskMeta( - /* @NotNull */ final ProgressIndex progressIndex, - final int leaderNodeId, - final int taskIndex, - final boolean needPersistProgressIndex) { + public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final int leaderNodeId) { this.progressIndex.set(progressIndex); this.leaderNodeId.set(leaderNodeId); - // PipeTaskMeta created in configNode doesn't need to persist progress index. - if (needPersistProgressIndex) { - this.progressIndexPersistFile = - new File( - IoTDBConstant.DN_DEFAULT_DATA_DIR - + File.separator - + IoTDBConstant.SYSTEM_FOLDER_NAME - + File.separator - + PipeConfig.getInstance().getPipeHardlinkBaseDirName() - + File.separator - + PipeConfig.getInstance().getPipeProgressIndexPersistDirName(), - PREFIX + taskIndex); - } } public ProgressIndex getProgressIndex() { @@ -111,89 +72,8 @@ public class PipeTaskMeta { } public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) { - // only pipeTaskMeta that need to updateProgressIndex will persist progress index - // isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently - if (Objects.nonNull(progressIndexPersistFile) - && !isRegisterPersistTask.getAndSet(true) - && this.persistProgressIndexFuture == null - && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { - this.persistProgressIndexFuture = - PipePeriodicalJobExecutor.submitBackgroundJob( - () -> { - if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { - persistProgressIndex(); - } - }, - 0, - PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs()); - } - - progressIndex.updateAndGet( + return progressIndex.updateAndGet( index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex)); - if (Objects.nonNull(progressIndexPersistFile) - && updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap - && PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) { - persistProgressIndex(); - } - return progressIndex.get(); - } - - private synchronized void persistProgressIndex() { - if (lastPersistCount.get() == updateCount.get()) { - // in case of multiple threads calling updateProgressIndex at the same time - return; - } - - try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); - final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - progressIndex.get().serialize(outputStream); - // append is false by default. - FileUtils.writeByteArrayToFile( - progressIndexPersistFile, - byteArrayOutputStream.getBuf(), - 0, - byteArrayOutputStream.size()); - lastPersistCount.set(updateCount.get()); - } catch (IOException e) { - LOGGER.warn("Failed to persist progress index {} for {}", progressIndex.get(), this, e); - } - } - - public ProgressIndex restoreProgressIndex() { - if (!progressIndexPersistFile.exists() || progressIndexPersistFile.length() == 0) { - return progressIndex.get(); - } - - try { - final byte[] fileData = Files.readAllBytes(progressIndexPersistFile.toPath()); - - try (final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(fileData); - final DataInputStream inputStream = new DataInputStream(byteArrayInputStream)) { - final ProgressIndex restoredIndex = ProgressIndexType.deserializeFrom(inputStream); - progressIndex.get().updateToMinimumEqualOrIsAfterProgressIndex(restoredIndex); - LOGGER.info( - "{} successfully restored progress index from [{}], current index: {}", - this, - progressIndexPersistFile.getAbsolutePath(), - progressIndex.get()); - } - } catch (final IOException e) { - LOGGER.warn( - "{} failed to restore progress index from [{}].", - this, - progressIndexPersistFile.getAbsolutePath(), - e); - } - return progressIndex.get(); - } - - public void cancelPersistProgressIndexFuture() { - if (Objects.nonNull(progressIndexPersistFile) - && isRegisterPersistTask.getAndSet(false) - && persistProgressIndexFuture != null) { - persistProgressIndexFuture.cancel(false); - persistProgressIndexFuture = null; - } } public int getLeaderNodeId() { @@ -245,16 +125,12 @@ public class PipeTaskMeta { } public static PipeTaskMeta deserialize( - final PipeRuntimeMetaVersion version, - final ByteBuffer byteBuffer, - final int taskIndex, - final boolean needPersist) { + final PipeRuntimeMetaVersion version, final ByteBuffer byteBuffer) { final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(byteBuffer); final int leaderNodeId = ReadWriteIOUtils.readInt(byteBuffer); - final PipeTaskMeta pipeTaskMeta = - new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist); + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, leaderNodeId); final int size = ReadWriteIOUtils.readInt(byteBuffer); for (int i = 0; i < size; ++i) { final PipeRuntimeException pipeRuntimeException = @@ -265,17 +141,12 @@ public class PipeTaskMeta { } public static PipeTaskMeta deserialize( - final PipeRuntimeMetaVersion version, - final InputStream inputStream, - final int taskIndex, - final boolean needPersist) - throws IOException { + final PipeRuntimeMetaVersion version, final InputStream inputStream) throws IOException { final ProgressIndex progressIndex = ProgressIndexType.deserializeFrom(inputStream); final int leaderNodeId = ReadWriteIOUtils.readInt(inputStream); - final PipeTaskMeta pipeTaskMeta = - new PipeTaskMeta(progressIndex, leaderNodeId, taskIndex, needPersist); + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(progressIndex, leaderNodeId); final int size = ReadWriteIOUtils.readInt(inputStream); for (int i = 0; i < size; ++i) { final PipeRuntimeException pipeRuntimeException = diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java index d4e6008547c..8e04baf7cc0 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java @@ -97,27 +97,20 @@ public class PipeMetaDeSerTest { new PipeRuntimeMeta( new ConcurrentHashMap<Integer, PipeTaskMeta>() { { - put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987, 123, false)); - put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789, 234, false)); - put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789, 345, false)); - put(456, new PipeTaskMeta(finalHybridProgressIndex, 789, 456, false)); + put(123, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 987)); + put(234, new PipeTaskMeta(new IoTProgressIndex(1, 2L), 789)); + put(345, new PipeTaskMeta(new SimpleProgressIndex(3, 4), 789)); + put(456, new PipeTaskMeta(finalHybridProgressIndex, 789)); put( 567, new PipeTaskMeta( - new RecoverProgressIndex(1, new SimpleProgressIndex(1, 9)), - 123, - 567, - false)); + new RecoverProgressIndex(1, new SimpleProgressIndex(1, 9)), 123)); put( 678, new PipeTaskMeta( new TimeWindowStateProgressIndex(timeSeries2TimestampWindowBufferPairMap), - 789, - 678, - false)); - put( - Integer.MIN_VALUE, - new PipeTaskMeta(new MetaProgressIndex(987), 0, Integer.MIN_VALUE, false)); + 789)); + put(Integer.MIN_VALUE, new PipeTaskMeta(new MetaProgressIndex(987), 0)); } }); ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize(); @@ -136,7 +129,6 @@ public class PipeMetaDeSerTest { Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1); pipeRuntimeMeta.getStatus().set(PipeStatus.DROPPED); - pipeRuntimeMeta.onSetPipeDroppedOrStopped(); pipeRuntimeMeta.setIsStoppedByRuntimeException(true); pipeRuntimeMeta.setExceptionsClearTime(0); pipeRuntimeMeta
