rkhachatryan commented on code in PR #26453:
URL: https://github.com/apache/flink/pull/26453#discussion_r2044014039


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java:
##########
@@ -24,4 +24,12 @@
  * Root interface for all events sent between {@link OperatorCoordinator} and an {@link
  * OperatorEventHandler}.
  */
-public interface OperatorEvent extends Serializable {}
+public interface OperatorEvent extends Serializable {
+    /**
+     * @return true if event is optional and an occasional loss or inability to deliver that event
+     *     doesn't affect the job's correctness.
+     */
+    default boolean isOptional() {
+        return false;

Review Comment:
   NIT: 
   How about returning an `enum DeliveryRequirement { MANDATORY, OPTIONAL }`,
   or adding `void handleDeliveryFailure(Exception failure);`, instead?
   
   Either would allow more flexibility (e.g. tolerating up to N failures).
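   
   A rough sketch of the enum variant, to make the suggestion concrete. Everything here is hypothetical: `DeliveryRequirement`, `getDeliveryRequirement()`, the two example event classes and `shouldFailOnDeliveryError` are made-up names, and `OperatorEvent` is a simplified stand-in so that the snippet compiles on its own.

```java
import java.io.Serializable;

public class DeliveryRequirementSketch {

    /** Hypothetical enum from the suggestion above; not an existing Flink type. */
    enum DeliveryRequirement {
        MANDATORY,
        OPTIONAL
    }

    /** Simplified stand-in for OperatorEvent (assumption: only the new method matters here). */
    interface OperatorEvent extends Serializable {
        /** Events are mandatory unless an implementation says otherwise. */
        default DeliveryRequirement getDeliveryRequirement() {
            return DeliveryRequirement.MANDATORY;
        }
    }

    /** Hypothetical event whose occasional loss is assumed to be harmless. */
    static final class ExampleOptionalEvent implements OperatorEvent {
        @Override
        public DeliveryRequirement getDeliveryRequirement() {
            return DeliveryRequirement.OPTIONAL;
        }
    }

    /** Hypothetical event that keeps the default (mandatory) requirement. */
    static final class ExampleMandatoryEvent implements OperatorEvent {}

    /** Hypothetical helper: the sender escalates a delivery error only for mandatory events. */
    static boolean shouldFailOnDeliveryError(OperatorEvent evt) {
        return evt.getDeliveryRequirement() == DeliveryRequirement.MANDATORY;
    }

    public static void main(String[] args) {
        System.out.println(shouldFailOnDeliveryError(new ExampleOptionalEvent()));  // false
        System.out.println(shouldFailOnDeliveryError(new ExampleMandatoryEvent())); // true
    }
}
```

   Compared to a boolean, the enum leaves room for further values later on (for instance a "best effort, up to N failures" level) without changing the method signature.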



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -218,15 +218,12 @@ void announceCombinedWatermark() {
                 operatorName);
 
         for (Integer subtaskId : subTaskIds) {
-            // when subtask have been finished, do not send event.
-            if (!context.hasNoMoreSplits(subtaskId)) {

Review Comment:
   Could you explain (or point to the relevant code for) what will happen in tasks that don't have any splits?
   I guess the event will just be ignored?
   
   What if the task/operator has switched to FINISHED?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -111,6 +111,10 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
                 sendResult.whenCompleteAsync(
                         (success, failure) -> {
                             if (failure != null && subtaskAccess.isStillRunning()) {
+                                if (failure instanceof TaskNotRunningException

Review Comment:
   Just to clarify: we only tolerate `TaskNotRunningException`, and not
   arbitrary exceptions (e.g. network failures), because if such an error
   persists the pipeline will be blocked by alignment, right?
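   
   To make the question concrete, here is a minimal sketch of the rule as I understand it. It is an assumption, not the PR's code: the exact condition in the diff may differ, `canIgnoreFailure` is a made-up helper, and `TaskNotRunningException` below is a local stand-in for the Flink class so that the snippet compiles on its own.

```java
import java.io.IOException;

public class OptionalEventFailureSketch {

    /** Local stand-in for Flink's TaskNotRunningException (assumption: a checked exception). */
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    /**
     * Assumed rule: a send failure is tolerated only if the event is optional AND the
     * failure says the target task is not running. Any other failure (e.g. a network
     * error) is still escalated, even for optional events.
     */
    static boolean canIgnoreFailure(Throwable failure, boolean eventIsOptional) {
        return eventIsOptional && failure instanceof TaskNotRunningException;
    }

    public static void main(String[] args) {
        Throwable notRunning = new TaskNotRunningException("task already finished");
        Throwable network = new IOException("connection reset");

        System.out.println(canIgnoreFailure(notRunning, true));  // true
        System.out.println(canIgnoreFailure(network, true));     // false
        System.out.println(canIgnoreFailure(notRunning, false)); // false
    }
}
```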



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
