This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d5579ad43ccd20d4d6ba18fbee6d6677990616c7 Author: V_Galaxy <[email protected]> AuthorDate: Fri Aug 2 18:08:29 2024 +0800 Subscription: avoid incomplete resource release of prefetching queue caused by restarting subscription pipe (#13079) (cherry picked from commit 83f8db0a8b3763dc02b4bcae76f3a9e10439c9c1) --- .../agent/SubscriptionBrokerAgent.java | 15 ++- .../agent/SubscriptionConsumerAgent.java | 4 +- .../db/subscription/broker/SubscriptionBroker.java | 102 +++++++++++++++------ .../broker/SubscriptionPrefetchingQueue.java | 69 +++++--------- .../broker/SubscriptionPrefetchingTabletQueue.java | 6 +- .../broker/SubscriptionPrefetchingTsFileQueue.java | 24 ++++- .../SubscriptionConnectorSubtaskLifeCycle.java | 2 +- .../iotdb/commons/pipe/event/EnrichedEvent.java | 19 +++- 8 files changed, 152 insertions(+), 89 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 538004a3ed8..43b16cf2f4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -158,15 +158,24 @@ public class SubscriptionBrokerAgent { broker.bindPrefetchingQueue(subtask.getTopicName(), subtask.getInputPendingQueue()); } - public void unbindPrefetchingQueue( - final String consumerGroupId, final String topicName, final boolean doRemove) { + public void unbindPrefetchingQueue(final String consumerGroupId, final String topicName) { final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); if (Objects.isNull(broker)) { LOGGER.warn( "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); return; } - broker.unbindPrefetchingQueue(topicName, doRemove); + broker.unbindPrefetchingQueue(topicName); + } + + public void removePrefetchingQueue(final String consumerGroupId, final String topicName) { + final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); + if (Objects.isNull(broker)) { + LOGGER.warn( + "Subscription: broker bound to consumer group [{}] does not exist", consumerGroupId); + return; + } + broker.removePrefetchingQueue(topicName); } public void executePrefetch(final String consumerGroupId, final String topicName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java index eee14943df0..075098f8f93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java @@ -117,11 +117,11 @@ public class SubscriptionConsumerAgent { return; } - // unbind and remove prefetching queue + // remove prefetching queue final Set<String> topicsUnsubByGroup = ConsumerGroupMeta.getTopicsUnsubByGroup(metaInAgent, metaFromCoordinator); for (final String topicName : topicsUnsubByGroup) { - SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, topicName, true); + SubscriptionAgent.broker().removePrefetchingQueue(consumerGroupId, topicName); } // TODO: Currently we fully replace the entire ConsumerGroupMeta without carefully checking the diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 43acf12f180..acb4882a4e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -20,13 +20,19 @@ package org.apache.iotdb.db.subscription.broker; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.rpc.subscription.config.TopicConstant; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; +import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; +import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +44,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; public class SubscriptionBroker { @@ -46,14 +55,22 @@ public class SubscriptionBroker { private final String brokerId; // consumer group id private final Map<String, SubscriptionPrefetchingQueue> topicNameToPrefetchingQueue; + private final Map<String, String> completedTopicNames; + + // The subscription pipe that was restarted should reuse the previous commit ID. + private final Map<String, AtomicLong> topicNameToCommitIdGenerator; public SubscriptionBroker(final String brokerId) { this.brokerId = brokerId; this.topicNameToPrefetchingQueue = new ConcurrentHashMap<>(); + this.completedTopicNames = new ConcurrentHashMap<>(); + this.topicNameToCommitIdGenerator = new ConcurrentHashMap<>(); } public boolean isEmpty() { - return topicNameToPrefetchingQueue.isEmpty(); + return topicNameToPrefetchingQueue.isEmpty() + && completedTopicNames.isEmpty() + && topicNameToCommitIdGenerator.isEmpty(); } //////////////////////////// provided for SubscriptionBrokerAgent //////////////////////////// @@ -64,21 +81,32 @@ public class SubscriptionBroker { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { + // check if completed + if (completedTopicNames.containsKey(topicName)) { + LOGGER.info( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client", + topicName, + brokerId); + events.add( + new SubscriptionEvent( + new SubscriptionPipeEmptyEvent(), + new SubscriptionPollResponse( + SubscriptionPollResponseType.TERMINATION.getType(), + new TerminationPayload(), + new SubscriptionCommitContext( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), + PipeDataNodeAgent.runtime().getRebootTimes(), + topicName, + brokerId, + INVALID_COMMIT_ID)))); + continue; + } // There are two reasons for not printing logs here: // 1. There will be a delay in the creation of the prefetching queue after subscription. // 2. There is no corresponding prefetching queue on this DN (currently the consumer is // fully connected to all DNs). continue; } - // check if completed before closed - if (prefetchingQueue.isCompleted()) { - LOGGER.info( - "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, return termination response to client", - topicName, - brokerId); - events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse()); - continue; - } if (prefetchingQueue.isClosed()) { LOGGER.warn( "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is closed", @@ -186,11 +214,17 @@ public class SubscriptionBroker { if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) { prefetchingQueue = new SubscriptionPrefetchingTsFileQueue( - brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue)); + brokerId, + topicName, + new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue), + topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) -> new AtomicLong())); } else { prefetchingQueue = new SubscriptionPrefetchingTabletQueue( - brokerId, topicName, new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue)); + brokerId, + topicName, + new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue), + topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) -> new AtomicLong())); } SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue); topicNameToPrefetchingQueue.put(topicName, prefetchingQueue); @@ -200,7 +234,7 @@ public class SubscriptionBroker { brokerId); } - public void unbindPrefetchingQueue(final String topicName, final boolean doRemove) { + public void unbindPrefetchingQueue(final String topicName) { final SubscriptionPrefetchingQueue prefetchingQueue = topicNameToPrefetchingQueue.get(topicName); if (Objects.isNull(prefetchingQueue)) { @@ -214,28 +248,40 @@ public class SubscriptionBroker { // mark prefetching queue closed first prefetchingQueue.markClosed(); - // mark prefetching queue completed only for topic of snapshot mode - if (SubscriptionAgent.topic() - .getTopicMode(topicName) - .equals(TopicConstant.MODE_SNAPSHOT_VALUE)) { - prefetchingQueue.markCompleted(); + // mark topic name completed only for topic of snapshot mode + if (SubscriptionAgent.topic().getTopicMode(topicName).equals(TopicConstant.MODE_SNAPSHOT_VALUE) + && prefetchingQueue.isCompleted()) { + completedTopicNames.put(topicName, topicName); } - if (doRemove) { - // clean up events in prefetching queue - prefetchingQueue.cleanup(); + // clean up events in prefetching queue + prefetchingQueue.cleanup(); - // deregister metrics - SubscriptionPrefetchingQueueMetrics.getInstance() - .deregister(prefetchingQueue.getPrefetchingQueueId()); + // deregister metrics + SubscriptionPrefetchingQueueMetrics.getInstance() + .deregister(prefetchingQueue.getPrefetchingQueueId()); - // remove prefetching queue - topicNameToPrefetchingQueue.remove(topicName); - LOGGER.info( - "Subscription: drop prefetching queue bound to topic [{}] for consumer group [{}]", + // remove prefetching queue + topicNameToPrefetchingQueue.remove(topicName); + LOGGER.info( + "Subscription: drop prefetching queue bound to topic [{}] for consumer group [{}]", + topicName, + brokerId); + } + + public void removePrefetchingQueue(final String topicName) { + final SubscriptionPrefetchingQueue prefetchingQueue = + topicNameToPrefetchingQueue.get(topicName); + if (Objects.nonNull(prefetchingQueue)) { + LOGGER.warn( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] still exists", topicName, brokerId); + return; } + + completedTopicNames.remove(topicName); + topicNameToCommitIdGenerator.remove(topicName); } public void executePrefetch(final String topicName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 749bcf13530..d4e441eb0fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -27,14 +27,9 @@ import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; -import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; -import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse; -import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType; -import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,19 +42,18 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; - public abstract class SubscriptionPrefetchingQueue { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingQueue.class); protected final String brokerId; // consumer group id protected final String topicName; + protected final SubscriptionBlockingPendingQueue inputPendingQueue; protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue; - protected final Map<SubscriptionCommitContext, SubscriptionEvent> uncommittedEvents; - private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0); + + private final AtomicLong commitIdGenerator; private volatile boolean isCompleted = false; private volatile boolean isClosed = false; @@ -67,13 +61,15 @@ public abstract class SubscriptionPrefetchingQueue { public SubscriptionPrefetchingQueue( final String brokerId, final String topicName, - final SubscriptionBlockingPendingQueue inputPendingQueue) { + final SubscriptionBlockingPendingQueue inputPendingQueue, + final AtomicLong commitIdGenerator) { this.brokerId = brokerId; this.topicName = topicName; this.inputPendingQueue = inputPendingQueue; this.prefetchingQueue = new LinkedBlockingQueue<>(); this.uncommittedEvents = new ConcurrentHashMap<>(); + this.commitIdGenerator = commitIdGenerator; } public void cleanup() { @@ -170,13 +166,20 @@ public abstract class SubscriptionPrefetchingQueue { } if (event instanceof PipeTerminateEvent) { - LOGGER.info( - "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", - this, - event); + final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event; + // add mark completed hook + terminateEvent.addOnCommittedHook( + () -> { + markCompleted(); + return null; + }); // commit directly ((PipeTerminateEvent) event) .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true); + LOGGER.info( + "Subscription: SubscriptionPrefetchingQueue {} commit PipeTerminateEvent {}", + this, + terminateEvent); continue; } @@ -318,16 +321,7 @@ public abstract class SubscriptionPrefetchingQueue { PipeDataNodeAgent.runtime().getRebootTimes(), topicName, brokerId, - subscriptionCommitIdGenerator.getAndIncrement()); - } - - private SubscriptionCommitContext generateInvalidSubscriptionCommitContext() { - return new SubscriptionCommitContext( - IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), - PipeDataNodeAgent.runtime().getRebootTimes(), - topicName, - brokerId, - INVALID_COMMIT_ID); + commitIdGenerator.getAndIncrement()); } //////////////////////////// APIs provided for metric framework //////////////////////////// @@ -346,10 +340,10 @@ public abstract class SubscriptionPrefetchingQueue { } public long getCurrentCommitId() { - return subscriptionCommitIdGenerator.get(); + return commitIdGenerator.get(); } - /////////////////////////////// termination /////////////////////////////// + /////////////////////////////// close & termination /////////////////////////////// public boolean isClosed() { return isClosed; @@ -367,25 +361,6 @@ public abstract class SubscriptionPrefetchingQueue { isCompleted = true; } - public SubscriptionEvent generateSubscriptionPollTerminationResponse() { - return new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), - new SubscriptionPollResponse( - SubscriptionPollResponseType.TERMINATION.getType(), - new TerminationPayload(), - generateInvalidSubscriptionCommitContext())); - } - - public SubscriptionEvent generateSubscriptionPollErrorResponse( - final String errorMessage, final boolean critical) { - return new SubscriptionEvent( - new SubscriptionPipeEmptyEvent(), - new SubscriptionPollResponse( - SubscriptionPollResponseType.ERROR.getType(), - new ErrorPayload(errorMessage, critical), - generateInvalidSubscriptionCommitContext())); - } - /////////////////////////////// stringify /////////////////////////////// protected Map<String, String> coreReportMessage() { @@ -393,7 +368,7 @@ public abstract class SubscriptionPrefetchingQueue { result.put("brokerId", brokerId); result.put("topicName", topicName); result.put("size of uncommittedEvents", String.valueOf(uncommittedEvents.size())); - result.put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); + result.put("commitIdGenerator", commitIdGenerator.toString()); result.put("isCompleted", String.valueOf(isCompleted)); result.put("isClosed", String.valueOf(isClosed)); return result; @@ -406,7 +381,7 @@ public abstract class SubscriptionPrefetchingQueue { result.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size())); result.put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size())); result.put("uncommittedEvents", uncommittedEvents.toString()); - result.put("subscriptionCommitIdGenerator", subscriptionCommitIdGenerator.toString()); + result.put("commitIdGenerator", commitIdGenerator.toString()); result.put("isCompleted", String.valueOf(isCompleted)); result.put("isClosed", String.valueOf(isClosed)); return result; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java index 1c327d2671d..47c57cb1897 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQueue { @@ -53,8 +54,9 @@ public class SubscriptionPrefetchingTabletQueue extends SubscriptionPrefetchingQ public SubscriptionPrefetchingTabletQueue( final String brokerId, final String topicName, - final SubscriptionBlockingPendingQueue inputPendingQueue) { - super(brokerId, topicName, inputPendingQueue); + final SubscriptionBlockingPendingQueue inputPendingQueue, + final AtomicLong commitIdGenerator) { + super(brokerId, topicName, inputPendingQueue, commitIdGenerator); this.currentBatchRef.set( new SubscriptionPipeTabletEventBatch(this, BATCH_MAX_DELAY_IN_MS, BATCH_MAX_SIZE_IN_BYTES)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java index f0604eca157..d8ccc8fed69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java @@ -20,11 +20,15 @@ package org.apache.iotdb.db.subscription.broker; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.subscription.event.SubscriptionEvent; import org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch; +import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent; import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload; import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload; import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload; import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext; @@ -46,8 +50,11 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID; + public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQueue { private static final Logger LOGGER = @@ -66,8 +73,9 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ public SubscriptionPrefetchingTsFileQueue( final String brokerId, final String topicName, - final SubscriptionBlockingPendingQueue inputPendingQueue) { - super(brokerId, topicName, inputPendingQueue); + final SubscriptionBlockingPendingQueue inputPendingQueue, + final AtomicLong commitIdGenerator) { + super(brokerId, topicName, inputPendingQueue, commitIdGenerator); this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>(); this.currentBatchRef.set( @@ -429,7 +437,17 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ private SubscriptionEvent generateSubscriptionPollErrorResponse(final String errorMessage) { // consider non-critical by default, meaning the client can retry - return super.generateSubscriptionPollErrorResponse(errorMessage, false); + return new SubscriptionEvent( + new SubscriptionPipeEmptyEvent(), + new SubscriptionPollResponse( + SubscriptionPollResponseType.ERROR.getType(), + new ErrorPayload(errorMessage, false), + new SubscriptionCommitContext( + IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), + PipeDataNodeAgent.runtime().getRebootTimes(), + topicName, + brokerId, + INVALID_COMMIT_ID))); } /////////////////////////////// stringify /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java index 41e690cbce9..c05bd13f07c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java @@ -103,6 +103,6 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL // when dropping the subscription. final String consumerGroupId = ((SubscriptionConnectorSubtask) subtask).getConsumerGroupId(); final String topicName = ((SubscriptionConnectorSubtask) subtask).getTopicName(); - SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, topicName, false); + SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, topicName); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index 6d44d26c332..cbdbef1e4e9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -29,9 +29,12 @@ import org.apache.iotdb.pipe.api.event.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; /** * {@link EnrichedEvent} is an {@link Event} that can be enriched with additional runtime @@ -65,6 +68,7 @@ public abstract class EnrichedEvent implements Event { protected boolean isTimeParsed; protected boolean shouldReportOnCommit = true; + protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>(); protected EnrichedEvent( final String pipeName, @@ -83,6 +87,13 @@ public abstract class EnrichedEvent implements Event { this.endTime = endTime; isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot(); isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime; + addOnCommittedHook( + () -> { + if (shouldReportOnCommit) { + reportProgress(); + } + return null; + }); } /** @@ -346,9 +357,11 @@ public abstract class EnrichedEvent implements Event { } public void onCommitted() { - if (shouldReportOnCommit) { - reportProgress(); - } + onCommittedHooks.forEach(Supplier::get); + } + + public void addOnCommittedHook(final Supplier<Void> hook) { + onCommittedHooks.add(hook); } public boolean isReleased() {
