pnowojski commented on a change in pull request #15294:
URL: https://github.com/apache/flink/pull/15294#discussion_r598078125



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -715,7 +715,12 @@ public OP getMainOperator() {
                             taskEnvironment.getUserCodeClassLoader().asClassLoader());
         }
 
-        return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
+        return new RecordWriterOutput<>(
+                recordWriter,
+                outSerializer,
+                sideOutputTag,
+                this,
+                edge.getPartitioner().isPointwise());

Review comment:
       Can a downstream task have two different incoming edges, one pointwise and the other not? If so, that might require a bit more code/testing (a combination of non-timeoutable CBs and UC CBs probably doesn't work).
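
The mixed-edge case asked about above can be pictured as a small check over a task's incoming edges. This is only a sketch: `InEdge` and `hasMixedEdges` are hypothetical names, not Flink classes, and the `pointwise` flag stands in for whatever `edge.getPartitioner().isPointwise()` would report for each edge.

```java
import java.util.Arrays;
import java.util.List;

public class MixedEdgesSketch {

    /** Hypothetical stand-in for one incoming edge of a task; not a Flink class. */
    static final class InEdge {
        final String name;
        final boolean pointwise;

        InEdge(String name, boolean pointwise) {
            this.name = name;
            this.pointwise = pointwise;
        }
    }

    /** Returns true when the edges mix pointwise and non-pointwise connections. */
    static boolean hasMixedEdges(List<InEdge> inputs) {
        boolean anyPointwise = false;
        boolean anyOther = false;
        for (InEdge edge : inputs) {
            if (edge.pointwise) {
                anyPointwise = true;
            } else {
                anyOther = true;
            }
        }
        return anyPointwise && anyOther;
    }

    public static void main(String[] args) {
        // Edge names are illustrative only.
        List<InEdge> inputs =
                Arrays.asList(new InEdge("rescale", true), new InEdge("keyBy", false));
        System.out.println(hasMixedEdges(inputs));
    }
}
```

A test for the PR could build a topology for which such a check returns true and verify how barriers from both edges are handled together.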

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -140,7 +146,8 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) {
     }
 
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
-        recordWriter.broadcastEvent(event, isPriorityEvent);
+        recordWriter.broadcastEvent(
+                event, isPriorityEvent && !(event instanceof CheckpointBarrier && isPointwise));

Review comment:
       I think this is not enough. This way you are potentially sending an unaligned or timeoutable aligned CB as a non-priority event, but this priority flag only has an effect on the output. On the input, this could still be processed as UC, or time out to UC?
   
   Unless you think that's fine, and we can rescale from pointwise connections if UC is supported only on the inputs?
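
To make the concern concrete, here is a toy model of the two sides. It is a sketch under assumptions, not Flink code: `Mode`, `sentAsPriority`, and `inputMayGoUnaligned` are hypothetical names, and the input-side rule (the receiver decides from the barrier's own mode and ignores the priority flag) is exactly the assumption being questioned above.

```java
public class BarrierPrioritySketch {

    /** Hypothetical checkpoint modes carried by a barrier; not Flink's real types. */
    enum Mode { ALIGNED, ALIGNED_WITH_TIMEOUT, UNALIGNED }

    /** Output side, mirroring the condition in the diff: pointwise barriers lose priority. */
    static boolean sentAsPriority(boolean isPriorityEvent, boolean isBarrier, boolean isPointwise) {
        return isPriorityEvent && !(isBarrier && isPointwise);
    }

    /**
     * Input side. ASSUMPTION for illustration: the receiver decides from the
     * barrier's own mode and never looks at the priority flag.
     */
    static boolean inputMayGoUnaligned(Mode mode) {
        return mode != Mode.ALIGNED;
    }

    public static void main(String[] args) {
        // An unaligned barrier on a pointwise edge is no longer sent as a priority event...
        boolean priority = sentAsPriority(true, true, true);
        // ...but under the assumption above, the input could still treat it as unaligned.
        boolean unalignedOnInput = inputMayGoUnaligned(Mode.UNALIGNED);
        System.out.println("sent as priority: " + priority);
        System.out.println("input may still go unaligned: " + unalignedOnInput);
    }
}
```

If the real input path behaves like this model, the output-side change alone would not prevent unaligned processing downstream.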





