This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch benchmark-request
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d83d0e92701866ca181c7c8d88a2d438a61e3be0
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed May 26 08:51:55 2021 +0200

    Revert IDLE/ACTIVE on records
---
 .../java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java  | 2 +-
 .../flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java      | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 8d1d38c..05351b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -100,7 +100,7 @@ public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<Str
     private <X> void pushToRecordWriter(StreamRecord<X> record) {
         // record could've been generated somewhere in the pipeline even though an IDLE status was
         // emitted. It might've originated from a timer or just a wrong behaving operator
-        try (AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus)) {
+        try /*(AutoCloseable ignored = announcedStatus.ensureActive(this::writeStreamStatus))*/ {
             serializationDelegate.setInstance(record);
             recordWriter.emit(serializationDelegate);
         } catch (Exception e) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 352557c..5ed7b4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -618,10 +618,8 @@ public class MultipleInputStreamTaskTest {
 
             // FLIP-27 sources do not emit active status on new records, we wrap a record with
             // ACTIVE/IDLE sequence
-            expectedOutput.add(StreamStatus.ACTIVE);
             expectedOutput.add(
                     new StreamRecord<>("" + (initialTime + 10), TimestampAssigner.NO_TIMESTAMP));
-            expectedOutput.add(StreamStatus.IDLE);
             expectedOutput.add(StreamStatus.ACTIVE); // activate source on new watermark
             expectedOutput.add(new Watermark(initialTime + 10)); // forward W from source
             expectedOutput.add(StreamStatus.IDLE); // go idle after reading all records

Reply via email to