This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d17d0ef7d92 Pipe: Fix websocket (Flink CDC) connector may report
tsfile event progress in advance (#12284)
d17d0ef7d92 is described below
commit d17d0ef7d923633d6909fca13dc1d0bf38433bf6
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 3 14:54:17 2024 +0800
Pipe: Fix websocket (Flink CDC) connector may report tsfile event progress
in advance (#12284)
---
.../db/pipe/connector/protocol/websocket/WebSocketConnector.java | 6 +++---
.../iotdb/db/pipe/processor/aggregate/AggregateProcessor.java | 2 +-
.../java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java | 4 ++--
3 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index ad6581ef511..049ab3d2e05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -120,9 +120,9 @@ public class WebSocketConnector implements PipeConnector {
try {
for (TabletInsertionEvent event :
tsFileInsertionEvent.toTabletInsertionEvents()) {
- ((EnrichedEvent)
event).increaseReferenceCount(WebSocketConnector.class.getName());
-
- server.addEvent(event, this);
+ // Skip report if any tablet events is added
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
+ transfer(event);
}
} finally {
tsFileInsertionEvent.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index 6f6a83cfd5b..472c8a6c8d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -452,7 +452,7 @@ public class AggregateProcessor implements PipeProcessor {
// The timeProgressIndex shall only be reported by the output events
// whose progressIndex is bounded with tablet events
if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
- ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReport();
+ ((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index a782db82adb..3a147a38682 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -175,9 +175,9 @@ public abstract class EnrichedEvent implements Event {
/**
* Externally skip the report of the processing {@link ProgressIndex} of
this {@link
- * EnrichedEvent}.
+ * EnrichedEvent} when committed. Report by generated events are still
allowed.
*/
- public void skipReport() {
+ public void skipReportOnCommit() {
shouldReportOnCommit = false;
}