This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.10 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 6cafe465f0770b5f83b63a1d75acee319184559a Author: Junfan Zhang <[email protected]> AuthorDate: Thu Nov 14 18:08:05 2024 +0800 [#2241] improvement(server): Introduce storage flush operation timeout cancel to avoid disk hang (#2247) ### What changes were proposed in this pull request? Introduce storage flush operation timeout cancel to avoid disk hang ### Why are the changes needed? For #2241 ### Does this PR introduce _any_ user-facing change? Yes. `rss.server.storage.flushOperationTimeoutSec` is introduced; its default value of -1 means the timeout cancel is not activated by default ### How was this patch tested? unit tests and existing tests. --------- Co-authored-by: Junfan Zhang <[email protected]> --- .../common/future/CompletableFutureUtils.java | 72 ++++++++++++++++++++ .../common/future/CompletableFutureUtilsTest.java | 78 ++++++++++++++++++++++ .../uniffle/server/DefaultFlushEventHandler.java | 25 ++++++- .../apache/uniffle/server/ShuffleServerConf.java | 7 ++ 4 files changed, 181 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java new file mode 100644 index 000000000..7a1419d60 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.ThreadUtils; + +public class CompletableFutureUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(CompletableFutureUtils.class); + + public static <T> T withTimeoutCancel(Supplier<T> supplier, long timeoutMills) throws Exception { + return withTimeoutCancel(supplier, timeoutMills, ""); + } + + public static <T> T withTimeoutCancel( + Supplier<T> supplier, long timeoutMills, String operationName) throws Exception { + CompletableFuture<T> future = + CompletableFuture.supplyAsync( + supplier, + Executors.newSingleThreadExecutor(ThreadUtils.getThreadFactory(operationName))); + future.exceptionally( + throwable -> { + throw new RssException(throwable); + }); + + CompletableFuture<T> extended = + CompletableFutureExtension.orTimeout(future, timeoutMills, TimeUnit.MILLISECONDS); + try { + return extended.get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + Throwable internalThrowable = e.getCause(); + if (internalThrowable instanceof TimeoutException) { + LOGGER.error( + "The operation of [{}] haven't finished in the {}(millis). 
Drop this execution!", + operationName, + timeoutMills); + throw new TimeoutException(); + } + if (internalThrowable instanceof Exception) { + throw (Exception) internalThrowable; + } + } + throw e; + } + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java new file mode 100644 index 000000000..39d92f48c --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.uniffle.common.future; + +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.exception.RssException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CompletableFutureUtilsTest { + + @Test + public void timeoutTest() { + // case1: legal operation + Supplier<Integer> supplier = () -> 1; + try { + int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100); + assertEquals(1, result); + } catch (Exception e) { + fail(); + } + + // case2: illegal + supplier = + () -> { + try { + Thread.sleep(100000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 10; + }; + try { + int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100); + fail(); + } catch (Exception e) { + if (!(e instanceof TimeoutException)) { + fail(); + } + } + + // case3: fast fail when internal supplier throw exception + supplier = + () -> { + throw new RssException("Hello"); + }; + try { + int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100); + fail(); + } catch (Exception e) { + if (e instanceof RssException) { + // ignore + } else { + fail(); + } + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java index 56dcc02bf..49f6463ee 100644 --- a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java @@ -30,8 +30,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.executor.ThreadPoolManager; import 
org.apache.uniffle.common.function.ConsumerWithException; +import org.apache.uniffle.common.future.CompletableFutureUtils; import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.server.flush.EventDiscardException; import org.apache.uniffle.server.flush.EventInvalidException; @@ -42,6 +44,7 @@ import org.apache.uniffle.storage.common.LocalStorage; import org.apache.uniffle.storage.common.Storage; import org.apache.uniffle.storage.util.StorageType; +import static org.apache.uniffle.server.ShuffleServerConf.STORAGE_FLUSH_OPERATION_TIMEOUT_SEC; import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE; public class DefaultFlushEventHandler implements FlushEventHandler { @@ -56,6 +59,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler { protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue(); private ConsumerWithException<ShuffleDataFlushEvent> eventConsumer; private final ShuffleServer shuffleServer; + private final long flushMaxWaitTimeoutSec; private volatile boolean stopped = false; @@ -65,6 +69,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler { ShuffleServer shuffleServer, ConsumerWithException<ShuffleDataFlushEvent> eventConsumer) { this.shuffleServerConf = conf; + this.flushMaxWaitTimeoutSec = conf.getLong(STORAGE_FLUSH_OPERATION_TIMEOUT_SEC); this.storageType = StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name()); this.storageManager = storageManager; @@ -83,6 +88,24 @@ public class DefaultFlushEventHandler implements FlushEventHandler { } } + private void consumeEvent(ShuffleDataFlushEvent event) throws Exception { + if (flushMaxWaitTimeoutSec <= 0) { + eventConsumer.accept(event); + return; + } + + Supplier<Void> supplier = + () -> { + try { + this.eventConsumer.accept(event); + } catch (Exception e) { + throw new RssException(e); + } + return null; + }; + CompletableFutureUtils.withTimeoutCancel(supplier, 
flushMaxWaitTimeoutSec * 1000); + } + /** * @param event * @param storage @@ -95,7 +118,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler { try { readLock.lock(); try { - eventConsumer.accept(event); + consumeEvent(event); } finally { readLock.unlock(); } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index a1a4f1f9e..fd2f5802c 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -676,6 +676,13 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(10 * 60L) .withDescription("The storage remove resource operation timeout."); + public static final ConfigOption<Long> STORAGE_FLUSH_OPERATION_TIMEOUT_SEC = + ConfigOptions.key("rss.server.storage.flushOperationTimeoutSec") + .longType() + .defaultValue(-1L) + .withDescription( + "The storage flush max timeout second, this will not be activated by default"); + public static final ConfigOption<Boolean> SERVER_MERGE_ENABLE = ConfigOptions.key("rss.server.merge.enable") .booleanType()
