This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix/clear-pipe-runtime-error-message
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4227fb55b3a34bab1fc89d89e0ae1dbb52ffd659 Author: Caideyipi <[email protected]> AuthorDate: Mon Jun 29 18:34:33 2026 +0800 Fix stale pipe runtime error messages --- .../response/pipe/task/PipeTableResp.java | 6 + .../runtime/heartbeat/PipeHeartbeatParser.java | 19 ++- .../confignode/persistence/pipe/PipeTaskInfo.java | 33 ++++-- .../impl/pipe/task/StartPipeProcedureV2.java | 37 +++++- .../runtime/heartbeat/PipeHeartbeatParserTest.java | 129 ++++++++++++++++++++- .../pipe/PipeTaskInfoAutoRestartTest.java | 28 +++++ .../commons/pipe/agent/task/PipeTaskAgent.java | 13 +++ .../pipe/agent/task/meta/PipeRuntimeMeta.java | 9 ++ .../commons/pipe/agent/task/meta/PipeTaskMeta.java | 4 + .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java | 26 +++++ 10 files changed, 283 insertions(+), 21 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index 49f10a79e8a..a2d2856e751 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -194,6 +194,9 @@ public class PipeTableResp implements DataSet { runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) { final Integer nodeId = entry.getKey(); final PipeRuntimeException e = entry.getValue(); + if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); @@ -206,6 +209,9 @@ public class PipeTableResp implements DataSet { runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { final Integer regionId = entry.getKey(); for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) { + if 
(e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); pipeExceptionMessage2RegionIdsMap diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 97c6795daf4..718a9fe97e4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -248,24 +248,21 @@ public class PipeHeartbeatParser { // Update runtime exception final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue(); + final PipeRuntimeMeta pipeRuntimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); pipeTaskMetaFromCoordinator.clearExceptionMessages(); for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) { - - // Do not judge the exception's clear time to avoid the restart process - // being ended after the failure of some pipe + if (exception.getTimeStamp() <= pipeRuntimeMeta.getExceptionsClearTime()) { + needPushPipeMetaToDataNodes.set(true); + continue; + } pipeTaskMetaFromCoordinator.trackExceptionMessage(exception); if (exception instanceof PipeRuntimeCriticalException) { final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName(); - if (!pipeMetaFromCoordinator - .getRuntimeMeta() - .getStatus() - .get() - .equals(PipeStatus.STOPPED)) { - PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); - runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.setIsStoppedByRuntimeException(true); + if 
(!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) { + pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED); + pipeRuntimeMeta.setIsStoppedByRuntimeException(true); needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(false); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 247f803152b..d231a613621 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -763,14 +763,26 @@ public class PipeTaskInfo implements SnapshotProcessor { public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) { acquireWriteLock(); try { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName); + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, System.currentTimeMillis()); + } finally { + releaseWriteLock(); + } + } + + public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( + final String pipeName, final long exceptionsClearTime) { + acquireWriteLock(); + try { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, exceptionsClearTime); } finally { releaseWriteLock(); } } private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - final String pipeName) { + final String pipeName, final long exceptionsClearTime) { if (!pipeMetaKeeper.containsPipeMeta(pipeName)) { return; } @@ -780,7 +792,7 @@ public class PipeTaskInfo implements SnapshotProcessor { // To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false runtimeMeta.setIsStoppedByRuntimeException(false); - runtimeMeta.setExceptionsClearTime(System.currentTimeMillis()); + 
runtimeMeta.setExceptionsClearTime(exceptionsClearTime); final Map<Integer, PipeRuntimeException> exceptionMap = runtimeMeta.getNodeId2PipeRuntimeExceptionMap(); @@ -904,14 +916,17 @@ public class PipeTaskInfo implements SnapshotProcessor { */ private boolean autoRestartInternal() { final AtomicBoolean needRestart = new AtomicBoolean(false); + final long exceptionsClearTime = System.currentTimeMillis(); final List<String> pipeToRestart = new LinkedList<>(); pipeMetaKeeper .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) { - pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getIsStoppedByRuntimeException()) { + runtimeMeta.setExceptionsClearTime(exceptionsClearTime); + runtimeMeta.getStatus().set(PipeStatus.RUNNING); needRestart.set(true); pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName()); @@ -945,9 +960,11 @@ public class PipeTaskInfo implements SnapshotProcessor { .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( - pipeMeta.getStaticMeta().getPipeName()); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING) + && runtimeMeta.getIsStoppedByRuntimeException()) { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeMeta.getStaticMeta().getPipeName(), runtimeMeta.getExceptionsClearTime()); } }); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 7c9bff96f8f..42ac65a74c3 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -20,7 +20,9 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; @@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { @@ -102,6 +106,9 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { LOGGER.info(ProcedureMessages.STARTPIPEPROCEDUREV2_EXECUTEFROMOPERATEONDATANODES, pipeName); + final long exceptionsClearTime = System.currentTimeMillis(); + final boolean isStoppedByRuntimeException = + pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); final String exceptionMessage = parsePushPipeMetaExceptionForPipe(pipeName, pushSinglePipeMetaToDataNodes(pipeName, env)); if (!exceptionMessage.isEmpty()) { @@ -114,7 +121,35 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { // Clear exceptions and set isStoppedByRuntimeException to false if the pipe is // started successfully on all data nodes - 
pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName); + pipeTaskInfo + .get() + .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, exceptionsClearTime); + + if (isStoppedByRuntimeException) { + writePipeMetaChangesToConfigNodeConsensus(env); + } + } + + private void writePipeMetaChangesToConfigNodeConsensus(final ConfigNodeProcedureEnv env) { + final List<PipeMeta> pipeMetaList = new ArrayList<>(); + for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { + pipeMetaList.add(pipeMeta); + } + + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new PipeHandleMetaChangePlan(pipeMetaList)); + } catch (ConsensusException e) { + LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); + } } @Override diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java index d5a46d42c84..7383bf7937a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java @@ -20,6 +20,14 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.node.NodeManager; @@ -29,13 +37,18 @@ import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordin import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.lang.reflect.Field; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -49,6 +62,8 @@ import static org.mockito.Mockito.when; public class PipeHeartbeatParserTest { + private static final int DATA_NODE_ID = 1; + private boolean originalSeparatedPipeHeartbeatEnabled; @Before @@ -117,7 +132,88 @@ public class PipeHeartbeatParserTest { verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false); } + @Test + public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "staleExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new 
PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + coordinatorTaskMeta.trackExceptionMessage( + new PipeRuntimeCriticalException("stale failure", 100L)); + + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale failure", 100L)); + final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D)); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, true); + } + + @Test + public void testParseHeartbeatTracksExceptionsAfterClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "freshExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final 
PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh failure", 300L)); + final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D)); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + } + private ParserTestContext createParserTestContext(final int registeredDataNodeCount) { + return createParserTestContext(registeredDataNodeCount, new PipeTaskInfo()); + } + + private ParserTestContext createParserTestContext( + final int registeredDataNodeCount, final PipeTaskInfo pipeTaskInfo) { final ConfigManager configManager = Mockito.mock(ConfigManager.class); final NodeManager nodeManager = Mockito.mock(NodeManager.class); final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class); @@ -134,7 +230,7 @@ public class PipeHeartbeatParserTest { when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator); when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator); 
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter); - when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo())); + when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(pipeTaskInfo)); when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true); Mockito.doAnswer( invocation -> { @@ -147,6 +243,37 @@ public class PipeHeartbeatParserTest { return new ParserTestContext(new PipeHeartbeatParser(configManager), procedureManager); } + private void createPipe( + final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus initialStatus) { + final Map<String, String> extractorAttributes = new HashMap<>(); + extractorAttributes.put("extractor", "iotdb-source"); + final Map<String, String> processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); + final Map<String, String> connectorAttributes = new HashMap<>(); + connectorAttributes.put("connector", "iotdb-thrift-sink"); + + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + final ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>(); + pipeTasks.put(DATA_NODE_ID, pipeTaskMeta); + final PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + pipeName, + System.currentTimeMillis(), + extractorAttributes, + processorAttributes, + connectorAttributes); + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); + pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo + .getPipeMetaByPipeName(pipeName) + .getRuntimeMeta() + .getStatus() + .set(PipeStatus.RUNNING); + } + } + private void setMetaChangeFlags( final PipeHeartbeatParser parser, final boolean needWriteConsensusOnConfigNodes, diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 7b78f59253d..39f0095d00e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -231,6 +231,34 @@ public class PipeTaskInfoAutoRestartTest { rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); } + @Test + public void testHandleSuccessfulRestartClearsRuntimeExceptionMessages() { + final String pipeName = "restartPipe"; + createPipe(pipeName, PipeStatus.RUNNING); + + Assert.assertTrue( + pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName))); + + final PipeRuntimeMeta runtimeMeta = + pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta(); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + + Assert.assertTrue(pipeTaskInfo.autoRestart()); + final long exceptionsClearTime = runtimeMeta.getExceptionsClearTime(); + Assert.assertTrue( + runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values().stream() + .allMatch(exception -> exception.getTimeStamp() <= exceptionsClearTime)); + + pipeTaskInfo.handleSuccessfulRestart(); + + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + Assert.assertEquals(exceptionsClearTime, runtimeMeta.getExceptionsClearTime()); + } + private Map<Integer, TPushPipeMetaResp> createErrorRespMap(final String pipeName) { final 
TPushPipeMetaRespExceptionMessage exceptionMessage = new TPushPipeMetaRespExceptionMessage( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index c30a50ac495..3daeec4ebd8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -305,6 +305,8 @@ public abstract class PipeTaskAgent { } } + syncRuntimeExceptionClearTime(runtimeMetaFromCoordinator, runtimeMetaInAgent); + // 2. Handle pipe runtime meta status changes final PipeStatus statusFromCoordinator = runtimeMetaFromCoordinator.getStatus().get(); final PipeStatus statusInAgent = runtimeMetaInAgent.getStatus().get(); @@ -347,6 +349,12 @@ public abstract class PipeTaskAgent { } } + private void syncRuntimeExceptionClearTime( + final PipeRuntimeMeta runtimeMetaFromCoordinator, final PipeRuntimeMeta runtimeMetaInAgent) { + runtimeMetaInAgent.setExceptionsClearTime(runtimeMetaFromCoordinator.getExceptionsClearTime()); + runtimeMetaInAgent.clearExceptionMessagesBefore(runtimeMetaInAgent.getExceptionsClearTime()); + } + protected abstract void thawRate(final String pipeName, final long creationTime); protected abstract void freezeRate(final String pipeName, final long creationTime); @@ -548,6 +556,11 @@ public abstract class PipeTaskAgent { pipeMetaKeeper.addPipeMeta(pipeMetaFromCoordinator); + pipeMetaFromCoordinator + .getRuntimeMeta() + .clearExceptionMessagesBefore( + pipeMetaFromCoordinator.getRuntimeMeta().getExceptionsClearTime()); + // If the pipe status from coordinator is RUNNING, we will start the pipe later. 
return needToStartPipe; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 1f28a24dd60..2aa1b65638f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -128,6 +128,15 @@ public class PipeRuntimeMeta { } } + public void clearExceptionMessagesBefore(final long exceptionsClearTime) { + nodeId2PipeRuntimeExceptionMap + .entrySet() + .removeIf(entry -> entry.getValue().getTimeStamp() <= exceptionsClearTime); + consensusGroupId2TaskMetaMap + .values() + .forEach(pipeTaskMeta -> pipeTaskMeta.clearExceptionMessagesBefore(exceptionsClearTime)); + } + public boolean getIsStoppedByRuntimeException() { return isStoppedByRuntimeException.get(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 9584ca8cbab..e9939d7b2c6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -134,6 +134,10 @@ public class PipeTaskMeta { exceptionMessages.clear(); } + public synchronized void clearExceptionMessagesBefore(final long exceptionsClearTime) { + exceptionMessages.removeIf(exception -> exception.getTimeStamp() <= exceptionsClearTime); + } + public synchronized void serialize(final OutputStream outputStream) throws IOException { progressIndex.get().serialize(outputStream); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java index 1e29c96e090..fa0ac2c1a37 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java @@ -151,4 +151,30 @@ public class PipeMetaDeSerTest { final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer); Assert.assertEquals(pipeMeta, pipeMeta1); } + + @Test + public void testClearExceptionMessagesBeforeClearTime() { + final PipeTaskMeta staleTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + staleTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale", 100L)); + final PipeTaskMeta freshTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + freshTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh", 300L)); + + final ConcurrentHashMap<Integer, PipeTaskMeta> taskMetaMap = new ConcurrentHashMap<>(); + taskMetaMap.put(1, staleTaskMeta); + taskMetaMap.put(2, freshTaskMeta); + final PipeRuntimeMeta runtimeMeta = new PipeRuntimeMeta(taskMetaMap); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(1, new PipeRuntimeCriticalException("stale node", 100L)); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(2, new PipeRuntimeCriticalException("fresh node", 300L)); + + runtimeMeta.clearExceptionMessagesBefore(200L); + + Assert.assertFalse(staleTaskMeta.hasExceptionMessages()); + Assert.assertTrue(freshTaskMeta.hasExceptionMessages()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(1)); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(2)); + } }
