rkhachatryan commented on code in PR #26453: URL: https://github.com/apache/flink/pull/26453#discussion_r2044014039
########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java: ########## @@ -24,4 +24,12 @@ * Root interface for all events sent between {@link OperatorCoordinator} and an {@link * OperatorEventHandler}. */ -public interface OperatorEvent extends Serializable {} +public interface OperatorEvent extends Serializable { + /** + * @return true if event is optional and an occasional loss or inability to deliver that event + * doesn't affect the job's correctness. + */ + default boolean isOptional() { + return false; Review Comment: NIT: How about returning an `enum DeliveryRequirement { MANDATORY, OPTIONAL }` or `void handleDeliveryFailure(Exception failure);`? For more flexibility (e.g. tolerating up to N failures) ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ########## @@ -218,15 +218,12 @@ void announceCombinedWatermark() { operatorName); for (Integer subtaskId : subTaskIds) { - // when subtask have been finished, do not send event. - if (!context.hasNoMoreSplits(subtaskId)) { Review Comment: Could you explain/refer to what will happen in tasks that don't have splits? I guess it will be just ignored? What if the task/operator has switched to FINISHED? ########## flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java: ########## @@ -111,6 +111,10 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) { sendResult.whenCompleteAsync( (success, failure) -> { if (failure != null && subtaskAccess.isStillRunning()) { + if (failure instanceof TaskNotRunningException Review Comment: Just to clarify: we only tolerate `TaskNotRunningException` and not arbitrary exceptions (e.g. network failures) because if the error persists then the pipeline will be blocked by alignment, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org