This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c24ae6865b0338b368b13735014d1aff8a18469
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Apr 14 11:15:26 2025 +0200

    [FLINK-37670][runtime] Fix watermark alignment can deadlock job if there 
are no more splits to be assigned
    
    Previously WatermarkAlignmentEvent was incorrectly ignored and never
    sent if source enumerator had no more splits to assign/send.
    This was causing deadlocks, with watermark alignment not being
    able to progress in such cases.
---
 .../operators/coordination/OperatorEvent.java      | 10 ++-
 .../operators/coordination/SubtaskGatewayImpl.java |  6 ++
 .../source/coordinator/SourceCoordinator.java      | 15 ++--
 .../source/event/WatermarkAlignmentEvent.java      |  9 +++
 .../coordination/EventReceivingTasks.java          |  9 ++-
 .../coordination/SubtaskGatewayImplTest.java       | 25 +++++++
 .../operators/coordination/TestOperatorEvent.java  | 12 +++
 .../api/datastream/WatermarkAlignmentITCase.java   | 87 +++++++++++++++++++++-
 8 files changed, 158 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
index 179aed1bbae..e7ed03ccdd4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEvent.java
@@ -24,4 +24,12 @@ import java.io.Serializable;
  * Root interface for all events sent between {@link OperatorCoordinator} and 
an {@link
  * OperatorEventHandler}.
  */
-public interface OperatorEvent extends Serializable {}
+public interface OperatorEvent extends Serializable {
+    /**
+     * @return true if event is optional and an occasional loss or inability 
to deliver that event
+     *     doesn't affect the job's correctness.
+     */
+    default boolean isLossTolerant() {
+        return false;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 1f6a120c400..d1ea73bcabe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -111,6 +111,12 @@ class SubtaskGatewayImpl implements 
OperatorCoordinator.SubtaskGateway {
                 sendResult.whenCompleteAsync(
                         (success, failure) -> {
                             if (failure != null && 
subtaskAccess.isStillRunning()) {
+                                if (ExceptionUtils.findThrowable(
+                                                        failure, 
TaskNotRunningException.class)
+                                                .isPresent()
+                                        && evt.isLossTolerant()) {
+                                    return;
+                                }
                                 String msg =
                                         String.format(
                                                 EVENT_LOSS_ERROR_MESSAGE,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3133bbe7ce7..b04e8ffa12c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -199,15 +199,12 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT>
                 operatorName);
 
         for (Integer subtaskId : subTaskIds) {
-            // when subtask have been finished, do not send event.
-            if (!context.hasNoMoreSplits(subtaskId)) {
-                // Subtask maybe during deploying or restarting, so we only 
send
-                // WatermarkAlignmentEvent to ready task to avoid period task 
fail
-                // (Java-ThreadPoolExecutor will not schedule the period task 
if it throws an
-                // exception).
-                context.sendEventToSourceOperatorIfTaskReady(
-                        subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
-            }
+            // Subtask maybe during deploying or restarting, so we only send
+            // WatermarkAlignmentEvent to ready task to avoid period task fail
+            // (Java-ThreadPoolExecutor will not schedule the period task if 
it throws an
+            // exception).
+            context.sendEventToSourceOperatorIfTaskReady(
+                    subtaskId, new 
WatermarkAlignmentEvent(maxAllowedWatermark));
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
index 0055c66d0ba..61e412a8f7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/WatermarkAlignmentEvent.java
@@ -36,6 +36,15 @@ public class WatermarkAlignmentEvent implements 
OperatorEvent {
         return maxWatermark;
     }
 
+    /**
+     * @return true, because even if lost, {@link WatermarkAlignmentEvent} 
will re-send again, and
+     *     it doesn't hold any critical information that could lead to a data 
loss.
+     */
+    @Override
+    public boolean isLossTolerant() {
+        return true;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
index 15659cb5c6e..bd94d6f62c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/EventReceivingTasks.java
@@ -173,11 +173,12 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
 
     // ------------------------------------------------------------------------
 
-    private final class TestSubtaskAccess implements SubtaskAccess {
+    final class TestSubtaskAccess implements SubtaskAccess {
 
         private final ExecutionAttemptID executionAttemptId;
         private final CompletableFuture<?> running;
         private final int subtaskIndex;
+        private final List<Throwable> taskFailoverReasons = new ArrayList<>();
 
         private TestSubtaskAccess(int subtaskIndex, int attemptNumber, boolean 
isRunning) {
             this.subtaskIndex = subtaskIndex;
@@ -234,7 +235,11 @@ public class EventReceivingTasks implements 
SubtaskAccess.SubtaskAccessFactory {
 
         @Override
         public void triggerTaskFailover(Throwable cause) {
-            // ignore this in the tests
+            taskFailoverReasons.add(cause);
+        }
+
+        public List<Throwable> getTaskFailoverReasons() {
+            return taskFailoverReasons;
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
index cd19fcfd5d4..fb8bbd47d9c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.messages.Acknowledge;
 import 
org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
+import 
org.apache.flink.runtime.operators.coordination.EventReceivingTasks.TestSubtaskAccess;
 import 
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -166,6 +167,30 @@ class SubtaskGatewayImplTest {
         assertThat(future).isCompletedExceptionally();
     }
 
+    @Test
+    void optionalEventsIgnoreTaskNotRunning() {
+        final EventReceivingTasks receiver =
+                EventReceivingTasks.createForRunningTasksFailingRpcs(
+                        new FlinkException(new 
TaskNotRunningException("test")));
+        TestSubtaskAccess subtaskAccess =
+                (TestSubtaskAccess) 
getUniqueElement(receiver.getAccessesForSubtask(10));
+        final SubtaskGatewayImpl gateway =
+                new SubtaskGatewayImpl(
+                        subtaskAccess,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
+                        new IncompleteFuturesTracker());
+
+        gateway.markForCheckpoint(17L);
+        gateway.tryCloseGateway(17L);
+
+        final CompletableFuture<Acknowledge> future =
+                gateway.sendEvent(new TestOperatorEvent(42, true));
+        gateway.openGatewayAndUnmarkAllCheckpoint();
+
+        assertThat(future).isCompletedExceptionally();
+        assertThat(subtaskAccess.getTaskFailoverReasons()).isEmpty();
+    }
+
     private static <T> T getUniqueElement(Collection<T> collection) {
         Iterator<T> iterator = collection.iterator();
         T element = iterator.next();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
index 7eb1937c1d7..9fbcbcab079 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestOperatorEvent.java
@@ -23,14 +23,26 @@ public final class TestOperatorEvent implements 
OperatorEvent {
     private static final long serialVersionUID = 1L;
 
     private final int value;
+    private final boolean optional;
 
     public TestOperatorEvent() {
         // pick some random and rather unique value
         this.value = System.identityHashCode(this);
+        this.optional = false;
     }
 
     public TestOperatorEvent(int value) {
+        this(value, false);
+    }
+
+    public TestOperatorEvent(int value, boolean optional) {
         this.value = value;
+        this.optional = optional;
+    }
+
+    @Override
+    public boolean isLossTolerant() {
+        return optional;
     }
 
     public int getValue() {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
index a257a62e249..41b7f98dacb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
@@ -21,19 +21,30 @@ package org.apache.flink.test.streaming.api.datastream;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SupportsBatchSnapshot;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** This ITCase class tests the behavior of task execution with watermark 
alignment. */
 class WatermarkAlignmentITCase {
 
@@ -45,13 +56,13 @@ class WatermarkAlignmentITCase {
     @Test
     void testTaskFinishedWithWatermarkAlignmentExecution() throws Exception {
         // Set up the execution environment with parallelism of 2
-        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(2);
 
         // Create a stream from a custom source with watermark strategy
         DataStream<Long> stream =
                 env.fromSource(
-                                new NumberSequenceSource(0, 100),
+                                new EagerlyFinishingNumberSequenceSource(0, 
100),
                                 
WatermarkStrategy.<Long>forMonotonousTimestamps()
                                         .withTimestampAssigner(
                                                 
(SerializableTimestampAssigner<Long>)
@@ -67,11 +78,81 @@ class WatermarkAlignmentITCase {
                                         });
 
         // Execute the stream and collect the results
-        final List<Long> result = stream.executeAndCollect(101);
+        List<Long> result = stream.executeAndCollect(101);
         Collections.sort(result);
 
         // Assert that the collected result contains all numbers from 0 to 100
         Assertions.assertIterableEquals(
                 result, LongStream.rangeClosed(0, 
100).boxed().collect(Collectors.toList()));
     }
+
+    static class EagerlyFinishingNumberSequenceSource extends 
NumberSequenceSource {
+        public EagerlyFinishingNumberSequenceSource(long from, long to) {
+            super(from, to);
+        }
+
+        @Override
+        public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>>
+                createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> 
enumContext) {
+
+            List<NumberSequenceSplit> splits =
+                    splitNumberRange(getFrom(), getTo(), 
enumContext.currentParallelism());
+            return new EagerlyFinishingIteratorSourceEnumerator(enumContext, 
splits);
+        }
+    }
+
+    /**
+     * Contrary to the {@link
+     * 
org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator}, this 
enumerator
+     * signals no more available splits as soon as possible.
+     */
+    static class EagerlyFinishingIteratorSourceEnumerator
+            implements SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
+                    SupportsBatchSnapshot {
+
+        private final SplitEnumeratorContext<NumberSequenceSplit> context;
+        private final Queue<NumberSequenceSplit> remainingSplits;
+
+        public EagerlyFinishingIteratorSourceEnumerator(
+                SplitEnumeratorContext<NumberSequenceSplit> context,
+                Collection<NumberSequenceSplit> splits) {
+            this.context = checkNotNull(context);
+            this.remainingSplits = new ArrayDeque<>(splits);
+            this.context
+                    .metricGroup()
+                    .setUnassignedSplitsGauge(() -> (long) 
remainingSplits.size());
+        }
+
+        @Override
+        public void start() {}
+
+        @Override
+        public void close() {}
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+            NumberSequenceSplit nextSplit = remainingSplits.poll();
+            if (nextSplit != null) {
+                context.assignSplit(nextSplit, subtaskId);
+            }
+            if (remainingSplits.size() == 0) {
+                for (int i = 0; i < context.currentParallelism(); i++) {
+                    context.signalNoMoreSplits(i);
+                }
+            }
+        }
+
+        @Override
+        public void addSplitsBack(List<NumberSequenceSplit> splits, int 
subtaskId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Collection<NumberSequenceSplit> snapshotState(long 
checkpointId) throws Exception {
+            return remainingSplits;
+        }
+
+        @Override
+        public void addReader(int subtaskId) {}
+    }
 }

Reply via email to