This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
     new 15b3645  [FLINK-31620] [upsert-kafka] Flush wrapped writer
15b3645 is described below

commit 15b3645abe9611ec49f3bc0d6875e87bc8fd14ec
Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com>
AuthorDate: Sun Mar 26 15:36:30 2023 -0700

    [FLINK-31620] [upsert-kafka] Flush wrapped writer

    - With this change, the actual flushing still happens only on checkpoints.
    - When the buffer is full or the periodic flush is triggered, we do not
      actually flush the wrapped writer.

    This closes #17.
---
 .../streaming/connectors/kafka/table/ReducingUpsertWriter.java | 9 +++++----
 .../connectors/kafka/table/ReducingUpsertWriterTest.java       | 7 ++++++-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
index 5a87add..67df4a6 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.java
@@ -87,7 +87,8 @@ class ReducingUpsertWriter<WriterState>
 
     @Override
     public void flush(boolean endOfInput) throws IOException, InterruptedException {
-        flush();
+        sinkBuffer();
+        wrappedWriter.flush(endOfInput);
     }
 
     @Override
@@ -109,7 +110,7 @@ class ReducingUpsertWriter<WriterState>
         reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
 
         if (reduceBuffer.size() >= batchMaxRowNums) {
-            flush();
+            sinkBuffer();
         }
     }
 
@@ -121,7 +122,7 @@ class ReducingUpsertWriter<WriterState>
                         lastFlush + batchIntervalMs,
                         (t) -> {
                             if (t >= lastFlush + batchIntervalMs) {
-                                flush();
+                                sinkBuffer();
                             }
                             registerFlush();
                         });
@@ -140,7 +141,7 @@ class ReducingUpsertWriter<WriterState>
         return value;
     }
 
-    private void flush() throws IOException, InterruptedException {
+    private void sinkBuffer() throws IOException, InterruptedException {
         for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
             wrappedContext.setTimestamp(value.f1);
             wrappedWriter.write(value.f0, wrappedContext);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
index b6f2788..1ad9d09 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java
@@ -222,6 +222,7 @@ public class ReducingUpsertWriterTest {
         writeData(bufferedWriter, new ReusableIterator(0, 4));
         // snapshot should flush the buffer
         bufferedWriter.flush(true);
+        assertThat(writer.flushed).isTrue();
 
         HashMap<Integer, List<RowData>> expected = new HashMap<>();
         expected.put(
@@ -328,6 +329,8 @@ public class ReducingUpsertWriterTest {
     private static class MockedSinkWriter
             implements StatefulSink.StatefulSinkWriter<RowData, Void> {
 
+        boolean flushed = false;
+
         transient List<RowData> rowDataCollectors;
 
         MockedSinkWriter() {
@@ -343,7 +346,9 @@ public class ReducingUpsertWriterTest {
         }
 
         @Override
-        public void flush(boolean endOfInput) throws IOException, InterruptedException {}
+        public void flush(boolean endOfInput) throws IOException, InterruptedException {
+            flushed = true;
+        }
 
         @Override
         public void close() throws Exception {}
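For context beyond the diff itself, the sketch below illustrates the buffering-writer pattern this fix concerns: draining the internal reduce buffer (on buffer size or timer) is kept separate from flushing the wrapped writer, and only flush(endOfInput), which is called on checkpoints, now does both. This is a minimal, self-contained sketch, not the real ReducingUpsertWriter or the Flink SinkWriter API; the names BufferingWriterSketch, InnerWriter, drainBuffer, and maxBufferedRows are illustrative assumptions.

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Minimal sketch of the pattern fixed by FLINK-31620 (illustrative names only,
 * not the actual ReducingUpsertWriter or Flink SinkWriter API).
 */
class BufferingWriterSketch<K, V> {

    /** Stand-in for the wrapped writer; a hypothetical interface for this sketch. */
    interface InnerWriter<T> {
        void write(T value) throws IOException;

        void flush(boolean endOfInput) throws IOException;
    }

    private final InnerWriter<V> wrapped;
    private final int maxBufferedRows;
    // Deduplicates by key, keeping only the latest value per key (upsert semantics).
    private final Map<K, V> buffer = new LinkedHashMap<>();

    BufferingWriterSketch(InnerWriter<V> wrapped, int maxBufferedRows) {
        this.wrapped = wrapped;
        this.maxBufferedRows = maxBufferedRows;
    }

    /** Buffers a record; a full buffer is only drained here, never flushed downstream. */
    void write(K key, V value) throws IOException {
        buffer.put(key, value);
        if (buffer.size() >= maxBufferedRows) {
            drainBuffer();
        }
    }

    /** Called on checkpoints: drain the buffer AND flush the wrapped writer (the fix). */
    void flush(boolean endOfInput) throws IOException {
        drainBuffer();
        wrapped.flush(endOfInput);
    }

    /** Hands the buffered records to the wrapped writer without flushing it. */
    private void drainBuffer() throws IOException {
        for (V value : buffer.values()) {
            wrapped.write(value);
        }
        buffer.clear();
    }
}

Before the fix, the outer flush(endOfInput) only drained the buffer, so records already handed to the wrapped writer could remain unflushed at checkpoint time; the test change above asserts that the wrapped writer's flush is now actually invoked.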