This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 0dfb5abf038 [FLINK-31414][checkpointing] make 
BarrierAlignmentUtil.DelayableTimer not hidden exception
0dfb5abf038 is described below

commit 0dfb5abf0384a2a9381e16c76449f483e38cba29
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Mon Mar 13 16:11:21 2023 +0800

    [FLINK-31414][checkpointing] make BarrierAlignmentUtil.DelayableTimer not 
hidden exception
---
 .../io/checkpointing/BarrierAlignmentUtil.java     |  4 +-
 .../io/checkpointing/BarrierAlignmentUtilTest.java | 74 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
index 85e5c701e39..c78a73b0832 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java
@@ -47,8 +47,8 @@ public class BarrierAlignmentUtil {
                     timerService.registerTimer(
                             timerService.getCurrentProcessingTime() + 
delay.toMillis(),
                             timestamp ->
-                                    mailboxExecutor.submit(
-                                            callable,
+                                    mailboxExecutor.execute(
+                                            () -> callable.call(),
                                             "Execute checkpoint barrier 
handler delayed action"));
             return () -> scheduledFuture.cancel(false);
         };
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java
new file mode 100644
index 00000000000..fa593490094
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.checkpointing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertThrows;
+
+/** {@link BarrierAlignmentUtil} test. */
+public class BarrierAlignmentUtilTest {
+
+    @Test
+    public void testDelayableTimerNotHiddenException() throws Exception {
+        TaskMailbox mailbox = new TaskMailboxImpl();
+        MailboxProcessor mailboxProcessor =
+                new MailboxProcessor(controller -> {}, mailbox, 
StreamTaskActionExecutor.IMMEDIATE);
+        MailboxExecutor mailboxExecutor =
+                new MailboxExecutorImpl(
+                        mailbox, 0, StreamTaskActionExecutor.IMMEDIATE, 
mailboxProcessor);
+
+        TestProcessingTimeService timerService = new 
TestProcessingTimeService();
+        timerService.setCurrentTime(System.currentTimeMillis());
+
+        BarrierAlignmentUtil.DelayableTimer delayableTimer =
+                
BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService);
+
+        Duration delay = Duration.ofMinutes(10);
+
+        delayableTimer.registerTask(
+                () -> {
+                    // simulate Exception in checkpoint sync phase
+                    throw new ExpectedTestException();
+                },
+                delay);
+
+        timerService.advance(delay.toMillis());
+
+        Throwable t =
+                assertThrows(
+                        "BarrierAlignmentUtil.DelayableTimer should not hidden 
exception",
+                        Exception.class,
+                        () -> mailboxProcessor.runMailboxStep());
+
+        ExceptionUtils.assertThrowable(t, ExpectedTestException.class);
+    }
+}

Reply via email to