This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch potential-leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3167d75f51b61eaf707602253572903c5ff07b92 Author: Caideyipi <[email protected]> AuthorDate: Wed May 13 18:54:58 2026 +0800 Fix --- .../schemaregion/SchemaExecutionVisitor.java | 20 +++-- .../schemaregion/SchemaRegionStateMachine.java | 46 ++++++++---- .../agent/runtime/PipeDataNodeRuntimeAgent.java | 12 +++ .../runtime/PipeSchemaRegionListenerManager.java | 85 ++++++++++++---------- .../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 15 +++- .../processor/aggregate/AggregateProcessor.java | 19 +++-- .../thrift/async/IoTDBDataRegionAsyncSink.java | 1 + .../thrift/sync/IoTDBDataRegionSyncSink.java | 1 + .../thrift/impl/DataNodeRegionManager.java | 8 +- .../PipeSchemaRegionListenerManagerTest.java | 72 ++++++++++++++++++ .../task/progress/PipeEventCommitManager.java | 21 ++++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 2 +- 12 files changed, 229 insertions(+), 73 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index f3ddfbab7b2..2fc2694c4c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -843,13 +843,19 @@ public class SchemaExecutionVisitor implements PlanVisitor<TSStatus, ISchemaRegi public TSStatus visitPipeOperateSchemaQueueNode( final PipeOperateSchemaQueueNode node, final ISchemaRegion schemaRegion) { final SchemaRegionId id = schemaRegion.getSchemaRegionId(); - final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id); - if (node.isOpen() && !queue.isOpened()) { - logger.info("Opened pipe listening queue on schema region {}", id); - queue.open(); - } else if (!node.isOpen() && queue.isOpened()) { - logger.info("Closed pipe listening queue on schema region {}", id); - queue.close(); + if (node.isOpen()) { + final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListener(id); + if (!queue.isOpened()) { + logger.info("Opened pipe listening queue on schema region {}", id); + queue.open(); + } + } else { + final SchemaRegionListeningQueue queue = PipeDataNodeAgent.runtime().schemaListenerIfPresent(id); + if (queue != null && queue.isOpened()) { + logger.info("Closed pipe listening queue on schema region {}", id); + queue.close(); + PipeDataNodeAgent.runtime().cleanupSchemaListenerIfUnused(id); + } } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index edab5862d11..d904da7df31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -125,25 +125,34 @@ public class SchemaRegionStateMachine extends BaseStateMachine { @Override public boolean takeSnapshot(final File snapshotDir) { - if (schemaRegion.createSnapshot(snapshotDir) - && PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .createSnapshot(snapshotDir)) { - listen2Snapshot4PipeListener(true); + if (!schemaRegion.createSnapshot(snapshotDir)) { + return false; + } + + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener == null) { return true; } - return false; + + if (!listener.createSnapshot(snapshotDir)) { + return false; + } + listen2Snapshot4PipeListener(true); + return true; } @Override public void loadSnapshot(final File latestSnapshotRootDir) { schemaRegion.loadSnapshot(latestSnapshotRootDir); - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .loadSnapshot(latestSnapshotRootDir); - // We recompute the snapshot for pipe listener when loading snapshot - // to recover the newest snapshot in cache - listen2Snapshot4PipeListener(false); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener != null) { + listener.loadSnapshot(latestSnapshotRootDir); + // We recompute the snapshot for pipe listener when loading snapshot + // to recover the newest snapshot in cache + listen2Snapshot4PipeListener(false); + } } public void listen2Snapshot4PipeListener(final boolean isTmp) { @@ -154,7 +163,10 @@ public class SchemaRegionStateMachine extends BaseStateMachine { .toString(), isTmp); final SchemaRegionListeningQueue listener = - PipeDataNodeAgent.runtime().schemaListener(schemaRegion.getSchemaRegionId()); + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener == null) { + return; + } if (Objects.isNull(snapshotPaths) || Objects.isNull(snapshotPaths.get(0))) { if (listener.isOpened()) { logger.warn( @@ -181,9 +193,11 @@ public class SchemaRegionStateMachine extends BaseStateMachine { final TSStatus result = ((PlanNode) request).accept(new SchemaExecutionVisitor(), schemaRegion); if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .tryListenToNode((PlanNode) request); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegion.getSchemaRegionId()); + if (listener != null) { + listener.tryListenToNode((PlanNode) request); + } } return result; } catch (final IllegalArgumentException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 8f75b7b1885..3d3547f649b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -148,6 +148,10 @@ public class PipeDataNodeRuntimeAgent implements IService { return regionListenerManager.listener(schemaRegionId); } + public SchemaRegionListeningQueue schemaListenerIfPresent(SchemaRegionId schemaRegionId) { + return regionListenerManager.listenerIfPresent(schemaRegionId); + } + public int increaseAndGetSchemaListenerReferenceCount(SchemaRegionId schemaRegionId) { return regionListenerManager.increaseAndGetReferenceCount(schemaRegionId); } @@ -156,6 +160,14 @@ public class PipeDataNodeRuntimeAgent implements IService { return regionListenerManager.decreaseAndGetReferenceCount(schemaRegionId); } + public void cleanupSchemaListenerIfUnused(SchemaRegionId schemaRegionId) { + regionListenerManager.cleanupListenerIfUnused(schemaRegionId); + } + + public void clearSchemaRegionState(SchemaRegionId schemaRegionId) { + regionListenerManager.clearSchemaRegionState(schemaRegionId); + } + public void notifySchemaLeaderReady(SchemaRegionId schemaRegionId) { regionListenerManager.notifyLeaderReady(schemaRegionId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java index d28929b36a2..3b03146256b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManager.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.pipe.agent.runtime; import org.apache.iotdb.commons.consensus.SchemaRegionId; -import org.apache.iotdb.commons.pipe.agent.task.PipeTask; import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionListenerMetrics; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; @@ -34,6 +33,7 @@ public class PipeSchemaRegionListenerManager { private final Map<SchemaRegionId, PipeSchemaRegionListener> id2ListenerMap = new ConcurrentHashMap<>(); + private final Map<SchemaRegionId, AtomicBoolean> id2LeaderReadyMap = new ConcurrentHashMap<>(); public synchronized Set<SchemaRegionId> regionIds() { return id2ListenerMap.keySet(); @@ -44,6 +44,12 @@ public class PipeSchemaRegionListenerManager { .listeningQueue; } + public synchronized SchemaRegionListeningQueue listenerIfPresent( + final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + return listener == null ? null : listener.listeningQueue; + } + public synchronized int increaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) { return id2ListenerMap .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) @@ -52,28 +58,55 @@ public class PipeSchemaRegionListenerManager { } public synchronized int decreaseAndGetReferenceCount(final SchemaRegionId schemaRegionId) { - return id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .listeningQueueReferenceCount - .updateAndGet(v -> v > 0 ? v - 1 : 0); + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + if (listener == null) { + return 0; + } + + final int referenceCount = + listener.listeningQueueReferenceCount.updateAndGet(v -> v > 0 ? v - 1 : 0); + if (referenceCount == 0 && !listener.listeningQueue.isOpened()) { + cleanupListenerIfUnused(schemaRegionId, listener); + } + return referenceCount; } public synchronized void notifyLeaderReady(final SchemaRegionId schemaRegionId) { - id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .notifyLeaderReady(); + id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(true); } public synchronized void notifyLeaderUnavailable(final SchemaRegionId schemaRegionId) { - id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .notifyLeaderUnavailable(); + id2LeaderReadyMap.computeIfAbsent(schemaRegionId, id -> new AtomicBoolean()).set(false); } public synchronized boolean isLeaderReady(final SchemaRegionId schemaRegionId) { - return id2ListenerMap - .computeIfAbsent(schemaRegionId, PipeSchemaRegionListener::new) - .isLeaderReady(); + final AtomicBoolean isLeaderReady = id2LeaderReadyMap.get(schemaRegionId); + return isLeaderReady != null && isLeaderReady.get(); + } + + public synchronized void cleanupListenerIfUnused(final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.get(schemaRegionId); + if (listener != null) { + cleanupListenerIfUnused(schemaRegionId, listener); + } + } + + public synchronized void clearSchemaRegionState(final SchemaRegionId schemaRegionId) { + final PipeSchemaRegionListener listener = id2ListenerMap.remove(schemaRegionId); + if (listener != null) { + PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId()); + } + id2LeaderReadyMap.remove(schemaRegionId); + } + + private void cleanupListenerIfUnused( + final SchemaRegionId schemaRegionId, final PipeSchemaRegionListener listener) { + if (listener.listeningQueueReferenceCount.get() > 0 || listener.listeningQueue.isOpened()) { + return; + } + if (id2ListenerMap.remove(schemaRegionId, listener)) { + PipeSchemaRegionListenerMetrics.getInstance().deregister(schemaRegionId.getId()); + } } private static class PipeSchemaRegionListener { @@ -81,33 +114,9 @@ public class PipeSchemaRegionListenerManager { private final SchemaRegionListeningQueue listeningQueue = new SchemaRegionListeningQueue(); private final AtomicInteger listeningQueueReferenceCount = new AtomicInteger(0); - private final AtomicBoolean isLeaderReady = new AtomicBoolean(false); - protected PipeSchemaRegionListener(final SchemaRegionId schemaRegionId) { PipeSchemaRegionListenerMetrics.getInstance() .register(listeningQueue, schemaRegionId.getId()); } - - /** - * Get leader ready state, DO NOT use consensus layer's leader ready flag because - * SimpleConsensus' ready flag is always {@code true}. Note that this flag has nothing to do - * with listening and a {@link PipeTask} starts only iff the current node is a leader and ready. - * - * @return {@code true} iff the current node is a leader and ready - */ - private boolean isLeaderReady() { - return isLeaderReady.get(); - } - - // Leader ready flag has the following effect - // 1. The linked list starts serving only after leader gets ready - // 2. Config pipe task is only created after leader gets ready - private void notifyLeaderReady() { - isLeaderReady.set(true); - } - - private void notifyLeaderUnavailable() { - isLeaderReady.set(false); - } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 06d6a512b30..fd57495b14c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; +import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; @@ -59,6 +60,7 @@ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningFilter; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; @@ -267,10 +269,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { } schemaRegionId2ListeningQueueNewFirstIndex.forEach( - (schemaRegionId, listeningQueueNewFirstIndex) -> - PipeDataNodeAgent.runtime() - .schemaListener(new SchemaRegionId(schemaRegionId)) - .removeBefore(listeningQueueNewFirstIndex)); + (schemaRegionId, listeningQueueNewFirstIndex) -> { + final SchemaRegionListeningQueue listeningQueue = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(new SchemaRegionId(schemaRegionId)); + if (listeningQueue != null) { + listeningQueue.removeBefore(listeningQueueNewFirstIndex); + } + }); return schemaRegionId2ListeningQueueNewFirstIndex.keySet(); } @@ -323,6 +328,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); + PipeEventCommitManager.getInstance().clear(pipeName, creationTime); return true; } @@ -351,6 +357,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent { final String taskId = pipeName + "_" + creationTime; PipeTsFileToTabletsMetrics.getInstance().deregister(taskId); PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId); + PipeEventCommitManager.getInstance().clear(pipeName, creationTime); // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the // subscribed pipe, so the subscription needs to be manually marked as completed. if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 63bdbd1daec..24c930a04a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -842,12 +842,19 @@ public class AggregateProcessor implements PipeProcessor { @Override public void close() throws Exception { - if (Objects.nonNull(pipeName) - && pipeName2referenceCountMap.compute( - pipeName, (name, count) -> Objects.nonNull(count) ? count - 1 : 0) - == 0) { - pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName).clear(); - pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName); + boolean isLastReference = false; + if (Objects.nonNull(pipeName)) { + isLastReference = + pipeName2referenceCountMap.computeIfPresent( + pipeName, (name, count) -> count > 1 ? count - 1 : null) + == null; + } + if (isLastReference) { + final ConcurrentMap<String, AtomicReference<TimeSeriesRuntimeState>> + timeSeries2RuntimeStateMap = pipeName2timeSeries2TimeSeriesRuntimeStateMap.remove(pipeName); + if (timeSeries2RuntimeStateMap != null) { + timeSeries2RuntimeStateMap.clear(); + } pipeName2LastValueReceiveTimeMap.remove(pipeName); } if (Objects.nonNull(windowingProcessor)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 554c6c43dfb..4030ae580c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -743,6 +743,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index e8c4420861c..b255e67404e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -601,6 +601,7 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { @Override public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); if (Objects.nonNull(tabletBatchBuilder)) { tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index 4fc2cfbff9c..1cd49963240 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.DataRegionException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.schemaengine.SchemaEngine; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.rpc.RpcUtils; @@ -203,7 +204,12 @@ public class DataNodeRegionManager { public TSStatus deleteSchemaRegion(SchemaRegionId schemaRegionId) { try { schemaEngine.deleteSchemaRegion(schemaRegionId); - PipeDataNodeAgent.runtime().schemaListener(schemaRegionId).close(); + final SchemaRegionListeningQueue listener = + PipeDataNodeAgent.runtime().schemaListenerIfPresent(schemaRegionId); + if (listener != null) { + listener.close(); + } + PipeDataNodeAgent.runtime().clearSchemaRegionState(schemaRegionId); schemaRegionLockMap.remove(schemaRegionId); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); } catch (MetadataException e) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java new file mode 100644 index 00000000000..3e08d23978b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/runtime/PipeSchemaRegionListenerManagerTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.runtime; + +import org.apache.iotdb.commons.consensus.SchemaRegionId; +import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; + +import org.junit.Assert; +import org.junit.Test; + +public class PipeSchemaRegionListenerManagerTest { + + @Test + public void testLeaderStateDoesNotCreateListener() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(1); + + manager.notifyLeaderReady(schemaRegionId); + + Assert.assertTrue(manager.isLeaderReady(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + } + + @Test + public void testCleanupUnusedClosedListener() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(2); + + final SchemaRegionListeningQueue listeningQueue = manager.listener(schemaRegionId); + Assert.assertEquals(1, manager.increaseAndGetReferenceCount(schemaRegionId)); + + listeningQueue.open(); + Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId)); + Assert.assertNotNull(manager.listenerIfPresent(schemaRegionId)); + + listeningQueue.close(); + manager.cleanupListenerIfUnused(schemaRegionId); + + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + } + + @Test + public void testAutoCleanupAfterLastReferenceReleased() { + final PipeSchemaRegionListenerManager manager = new PipeSchemaRegionListenerManager(); + final SchemaRegionId schemaRegionId = new SchemaRegionId(3); + + manager.increaseAndGetReferenceCount(schemaRegionId); + + Assert.assertEquals(0, manager.decreaseAndGetReferenceCount(schemaRegionId)); + Assert.assertNull(manager.listenerIfPresent(schemaRegionId)); + Assert.assertTrue(manager.regionIds().isEmpty()); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java index b47441753c4..82ee87f6c8d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java @@ -78,6 +78,13 @@ public class PipeEventCommitManager { LOGGER.info("Pipe committer deregistered for pipe on region: {}", committerKey); } + public void clear(final String pipeName, final long creationTime) { + eventCommitterMap.keySet().removeIf(key -> clearIfMatches(key, pipeName, creationTime, true)); + eventCommitterRestartTimesMap + .keySet() + .removeIf(key -> clearIfMatches(key, pipeName, creationTime, false)); + } + /** * Assign a commit id and a key for commit. Make sure {@code EnrichedEvent.pipeName} is set before * calling this. @@ -197,6 +204,20 @@ public class PipeEventCommitManager { this.commitRateMarker = commitRateMarker; } + private boolean clearIfMatches( + final CommitterKey key, + final String pipeName, + final long creationTime, + final boolean shouldClearMetrics) { + if (!Objects.equals(key.getPipeName(), pipeName) || key.getCreationTime() != creationTime) { + return false; + } + if (shouldClearMetrics) { + PipeEventCommitMetrics.getInstance().deregister(key.stringify()); + } + return true; + } + //////////////////////////// singleton //////////////////////////// private PipeEventCommitManager() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index ebb3d90bc8c..a7e603bd9bc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -643,7 +643,7 @@ public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEvent */ public synchronized void discardEventsOfPipe( final String pipeName, final long creationTime, final int regionId) { - // Do nothing by default + PIPE_END_POINT_RATE_LIMITER_MAP.remove(new Pair<>(pipeName, creationTime)); } public PipeReceiverStatusHandler statusHandler() {
