This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 537d24e63f05a44e7817c7f7f7abcbb90fa76a2c
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Jul 31 11:51:35 2024 +0800

    Subscription: improve deduplication logic for PipeRawTabletInsertionEvent (#13061)
    
    (cherry picked from commit 7bf6eea139c5a6ad3c0f8e56b830045a208ed125)
---
 .../it/local/IoTDBSubscriptionBasicIT.java         | 63 ++++++++++++++++++++++
 .../common/tablet/PipeRawTabletInsertionEvent.java |  4 ++
 .../common/tsfile/PipeTsFileInsertionEvent.java    | 14 ++++-
 .../event/realtime/PipeRealtimeEventFactory.java   |  2 +-
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  1 +
 .../TsFileDeduplicationBlockingPendingQueue.java   | 48 +++++++++++++----
 6 files changed, 121 insertions(+), 11 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 8f5e8683746..68287ab6619 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -482,4 +482,67 @@ public class IoTDBSubscriptionBasicIT extends AbstractSubscriptionLocalIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testDataSetDeduplication() {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      session.createDatabase("root.db");
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1, s2) values (%s, 1, 2)", i));
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d2(time, s1, s2) values (%s, 3, 4)", i));
+      }
+      // DO NOT FLUSH HERE
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic6";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.PATTERN_KEY, "root.db.d1.s1");
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    try (final SubscriptionPushConsumer consumer =
+        new SubscriptionPushConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .ackStrategy(AckStrategy.AFTER_CONSUME)
+            .consumeListener(
+                message -> {
+                  for (final SubscriptionSessionDataSet dataSet :
+                      message.getSessionDataSetsHandler()) {
+                    while (dataSet.hasNext()) {
+                      dataSet.next();
+                      rowCount.addAndGet(1);
+                    }
+                  }
+                  return ConsumeResult.SUCCESS;
+                })
+            .buildPushConsumer()) {
+
+      consumer.open();
+      consumer.subscribe(topicName);
+
+      AWAIT.untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 96d5f2d90c3..285121ae63c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -208,6 +208,10 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet
     return Objects.nonNull(tablet) ? tablet.getDeviceId() : deviceId;
   }
 
+  public EnrichedEvent getSourceEvent() {
+    return sourceEvent;
+  }
+
   /////////////////////////// TabletInsertionEvent ///////////////////////////
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 79fa1f18388..7ae01e77bd3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -62,6 +62,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
   private final boolean isLoaded;
   private final boolean isGeneratedByPipe;
   private final boolean isGeneratedByPipeConsensus;
+  private final boolean isGeneratedByHistoricalExtractor;
 
   private final AtomicBoolean isClosed;
   private TsFileInsertionDataContainer dataContainer;
@@ -71,13 +72,17 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
   private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
 
   public PipeTsFileInsertionEvent(
-      final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) {
+      final TsFileResource resource,
+      final boolean isLoaded,
+      final boolean isGeneratedByPipe,
+      final boolean isGeneratedByHistoricalExtractor) {
     // The modFile must be copied before the event is assigned to the listening pipes
     this(
         resource,
         true,
         isLoaded,
         isGeneratedByPipe,
+        isGeneratedByHistoricalExtractor,
         null,
         0,
         null,
@@ -91,6 +96,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
       final boolean isWithMod,
       final boolean isLoaded,
       final boolean isGeneratedByPipe,
+      final boolean isGeneratedByHistoricalExtractor,
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
@@ -109,6 +115,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     this.isLoaded = isLoaded;
     this.isGeneratedByPipe = isGeneratedByPipe;
     this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
+    this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
 
     isClosed = new AtomicBoolean(resource.isClosed());
     // Register close listener if TsFile is not closed
@@ -290,6 +297,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
         isWithMod,
         isLoaded,
         isGeneratedByPipe,
+        isGeneratedByHistoricalExtractor,
         pipeName,
         creationTime,
         pipeTaskMeta,
@@ -365,6 +373,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     return isGeneratedByPipeConsensus;
   }
 
+  public boolean isGeneratedByHistoricalExtractor() {
+    return isGeneratedByHistoricalExtractor;
+  }
+
   private TsFileInsertionDataContainer initDataContainer() {
     try {
       if (dataContainer == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 60786cfed25..1dd86e1e5d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -37,7 +37,7 @@ public class PipeRealtimeEventFactory {
   public static PipeRealtimeEvent createRealtimeEvent(
       final TsFileResource resource, final boolean isLoaded, final boolean isGeneratedByPipe) {
     return TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(
-        new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe), resource);
+        new PipeTsFileInsertionEvent(resource, isLoaded, isGeneratedByPipe, false), resource);
   }
 
   public static PipeRealtimeEvent createRealtimeEvent(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 3d6f75e02e5..8120c8364e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -587,6 +587,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
             shouldTransferModFile,
             false,
             false,
+            true,
             pipeName,
             creationTime,
             pipeTaskMeta,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
index 7929d2a0ee1..5b4001890e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/TsFileDeduplicationBlockingPendingQueue.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -37,15 +39,15 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
   private static final Logger LOGGER =
       LoggerFactory.getLogger(TsFileDeduplicationBlockingPendingQueue.class);
 
-  private final Cache<Integer, Integer> polledTsFiles;
+  private final Cache<Integer, Boolean> hashCodeToIsGeneratedByHistoricalExtractor;
 
   public TsFileDeduplicationBlockingPendingQueue(
       final UnboundedBlockingPendingQueue<Event> inputPendingQueue) {
     super(inputPendingQueue);
 
-    this.polledTsFiles =
+    this.hashCodeToIsGeneratedByHistoricalExtractor =
         Caffeine.newBuilder()
-            .expireAfterWrite(
+            .expireAfterAccess(
                 SubscriptionConfig.getInstance().getSubscriptionTsFileDeduplicationWindowSeconds(),
                 TimeUnit.SECONDS)
             .build();
@@ -61,21 +63,49 @@ public class TsFileDeduplicationBlockingPendingQueue extends SubscriptionBlockin
       return null;
     }
 
+    if (event instanceof PipeRawTabletInsertionEvent) {
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) event;
+      final EnrichedEvent sourceEvent = pipeRawTabletInsertionEvent.getSourceEvent();
+      if (sourceEvent instanceof PipeTsFileInsertionEvent
+          && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+        // commit directly
+        pipeRawTabletInsertionEvent.decreaseReferenceCount(
+            TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
+        return null;
+      }
+    }
+
     if (event instanceof PipeTsFileInsertionEvent) {
       final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) event;
-      final int hashcode = pipeTsFileInsertionEvent.getTsFile().hashCode();
-      if (Objects.nonNull(polledTsFiles.getIfPresent(hashcode))) {
-        LOGGER.info(
-            "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly",
-            pipeTsFileInsertionEvent.coreReportMessage());
+      if (isDuplicated(pipeTsFileInsertionEvent)) {
         // commit directly
         pipeTsFileInsertionEvent.decreaseReferenceCount(
             TsFileDeduplicationBlockingPendingQueue.class.getName(), true);
         return null;
       }
-      polledTsFiles.put(hashcode, hashcode);
     }
 
     return event;
   }
+
+  private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+    final int hashCode = event.getTsFile().hashCode();
+    final boolean isGeneratedByHistoricalExtractor = event.isGeneratedByHistoricalExtractor();
+    final Boolean existedIsGeneratedByHistoricalExtractor =
+        hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+    if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+      hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode, isGeneratedByHistoricalExtractor);
+      return false;
+    }
+    // Multiple PipeRawTabletInsertionEvents parsed from the same PipeTsFileInsertionEvent (i.e.,
+    // with the same isGeneratedByHistoricalExtractor field) are not considered duplicates.
+    if (Objects.equals(existedIsGeneratedByHistoricalExtractor, isGeneratedByHistoricalExtractor)) {
+      return false;
+    }
+    LOGGER.info(
+        "Subscription: Detect duplicated PipeTsFileInsertionEvent {}, commit it directly",
+        event.coreReportMessage());
+    return true;
+  }
 }

Reply via email to