dawidwys commented on a change in pull request #18702:
URL: https://github.com/apache/flink/pull/18702#discussion_r804523534



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -370,7 +411,17 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
     private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
         switch (operatingMode) {
             case OUTPUT_NOT_INITIALIZED:
-                currentMainOutput = eventTimeLogic.createMainOutput(output);
+                if (watermarkAlignmentParams.isEnabled()) {
+                    // Only wrap the output when watermark alignment is enabled, as otherwise this
+                    // introduces a small performance regression (probably because of an extra
+                    // virtual call)
+                    processingTimeService.scheduleWithFixedDelay(
+                            this::emitLatestWatermark,
+                            watermarkAlignmentParams.getUpdateInterval(),
+                            watermarkAlignmentParams.getUpdateInterval());
+                }
+                currentMainOutput =
+                        eventTimeLogic.createMainOutput(output, this::onWatermarkEmitted);

Review comment:
       Does it make sense to pass a no-op callback instead of `onWatermarkEmitted` when watermark alignment is disabled? Right now we execute some unnecessary logic on every emitted watermark even though alignment is off.
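
       A minimal sketch of that suggestion, not the PR's actual code: the callback is chosen once, when the output is created, so the disabled path only ever invokes an empty lambda. The `OnWatermarkEmitted` interface below is a hypothetical stand-in for `TimestampsAndWatermarks.OnWatermarkEmitted`, and `chooseCallback` and `NO_OP` are names invented for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class NoOpCallbackSketch {

    /** Hypothetical stand-in for TimestampsAndWatermarks.OnWatermarkEmitted. */
    @FunctionalInterface
    public interface OnWatermarkEmitted {
        void onWatermarkEmitted(long watermark);
    }

    /** Shared no-op callback (assumed name), used when alignment is disabled. */
    public static final OnWatermarkEmitted NO_OP = watermark -> {};

    /** Picks the callback once, so no per-watermark work happens when alignment is off. */
    public static OnWatermarkEmitted chooseCallback(
            boolean alignmentEnabled, OnWatermarkEmitted realCallback) {
        return alignmentEnabled ? realCallback : NO_OP;
    }

    public static void main(String[] args) {
        AtomicLong lastEmitted = new AtomicLong(Long.MIN_VALUE);
        OnWatermarkEmitted realCallback = lastEmitted::set;

        // Alignment disabled: the real callback is never invoked.
        chooseCallback(false, realCallback).onWatermarkEmitted(42L);
        System.out.println(lastEmitted.get() == Long.MIN_VALUE);

        // Alignment enabled: the real callback records the watermark.
        chooseCallback(true, realCallback).onWatermarkEmitted(42L);
        System.out.println(lastEmitted.get());
    }
}
```

       In `SourceOperator` this would roughly correspond to passing either `this::onWatermarkEmitted` or an empty lambda to `createMainOutput`, depending on `watermarkAlignmentParams.isEnabled()`.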

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
##########
@@ -35,12 +36,21 @@
 public final class WatermarkToDataOutput implements WatermarkOutput {
 
     private final PushingAsyncDataInput.DataOutput<?> output;
+    private final TimestampsAndWatermarks.OnWatermarkEmitted watermarkEmitted;
     private long maxWatermarkSoFar;
     private boolean isIdle;
 
-    /** Creates a new WatermarkOutput against the given DataOutput. */
+    @VisibleForTesting
     public WatermarkToDataOutput(PushingAsyncDataInput.DataOutput<?> output) {
+        this(output, watermark -> {});
+    }
+
+    /** Creates a new WatermarkOutput against the given DataOutput. */
+    public WatermarkToDataOutput(
+            PushingAsyncDataInput.DataOutput<?> output,
+            TimestampsAndWatermarks.OnWatermarkEmitted watermarkEmitted) {
         this.output = checkNotNull(output);
+        this.watermarkEmitted = watermarkEmitted;

Review comment:
       nit: for the sake of consistency, wrap `watermarkEmitted` in `checkNotNull` as well, as is already done for `output`.
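
       A rough sketch of what the nit amounts to, under stated assumptions: `java.util.Objects.requireNonNull` stands in for Flink's `checkNotNull` so the example stays self-contained, and `Output` and `rejectsNullCallback` are hypothetical names, not part of the PR.

```java
import java.util.Objects;
import java.util.function.LongConsumer;

public class NullCheckSketch {

    /** Hypothetical stand-in for WatermarkToDataOutput with both arguments validated. */
    public static final class Output {
        private final Object output;
        private final LongConsumer watermarkEmitted;

        public Output(Object output, LongConsumer watermarkEmitted) {
            // Objects.requireNonNull stands in for Flink's checkNotNull here.
            this.output = Objects.requireNonNull(output);
            this.watermarkEmitted = Objects.requireNonNull(watermarkEmitted);
        }
    }

    /** Returns true if constructing an Output with a null callback fails fast. */
    public static boolean rejectsNullCallback() {
        try {
            new Output(new Object(), null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNullCallback());
    }
}
```

       Validating in the constructor surfaces a missing callback at construction time rather than on the first emitted watermark.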



