This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d83d0e92701866ca181c7c8d88a2d438a61e3be0
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed May 26 08:51:55 2021 +0200

    Revert IDLE/ACTIVE on records
---
 .../java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java | 2 +-
 .../flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java     | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..05351b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -100,7 +100,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
         // record could've been generated somewhere in the pipeline even though an IDLE status was
         // emitted. It might've originated from a timer or just a wrong behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
+        try /*(AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus))*/ {
             serializationDelegate.setInstance(record);
             recordWriter.emit(serializationDelegate);
         } catch (Exception e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 352557c..5ed7b4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -618,10 +618,8 @@ public class MultipleInputStreamTaskTest {
 
         // FLIP-27 sources do not emit active status on new records, we wrap a record with
         // ACTIVE/IDLE sequence
-        expectedOutput.add(StreamStatus.ACTIVE);
         expectedOutput.add(
                 new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
-        expectedOutput.add(StreamStatus.IDLE);
         expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
         expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
         expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records