This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 0dfb5abf038 [FLINK-31414][checkpointing] make BarrierAlignmentUtil.DelayableTimer not hidden exception 0dfb5abf038 is described below commit 0dfb5abf0384a2a9381e16c76449f483e38cba29 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Mon Mar 13 16:11:21 2023 +0800 [FLINK-31414][checkpointing] make BarrierAlignmentUtil.DelayableTimer not hidden exception --- .../io/checkpointing/BarrierAlignmentUtil.java | 4 +- .../io/checkpointing/BarrierAlignmentUtilTest.java | 74 ++++++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java index 85e5c701e39..c78a73b0832 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtil.java @@ -47,8 +47,8 @@ public class BarrierAlignmentUtil { timerService.registerTimer( timerService.getCurrentProcessingTime() + delay.toMillis(), timestamp -> - mailboxExecutor.submit( - callable, + mailboxExecutor.execute( + () -> callable.call(), "Execute checkpoint barrier handler delayed action")); return () -> scheduledFuture.cancel(false); }; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java new file mode 100644 index 00000000000..fa593490094 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/BarrierAlignmentUtilTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io.checkpointing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.time.Duration; + +import static org.junit.Assert.assertThrows; + +/** {@link BarrierAlignmentUtil} test. 
*/
+public class BarrierAlignmentUtilTest {
+    /** A throwing task registered via DelayableTimer must make the mailbox step below throw. */
+    @Test
+    public void testDelayableTimerNotHiddenException() throws Exception {
+        TaskMailbox mailbox = new TaskMailboxImpl();
+        MailboxProcessor mailboxProcessor =
+                new MailboxProcessor(controller -> {}, mailbox, StreamTaskActionExecutor.IMMEDIATE);
+        MailboxExecutor mailboxExecutor =
+                new MailboxExecutorImpl(
+                        mailbox, 0, StreamTaskActionExecutor.IMMEDIATE, mailboxProcessor);
+        // Manually driven test clock, seeded with the current wall-clock time.
+        TestProcessingTimeService timerService = new TestProcessingTimeService();
+        timerService.setCurrentTime(System.currentTimeMillis());
+        // Timer factory under test, wired to the mailbox executor and the test clock above.
+        BarrierAlignmentUtil.DelayableTimer delayableTimer =
+                BarrierAlignmentUtil.createRegisterTimerCallback(mailboxExecutor, timerService);
+        // Arbitrary delay; the test clock is advanced by exactly this amount further down.
+        Duration delay = Duration.ofMinutes(10);
+        // Register a delayed task whose body always throws.
+        delayableTimer.registerTask(
+                () -> {
+                    // simulate Exception in checkpoint sync phase
+                    throw new ExpectedTestException();
+                },
+                delay);
+        // Move the test clock forward by the full delay (presumably this fires the timer).
+        timerService.advance(delay.toMillis());
+        // Running one mailbox step is expected to throw rather than complete silently.
+        Throwable t =
+                assertThrows(
+                        "BarrierAlignmentUtil.DelayableTimer should not hidden exception",
+                        Exception.class,
+                        () -> mailboxProcessor.runMailboxStep());
+        // The caught throwable is then checked against ExpectedTestException.
+        ExceptionUtils.assertThrowable(t, ExpectedTestException.class);
+    }
+}