This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 77cd1cd72fb [FLINK-35528][task] Skip execution of interruptible mails 
when yielding
77cd1cd72fb is described below

commit 77cd1cd72fbfa369d6f298d09b72dea8613774aa
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu May 23 13:04:44 2024 +0200

    [FLINK-35528][task] Skip execution of interruptible mails when yielding
    
    When operators are yielding, for example waiting for async state access to 
complete before a checkpoint, it would be beneficial to not execute 
interruptible mails. Otherwise, the continuation mail for firing timers would be 
continuously re-enqueued. To achieve that, MailboxExecutor must be aware of which 
mails are interruptible.
    
    The easiest way to achieve this is to set MIN_PRIORITY for interruptible 
mails.
---
 .../api/common/operators/MailOptionsImpl.java      | 37 +++++++++++++
 .../api/common/operators/MailboxExecutor.java      | 64 ++++++++++++++++++++++
 .../AsyncExecutionControllerTest.java              |  1 +
 .../flink/runtime/mailbox/SyncMailboxExecutor.java |  1 +
 .../api/operators/MailboxWatermarkProcessor.java   |  1 +
 .../streaming/runtime/tasks/mailbox/Mail.java      | 13 ++++-
 .../runtime/tasks/mailbox/MailboxExecutorImpl.java |  8 ++-
 .../streaming/api/functions/PrintSinkTest.java     |  1 +
 .../operators/MailboxWatermarkProcessorTest.java   |  9 ++-
 .../tasks/mailbox/MailboxExecutorImplTest.java     | 20 +++++++
 10 files changed, 152 insertions(+), 3 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
new file mode 100644
index 00000000000..3bb5b771309
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailOptionsImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.Internal;
+
+/** Options to configure behaviour of executing mailbox mails. */
+@Internal
+public class MailOptionsImpl implements MailboxExecutor.MailOptions {
+    static final MailboxExecutor.MailOptions DEFAULT = new 
MailOptionsImpl(false);
+    static final MailboxExecutor.MailOptions DEFERRABLE = new 
MailOptionsImpl(true);
+
+    private final boolean deferrable;
+
+    private MailOptionsImpl(boolean deferrable) {
+        this.deferrable = deferrable;
+    }
+
+    public boolean isDeferrable() {
+        return deferrable;
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
index 639ed18b673..79e17931a3d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/MailboxExecutor.java
@@ -86,6 +86,26 @@ public interface MailboxExecutor {
     /** A constant for empty args to save on object allocation. */
     Object[] EMPTY_ARGS = new Object[0];
 
+    /** Extra options to configure enqueued mails. */
+    @PublicEvolving
+    interface MailOptions {
+        static MailOptions options() {
+            return MailOptionsImpl.DEFAULT;
+        }
+
+        /**
+         * Mark this mail as deferrable.
+         *
+         * <p>Runtime can decide to defer execution of deferrable mails. For 
example, to unblock
+         * subtask thread as quickly as possible, deferrable mails are not 
executed during {@link
+         * #yield()} or {@link #tryYield()}. This is done to speed up 
checkpointing, by skipping
+         * execution of potentially long-running mails.
+         */
+        static MailOptions deferrable() {
+            return MailOptionsImpl.DEFERRABLE;
+        }
+    }
+
     /**
      * Executes the given command at some time in the future in the mailbox 
thread.
      *
@@ -110,6 +130,49 @@ public interface MailboxExecutor {
      * The description may contain placeholder that refer to the provided 
description arguments
      * using {@link java.util.Formatter} syntax. The actual description is 
only formatted on demand.
      *
+     * @param mailOptions additional options to configure behaviour of the 
{@code command}
+     * @param command the runnable task to add to the mailbox for execution.
+     * @param description the optional description for the command that is 
used for debugging and
+     *     error-reporting.
+     * @throws RejectedExecutionException if this task cannot be accepted for 
execution, e.g.
+     *     because the mailbox is quiesced or closed.
+     */
+    default void execute(
+            MailOptions mailOptions,
+            ThrowingRunnable<? extends Exception> command,
+            String description) {
+        execute(mailOptions, command, description, EMPTY_ARGS);
+    }
+
+    /**
+     * Executes the given command at some time in the future in the mailbox 
thread.
+     *
+     * <p>An optional description can (and should) be added to ease debugging 
and error-reporting.
+     * The description may contain placeholder that refer to the provided 
description arguments
+     * using {@link java.util.Formatter} syntax. The actual description is 
only formatted on demand.
+     *
+     * @param command the runnable task to add to the mailbox for execution.
+     * @param descriptionFormat the optional description for the command that 
is used for debugging
+     *     and error-reporting.
+     * @param descriptionArgs the parameters used to format the final 
description string.
+     * @throws RejectedExecutionException if this task cannot be accepted for 
execution, e.g.
+     *     because the mailbox is quiesced or closed.
+     */
+    default void execute(
+            ThrowingRunnable<? extends Exception> command,
+            String descriptionFormat,
+            Object... descriptionArgs) {
+        execute(MailOptions.options(), command, descriptionFormat, 
descriptionArgs);
+    }
+
+    /**
+     * Executes the given command at some time in the future in the mailbox 
thread.
+     *
+     * <p>An optional description can (and should) be added to ease debugging 
and error-reporting.
+     * The description may contain placeholder that refer to the provided 
description arguments
+     * using {@link java.util.Formatter} syntax. The actual description is 
only formatted on demand.
+     *
+     * @param mailOptions additional options to configure behaviour of the 
{@code command}
      * @param command the runnable task to add to the mailbox for execution.
      * @param descriptionFormat the optional description for the command that 
is used for debugging
      *     and error-reporting.
@@ -118,6 +181,7 @@ public interface MailboxExecutor {
      *     because the mailbox is quiesced or closed.
      */
     void execute(
+            MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> command,
             String descriptionFormat,
             Object... descriptionArgs);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index f99dd877cf7..e98a66b82a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -824,6 +824,7 @@ class AsyncExecutionControllerTest {
 
         @Override
         public void execute(
+                MailOptions mailOptions,
                 ThrowingRunnable<? extends Exception> command,
                 String descriptionFormat,
                 Object... descriptionArgs) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
index 13ab803a385..5b862849938 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/mailbox/SyncMailboxExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 public class SyncMailboxExecutor implements MailboxExecutor {
     @Override
     public void execute(
+            MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> command,
             String descriptionFormat,
             Object... descriptionArgs) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
index 02cfd966c1c..76eabe6dec1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
@@ -77,6 +77,7 @@ public class MailboxWatermarkProcessor<OUT> {
             progressWatermarkScheduled = true;
             // We still have work to do, but we need to let other mails to be 
processed first.
             mailboxExecutor.execute(
+                    MailboxExecutor.MailOptions.deferrable(),
                     () -> {
                         progressWatermarkScheduled = false;
                         emitWatermarkInsideMailbox();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
index bed96ae559a..6afd8918e3d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/Mail.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.runtime.tasks.mailbox;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailOptionsImpl;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -30,6 +32,7 @@ import java.util.concurrent.Future;
  */
 @Internal
 public class Mail {
+    private final MailOptionsImpl mailOptions;
     /** The action to execute. */
     private final ThrowingRunnable<? extends Exception> runnable;
     /**
@@ -50,6 +53,7 @@ public class Mail {
             String descriptionFormat,
             Object... descriptionArgs) {
         this(
+                MailboxExecutor.MailOptions.options(),
                 runnable,
                 priority,
                 StreamTaskActionExecutor.IMMEDIATE,
@@ -58,11 +62,13 @@ public class Mail {
     }
 
     public Mail(
+            MailboxExecutor.MailOptions mailOptions,
             ThrowingRunnable<? extends Exception> runnable,
             int priority,
             StreamTaskActionExecutor actionExecutor,
             String descriptionFormat,
             Object... descriptionArgs) {
+        this.mailOptions = (MailOptionsImpl) mailOptions;
         this.runnable = Preconditions.checkNotNull(runnable);
         this.priority = priority;
         this.descriptionFormat =
@@ -71,8 +77,13 @@ public class Mail {
         this.actionExecutor = actionExecutor;
     }
 
+    public MailboxExecutor.MailOptions getMailOptions() {
+        return mailOptions;
+    }
+
     public int getPriority() {
-        return priority;
+        /** See {@link MailboxExecutor.MailOptions#deferrable()}. */
+        return mailOptions.isDeferrable() ? TaskMailbox.MIN_PRIORITY : 
priority;
     }
 
     public void tryCancel(boolean mayInterruptIfRunning) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
index b2c8eac935a..2987ee21cca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImpl.java
@@ -67,13 +67,19 @@ public final class MailboxExecutorImpl implements 
MailboxExecutor {
 
     @Override
     public void execute(
+            MailOptions mailOptions,
             final ThrowingRunnable<? extends Exception> command,
             final String descriptionFormat,
             final Object... descriptionArgs) {
         try {
             mailbox.put(
                     new Mail(
-                            command, priority, actionExecutor, 
descriptionFormat, descriptionArgs));
+                            mailOptions,
+                            command,
+                            priority,
+                            actionExecutor,
+                            descriptionFormat,
+                            descriptionArgs));
         } catch (MailboxClosedException mbex) {
             throw new RejectedExecutionException(mbex);
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
index 88e0bf3bf46..bc92ac15184 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkTest.java
@@ -235,6 +235,7 @@ class PrintSinkTest {
 
         @Override
         public void execute(
+                MailOptions mailOptions,
                 ThrowingRunnable<? extends Exception> command,
                 String descriptionFormat,
                 Object... descriptionArgs) {}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
index 9fb9d348525..87f19f4032d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -43,6 +44,7 @@ class MailboxWatermarkProcessorTest {
 
     @Test
     void testEmitWatermarkInsideMailbox() throws Exception {
+        int priority = 42;
         final List<StreamElement> emittedElements = new ArrayList<>();
         final TaskMailboxImpl mailbox = new TaskMailboxImpl();
         final InternalTimeServiceManager<?> timerService = new 
NoOpInternalTimeServiceManager();
@@ -50,7 +52,8 @@ class MailboxWatermarkProcessorTest {
         final MailboxWatermarkProcessor<StreamRecord<String>> 
watermarkProcessor =
                 new MailboxWatermarkProcessor<>(
                         new CollectorOutput<>(emittedElements),
-                        new MailboxExecutorImpl(mailbox, 0, 
StreamTaskActionExecutor.IMMEDIATE),
+                        new MailboxExecutorImpl(
+                                mailbox, priority, 
StreamTaskActionExecutor.IMMEDIATE),
                         timerService);
         final List<Watermark> expectedOutput = new ArrayList<>();
         watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
@@ -69,6 +72,10 @@ class MailboxWatermarkProcessorTest {
 
         assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
 
+        // FLINK-35528: do not allow yielding to continuation mails
+        assertThat(mailbox.tryTake(priority)).isEqualTo(Optional.empty());
+        assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+
         while (mailbox.hasMail()) {
             mailbox.take(TaskMailbox.MIN_PRIORITY).run();
         }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
index cfeb92b2aa1..1b6fddc7d91 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxExecutorImplTest.java
@@ -110,6 +110,26 @@ class MailboxExecutorImplTest {
         assertThat(wasExecuted).isTrue();
     }
 
+    @Test
+    void testDeferrable() throws Exception {
+        int priority = 42;
+        MailboxExecutor localExecutor = 
mailboxProcessor.getMailboxExecutor(priority);
+
+        AtomicBoolean deferrableMailExecuted = new AtomicBoolean();
+
+        localExecutor.execute(
+                MailboxExecutor.MailOptions.deferrable(),
+                () -> deferrableMailExecuted.set(true),
+                "deferrable mail");
+        assertThat(localExecutor.tryYield()).isFalse();
+        assertThat(deferrableMailExecuted.get()).isFalse();
+        assertThat(mailboxExecutor.tryYield()).isFalse();
+        assertThat(deferrableMailExecuted.get()).isFalse();
+
+        assertThat(mailboxProcessor.runMailboxStep()).isTrue();
+        assertThat(deferrableMailExecuted.get()).isTrue();
+    }
+
     @Test
     void testClose() throws Exception {
         final TestRunnable yieldRun = new TestRunnable();

Reply via email to