pnowojski commented on a change in pull request #15294: URL: https://github.com/apache/flink/pull/15294#discussion_r598078125
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -715,7 +715,12 @@ public OP getMainOperator() {
                     taskEnvironment.getUserCodeClassLoader().asClassLoader());
         }
 
-        return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
+        return new RecordWriterOutput<>(
+                recordWriter,
+                outSerializer,
+                sideOutputTag,
+                this,
+                edge.getPartitioner().isPointwise());

Review comment:
Can a downstream task have two different incoming edges, one pointwise and the other not? If so, that might require a bit of code/testing (a combination of non-timeoutable CB and UC CB probably doesn't work).

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
##########
@@ -140,7 +146,8 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) {
     }
 
     public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
-        recordWriter.broadcastEvent(event, isPriorityEvent);
+        recordWriter.broadcastEvent(
+                event, isPriorityEvent && !(event instanceof CheckpointBarrier && isPointwise));

Review comment:
I think this is not enough. This way you are potentially sending an unaligned or timeoutable aligned CB as a non-priority event, but the priority flag only has an effect on the output. On the input, the barrier could still be processed as UC, or time out to UC? Unless you think that's fine, and we can rescale from pointwise connections if UC is supported only on the inputs?
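
To make the second comment concrete, here is a minimal, self-contained sketch of the condition added in `broadcastEvent`. The class `PriorityFlagSketch` and the types `Event`, `Barrier`, and `OtherEvent` are hypothetical stand-ins invented for illustration, not Flink classes; only the boolean expression is taken from the diff. The sketch covers the sender-side flag only and does not model how the receiving input handles the barrier, which is the gap the comment raises.

```java
// Hypothetical sketch: these types only mimic names from the diff and are NOT Flink classes.
public class PriorityFlagSketch {

    /** Stand-in for an event passed to broadcastEvent (assumption). */
    interface Event {}

    /** Stand-in for a checkpoint barrier (assumption). */
    static final class Barrier implements Event {}

    /** Stand-in for any event that is not a checkpoint barrier (assumption). */
    static final class OtherEvent implements Event {}

    /**
     * Same boolean expression as in the diff: the priority flag is kept unless
     * the event is a checkpoint barrier sent over a pointwise connection.
     */
    static boolean effectivePriority(Event event, boolean isPriorityEvent, boolean isPointwise) {
        return isPriorityEvent && !(event instanceof Barrier && isPointwise);
    }

    public static void main(String[] args) {
        // Barrier on a pointwise edge: downgraded to a non-priority event.
        System.out.println(effectivePriority(new Barrier(), true, true));    // false
        // Barrier on a non-pointwise edge: priority is preserved.
        System.out.println(effectivePriority(new Barrier(), true, false));   // true
        // Non-barrier priority event on a pointwise edge: unaffected.
        System.out.println(effectivePriority(new OtherEvent(), true, true)); // true
    }
}
```

The downgrade is decided entirely by the sender, so any guarantee that the receiving side will not treat the barrier as unaligned would have to be enforced separately.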