This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 15b3645  [FLINK-31620] [upsert-kafka] Flush wrapped writer
15b3645 is described below

commit 15b3645abe9611ec49f3bc0d6875e87bc8fd14ec
Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com>
AuthorDate: Sun Mar 26 15:36:30 2023 -0700

    [FLINK-31620] [upsert-kafka] Flush wrapped writer
    
    - with the solution, the actual flushing only still happens on checkpoints
    - when buffer size is full or periodically triggered, we don't actually do 
flushing on the wrapped writer.
    
    This closes #17.
---
 .../streaming/connectors/kafka/table/ReducingUpsertWriter.java   | 9 +++++----
 .../connectors/kafka/table/ReducingUpsertWriterTest.java         | 7 ++++++-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
index 5a87add..67df4a6 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
@@ -87,7 +87,8 @@ class ReducingUpsertWriter<WriterState>
 
     @Override
     public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
-        flush();
+        sinkBuffer();
+        wrappedWriter.flush(endOfInput);
     }
 
     @Override
@@ -109,7 +110,7 @@ class ReducingUpsertWriter<WriterState>
         reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
 
         if (reduceBuffer.size() >= batchMaxRowNums) {
-            flush();
+            sinkBuffer();
         }
     }
 
@@ -121,7 +122,7 @@ class ReducingUpsertWriter<WriterState>
                 lastFlush + batchIntervalMs,
                 (t) -> {
                     if (t >= lastFlush + batchIntervalMs) {
-                        flush();
+                        sinkBuffer();
                     }
                     registerFlush();
                 });
@@ -140,7 +141,7 @@ class ReducingUpsertWriter<WriterState>
         return value;
     }
 
-    private void flush() throws IOException, InterruptedException {
+    private void sinkBuffer() throws IOException, InterruptedException {
         for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
             wrappedContext.setTimestamp(value.f1);
             wrappedWriter.write(value.f0, wrappedContext);
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
index b6f2788..1ad9d09 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
@@ -222,6 +222,7 @@ public class ReducingUpsertWriterTest {
         writeData(bufferedWriter, new ReusableIterator(0, 4));
         // snapshot should flush the buffer
         bufferedWriter.flush(true);
+        assertThat(writer.flushed).isTrue();
 
         HashMap<Integer, List<RowData>> expected = new HashMap<>();
         expected.put(
@@ -328,6 +329,8 @@ public class ReducingUpsertWriterTest {
     private static class MockedSinkWriter
             implements StatefulSink.StatefulSinkWriter<RowData, Void> {
 
+        boolean flushed = false;
+
         transient List<RowData> rowDataCollectors;
 
         MockedSinkWriter() {
@@ -343,7 +346,9 @@ public class ReducingUpsertWriterTest {
         }
 
         @Override
-        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {}
+        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+            flushed = true;
+        }
 
         @Override
         public void close() throws Exception {}

Reply via email to