This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d17d0ef7d92 Pipe: Fix websocket (Flink CDC) connector may report tsfile event progress in advance (#12284)
d17d0ef7d92 is described below

commit d17d0ef7d923633d6909fca13dc1d0bf38433bf6
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 14:54:17 2024 +0800

    Pipe: Fix websocket (Flink CDC) connector may report tsfile event progress in advance (#12284)
---
 .../db/pipe/connector/protocol/websocket/WebSocketConnector.java    | 6 +++---
 .../iotdb/db/pipe/processor/aggregate/AggregateProcessor.java       | 2 +-
 .../java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java     | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index ad6581ef511..049ab3d2e05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -120,9 +120,9 @@ public class WebSocketConnector implements PipeConnector {
 
     try {
       for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
-        ((EnrichedEvent) event).increaseReferenceCount(WebSocketConnector.class.getName());
-
-        server.addEvent(event, this);
+        // Skip report if any tablet events is added
+        ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
+        transfer(event);
       }
     } finally {
       tsFileInsertionEvent.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 6f6a83cfd5b..472c8a6c8d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -452,7 +452,7 @@ public class AggregateProcessor implements PipeProcessor {
     // The timeProgressIndex shall only be reported by the output events
     // whose progressIndex is bounded with tablet events
     if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
-      ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReport();
+      ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
     }
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index a782db82adb..3a147a38682 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -175,9 +175,9 @@ public abstract class EnrichedEvent implements Event {
 
   /**
    * Externally skip the report of the processing {@link ProgressIndex} of this {@link
-   * EnrichedEvent}.
+   * EnrichedEvent} when committed. Report by generated events are still allowed.
    */
-  public void skipReport() {
+  public void skipReportOnCommit() {
     shouldReportOnCommit = false;
   }
 

Reply via email to