pnowojski commented on a change in pull request #8467: [FLINK-12535][network]
Make CheckpointBarrierHandler non-blocking
URL: https://github.com/apache/flink/pull/8467#discussion_r285917056
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
##########
@@ -207,11 +210,28 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
processEndOfPartition();
}
- return bufferOrEvent;
+ return next;
}
}
}
+ private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
+ if (!inputGate.isFinished()) {
+ return Optional.empty();
+ }
+
+ if (!endOfStream) {
+ // end of input stream. stream continues with the buffered data
Review comment:
I wouldn't refactor this code too much as part of this effort. I have already
done quite a bit of refactoring, and we still have quite a few things left to do :(