This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c03e58  [FLINK-19769][streaming] Reuse StreamRecord when emitting 
records from source outputs
1c03e58 is described below

commit 1c03e5818523ede61d661ad5843309a75d06f31b
Author: TsReaper <[email protected]>
AuthorDate: Thu Oct 22 20:57:17 2020 +0800

    [FLINK-19769][streaming] Reuse StreamRecord when emitting records from 
source outputs
    
    This closes #13748
---
 .../streaming/api/operators/source/BatchTimestampsAndWatermarks.java | 4 +++-
 .../streaming/api/operators/source/SourceOutputWithWatermarks.java   | 5 ++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
index 135cad9..7475825 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
@@ -77,6 +77,7 @@ public class BatchTimestampsAndWatermarks<T> implements 
TimestampsAndWatermarks<
 
                private final PushingAsyncDataInput.DataOutput<T> output;
                private final TimestampAssigner<T> timestampAssigner;
+               private final StreamRecord<T> reusingRecord;
 
                private TimestampsOnlyOutput(
                                PushingAsyncDataInput.DataOutput<T> output,
@@ -84,6 +85,7 @@ public class BatchTimestampsAndWatermarks<T> implements 
TimestampsAndWatermarks<
 
                        this.output = output;
                        this.timestampAssigner = timestampAssigner;
+                       this.reusingRecord = new StreamRecord<>(null);
                }
 
                @Override
@@ -94,7 +96,7 @@ public class BatchTimestampsAndWatermarks<T> implements 
TimestampsAndWatermarks<
                @Override
                public void collect(T record, long timestamp) {
                        try {
-                               output.emitRecord(new StreamRecord<>(record, 
timestampAssigner.extractTimestamp(record, timestamp)));
+                               output.emitRecord(reusingRecord.replace(record, 
timestampAssigner.extractTimestamp(record, timestamp)));
                        } catch (ExceptionInChainedOperatorException e) {
                                throw e;
                        } catch (Exception e) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
index 3a8396e..a8d8ef9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
@@ -67,6 +67,8 @@ public class SourceOutputWithWatermarks<T> implements 
SourceOutput<T> {
 
        private final WatermarkOutput periodicWatermarkOutput;
 
+       private final StreamRecord<T> reusingRecord;
+
        /**
         * Creates a new SourceOutputWithWatermarks that emits records to the 
given DataOutput
         * and watermarks to the (possibly different) WatermarkOutput.
@@ -83,6 +85,7 @@ public class SourceOutputWithWatermarks<T> implements 
SourceOutput<T> {
                this.periodicWatermarkOutput = 
checkNotNull(periodicWatermarkOutput);
                this.timestampAssigner = checkNotNull(timestampAssigner);
                this.watermarkGenerator = checkNotNull(watermarkGenerator);
+               this.reusingRecord = new StreamRecord<>(null);
        }
 
        // 
------------------------------------------------------------------------
@@ -103,7 +106,7 @@ public class SourceOutputWithWatermarks<T> implements 
SourceOutput<T> {
                        final long assignedTimestamp = 
timestampAssigner.extractTimestamp(record, timestamp);
 
                        // IMPORTANT: The event must be emitted before the 
watermark generator is called.
-                       recordsOutput.emitRecord(new StreamRecord<>(record, 
assignedTimestamp));
+                       recordsOutput.emitRecord(reusingRecord.replace(record, 
assignedTimestamp));
                        watermarkGenerator.onEvent(record, assignedTimestamp, 
onEventWatermarkOutput);
                } catch (ExceptionInChainedOperatorException e) {
                        throw e;

Reply via email to