This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 537d24e63f05a44e7817c7f7f7abcbb90fa76a2c Author: V_Galaxy <[email protected]> AuthorDate: Wed Jul 31 11:51:35 2024 +0800 Subscription: improve deduplication logic for PipeRawTabletInsertionEvent (#13061) (cherry picked from commit 7bf6eea139c5a6ad3c0f8e56b830045a208ed125) --- .../it/local/IoTDBSubscriptionBasicIT.java | 63 ++++++++++++++++++++++ .../common/tablet/PipeRawTabletInsertionEvent.java | 4 ++ .../common/tsfile/PipeTsFileInsertionEvent.java | 14 ++++- .../event/realtime/PipeRealtimeEventFactory.java | 2 +- .../PipeHistoricalDataRegionTsFileExtractor.java | 1 + .../TsFileDeduplicationBlockingPendingQueue.java | 48 +++++++++++++---- 6 files changed, 121 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java index 8f5e8683746..68287ab6619 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java @@ -482,4 +482,67 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT { fail(e.getMessage()); } } + + @Test + public void testDataSetDeduplication() { + // Insert some historical data + try (final ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.createDatabase("root.db"); + for (int i = 0; i < 100; ++i) { + session.executeNonQueryStatement( + String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i)); + session.executeNonQueryStatement( + String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", i)); + } + // DO NOT FLUSH HERE + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Create topic + final String topicName = "topic6"; + final String host = EnvFactory.getEnv().getIP(); + final int port = Integer.parseInt(EnvFactory.getEnv().getPort()); + try (final SubscriptionSession session = new SubscriptionSession(host, port)) { + session.open(); + final Properties config = new Properties(); + config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1"); + session.createTopic(topicName, config); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + // Subscription + final AtomicInteger rowCount = new AtomicInteger(); + try (final SubscriptionPushConsumer consumer = + new SubscriptionPushConsumer.Builder() + .host(host) + .port(port) + .consumerId("c1") + .consumerGroupId("cg1") + .ackStrategy(AckStrategy.AFTER_CONSUME) + .consumeListener( + message -> { + for (final SubscriptionSessionDataSet dataSet : + message.getSessionDataSetsHandler()) { + while (dataSet.hasNext()) { + dataSet.next(); + rowCount.addAndGet(1); + } + } + return ConsumeResult.SUCCESS; + }) + .buildPushConsumer()) { + + consumer.open(); + consumer.subscribe(topicName); + + AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get())); + } catch (final Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 96d5f2d90c3..285121ae63c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -208,6 +208,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet return Objects.nonNull(tablet) ? tablet.getDeviceId() : deviceId; } + public EnrichedEvent getSourceEvent() { + return sourceEvent; + } + /////////////////////////// TabletInsertionEvent /////////////////////////// @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 79fa1f18388..7ae01e77bd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -62,6 +62,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns private final boolean isLoaded; private final boolean isGeneratedByPipe; private final boolean isGeneratedByPipeConsensus; + private final boolean isGeneratedByHistoricalExtractor; private final AtomicBoolean isClosed; private TsFileInsertionDataContainer dataContainer; @@ -71,13 +72,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET; public PipeTsFileInsertionEvent( - final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) { + final TsFileResource resource, + final boolean isLoaded, + final boolean isGeneratedByPipe, + final boolean isGeneratedByHistoricalExtractor) { // The modFile must be copied before the event is assigned to the listening pipes this( resource, true, isLoaded, isGeneratedByPipe, + isGeneratedByHistoricalExtractor, null, 0, null, @@ -91,6 +96,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns final boolean isWithMod, final boolean isLoaded, final boolean isGeneratedByPipe, + final boolean isGeneratedByHistoricalExtractor, final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, @@ -109,6 +115,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns this.isLoaded = isLoaded; this.isGeneratedByPipe = isGeneratedByPipe; this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus(); + this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor; isClosed = new AtomicBoolean(resource.isClosed()); // Register close listener if TsFile is not closed @@ -290,6 +297,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns isWithMod, isLoaded, isGeneratedByPipe, + isGeneratedByHistoricalExtractor, pipeName, creationTime, pipeTaskMeta, @@ -365,6 +373,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns return isGeneratedByPipeConsensus; } + public boolean isGeneratedByHistoricalExtractor() { + return isGeneratedByHistoricalExtractor; + } + private TsFileInsertionDataContainer initDataContainer() { try { if (dataContainer == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index 60786cfed25..1dd86e1e5d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -37,7 +37,7 @@ public class PipeRealtimeEventFactory { public static PipeRealtimeEvent createRealtimeEvent( final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) { return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent( - new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe), resource); + new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe, false), resource); } public static PipeRealtimeEvent createRealtimeEvent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 3d6f75e02e5..8120c8364e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -587,6 +587,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa shouldTransferModFile, false, false, + true, pipeName, creationTime, pipeTaskMeta, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java index 7929d2a0ee1..5b4001890e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.subscription.broker; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -37,15 +39,15 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin private static final Logger LOGGER = LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class); - private final Cache<Integer, Integer> polledTsFiles; + private final Cache<Integer, Boolean> hashCodeToIsGeneratedByHistoricalExtractor; public TsFileDeduplicationBlockingPendingQueue( final UnboundedBlockingPendingQueue<Event> inputPendingQueue) { super(inputPendingQueue); - this.polledTsFiles = + this.hashCodeToIsGeneratedByHistoricalExtractor = Caffeine.newBuilder() - .expireAfterWrite( + .expireAfterAccess( SubscriptionConfig.getInstance().getSubscriptionTsFileDeduplicationWindowSeconds(), TimeUnit.SECONDS) .build(); @@ -61,21 +63,49 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin return null; } + if (event instanceof PipeRawTabletInsertionEvent) { + final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) event; + final EnrichedEvent sourceEvent = pipeRawTabletInsertionEvent.getSourceEvent(); + if (sourceEvent instanceof PipeTsFileInsertionEvent + && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) { + // commit directly + pipeRawTabletInsertionEvent.decreaseReferenceCount( + TsFileDeduplicationBlockingPendingQueue.class.getName(), true); + return null; + } + } + if (event instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) event; - final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode(); - if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) { - LOGGER.info( - "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly", - pipeTsFileInsertionEvent.coreReportMessage()); + if (isDuplicated(pipeTsFileInsertionEvent)) { // commit directly pipeTsFileInsertionEvent.decreaseReferenceCount( TsFileDeduplicationBlockingPendingQueue.class.getName(), true); return null; } - polledTsFiles.put(hashcode, hashcode); } return event; } + + private boolean isDuplicated(final PipeTsFileInsertionEvent event) { + final int hashCode = event.getTsFile().hashCode(); + final boolean isGeneratedByHistoricalExtractor = event.isGeneratedByHistoricalExtractor(); + final Boolean existedIsGeneratedByHistoricalExtractor = + hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode); + if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) { + hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode, isGeneratedByHistoricalExtractor); + return false; + } + // Multiple PipeRawTabletInsertionEvents parsed from the same PipeTsFileInsertionEvent (i.e., + // with the same isGeneratedByHistoricalExtractor field) are not considered duplicates. + if (Objects.equals(existedIsGeneratedByHistoricalExtractor, isGeneratedByHistoricalExtractor)) { + return false; + } + LOGGER.info( + "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly", + event.coreReportMessage()); + return true; + } }
