This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c03e58 [FLINK-19769][streaming] Reuse StreamRecord when emitting
records from source outputs
1c03e58 is described below
commit 1c03e5818523ede61d661ad5843309a75d06f31b
Author: TsReaper <[email protected]>
AuthorDate: Thu Oct 22 20:57:17 2020 +0800
[FLINK-19769][streaming] Reuse StreamRecord when emitting records from
source outputs
This closes #13748
---
.../streaming/api/operators/source/BatchTimestampsAndWatermarks.java | 4 +++-
.../streaming/api/operators/source/SourceOutputWithWatermarks.java | 5 ++++-
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
index 135cad9..7475825 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/BatchTimestampsAndWatermarks.java
@@ -77,6 +77,7 @@ public class BatchTimestampsAndWatermarks<T> implements
TimestampsAndWatermarks<
private final PushingAsyncDataInput.DataOutput<T> output;
private final TimestampAssigner<T> timestampAssigner;
+ private final StreamRecord<T> reusingRecord;
private TimestampsOnlyOutput(
PushingAsyncDataInput.DataOutput<T> output,
@@ -84,6 +85,7 @@ public class BatchTimestampsAndWatermarks<T> implements
TimestampsAndWatermarks<
this.output = output;
this.timestampAssigner = timestampAssigner;
+ this.reusingRecord = new StreamRecord<>(null);
}
@Override
@@ -94,7 +96,7 @@ public class BatchTimestampsAndWatermarks<T> implements
TimestampsAndWatermarks<
@Override
public void collect(T record, long timestamp) {
try {
- output.emitRecord(new StreamRecord<>(record,
timestampAssigner.extractTimestamp(record, timestamp)));
+ output.emitRecord(reusingRecord.replace(record,
timestampAssigner.extractTimestamp(record, timestamp)));
} catch (ExceptionInChainedOperatorException e) {
throw e;
} catch (Exception e) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
index 3a8396e..a8d8ef9 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
@@ -67,6 +67,8 @@ public class SourceOutputWithWatermarks<T> implements
SourceOutput<T> {
private final WatermarkOutput periodicWatermarkOutput;
+ private final StreamRecord<T> reusingRecord;
+
/**
* Creates a new SourceOutputWithWatermarks that emits records to the
given DataOutput
* and watermarks to the (possibly different) WatermarkOutput.
@@ -83,6 +85,7 @@ public class SourceOutputWithWatermarks<T> implements
SourceOutput<T> {
this.periodicWatermarkOutput =
checkNotNull(periodicWatermarkOutput);
this.timestampAssigner = checkNotNull(timestampAssigner);
this.watermarkGenerator = checkNotNull(watermarkGenerator);
+ this.reusingRecord = new StreamRecord<>(null);
}
//
------------------------------------------------------------------------
@@ -103,7 +106,7 @@ public class SourceOutputWithWatermarks<T> implements
SourceOutput<T> {
final long assignedTimestamp =
timestampAssigner.extractTimestamp(record, timestamp);
// IMPORTANT: The event must be emitted before the
watermark generator is called.
- recordsOutput.emitRecord(new StreamRecord<>(record,
assignedTimestamp));
+ recordsOutput.emitRecord(reusingRecord.replace(record,
assignedTimestamp));
watermarkGenerator.onEvent(record, assignedTimestamp,
onEventWatermarkOutput);
} catch (ExceptionInChainedOperatorException e) {
throw e;