This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eebbd461 [#2241] improvement(server): Introduce storage flush 
operation timeout cancel to avoid disk hang (#2247)
4eebbd461 is described below

commit 4eebbd461f97cbddf97a77cbf6234c82355c6e36
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Nov 14 18:08:05 2024 +0800

    [#2241] improvement(server): Introduce storage flush operation timeout 
cancel to avoid disk hang (#2247)
    
    ### What changes were proposed in this pull request?
    
    Introduce a timeout for storage flush operations, so that a flush blocked by
    a hung disk is dropped instead of being waited on indefinitely.
    
    ### Why are the changes needed?
    
    For #2241
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    `rss.server.storage.flushOperationTimeoutSec` is introduced. Its default
    value of -1 means the flush timeout is not activated by default.
    
    ### How was this patch tested?
    
    New unit tests (`CompletableFutureUtilsTest`) and existing tests.
    
    ---------
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../common/future/CompletableFutureUtils.java      | 72 ++++++++++++++++++++
 .../common/future/CompletableFutureUtilsTest.java  | 78 ++++++++++++++++++++++
 .../uniffle/server/DefaultFlushEventHandler.java   | 25 ++++++-
 .../apache/uniffle/server/ShuffleServerConf.java   |  7 ++
 4 files changed, 181 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java
 
b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java
new file mode 100644
index 000000000..7a1419d60
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/future/CompletableFutureUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.future;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class CompletableFutureUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CompletableFutureUtils.class);
+
+  public static <T> T withTimeoutCancel(Supplier<T> supplier, long 
timeoutMills) throws Exception {
+    return withTimeoutCancel(supplier, timeoutMills, "");
+  }
+
+  public static <T> T withTimeoutCancel(
+      Supplier<T> supplier, long timeoutMills, String operationName) throws 
Exception {
+    CompletableFuture<T> future =
+        CompletableFuture.supplyAsync(
+            supplier,
+            
Executors.newSingleThreadExecutor(ThreadUtils.getThreadFactory(operationName)));
+    future.exceptionally(
+        throwable -> {
+          throw new RssException(throwable);
+        });
+
+    CompletableFuture<T> extended =
+        CompletableFutureExtension.orTimeout(future, timeoutMills, 
TimeUnit.MILLISECONDS);
+    try {
+      return extended.get();
+    } catch (Exception e) {
+      if (e instanceof ExecutionException) {
+        Throwable internalThrowable = e.getCause();
+        if (internalThrowable instanceof TimeoutException) {
+          LOGGER.error(
+              "The operation of [{}] haven't finished in the {}(millis). Drop 
this execution!",
+              operationName,
+              timeoutMills);
+          throw new TimeoutException();
+        }
+        if (internalThrowable instanceof Exception) {
+          throw (Exception) internalThrowable;
+        }
+      }
+      throw e;
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java
 
b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java
new file mode 100644
index 000000000..39d92f48c
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/future/CompletableFutureUtilsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.future;
+
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.exception.RssException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CompletableFutureUtilsTest {
+
+  @Test
+  public void timeoutTest() {
+    // case1: legal operation
+    Supplier<Integer> supplier = () -> 1;
+    try {
+      int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
+      assertEquals(1, result);
+    } catch (Exception e) {
+      fail();
+    }
+
+    // case2: illegal
+    supplier =
+        () -> {
+          try {
+            Thread.sleep(100000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          return 10;
+        };
+    try {
+      int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
+      fail();
+    } catch (Exception e) {
+      if (!(e instanceof TimeoutException)) {
+        fail();
+      }
+    }
+
+    // case3: fast fail when internal supplier throw exception
+    supplier =
+        () -> {
+          throw new RssException("Hello");
+        };
+    try {
+      int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
+      fail();
+    } catch (Exception e) {
+      if (e instanceof RssException) {
+        // ignore
+      } else {
+        fail();
+      }
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 56dcc02bf..49f6463ee 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -30,8 +30,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.executor.ThreadPoolManager;
 import org.apache.uniffle.common.function.ConsumerWithException;
+import org.apache.uniffle.common.future.CompletableFutureUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.flush.EventDiscardException;
 import org.apache.uniffle.server.flush.EventInvalidException;
@@ -42,6 +44,7 @@ import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static 
org.apache.uniffle.server.ShuffleServerConf.STORAGE_FLUSH_OPERATION_TIMEOUT_SEC;
 import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE;
 
 public class DefaultFlushEventHandler implements FlushEventHandler {
@@ -56,6 +59,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
   protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
   private ConsumerWithException<ShuffleDataFlushEvent> eventConsumer;
   private final ShuffleServer shuffleServer;
+  private final long flushMaxWaitTimeoutSec;
 
   private volatile boolean stopped = false;
 
@@ -65,6 +69,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
       ShuffleServer shuffleServer,
       ConsumerWithException<ShuffleDataFlushEvent> eventConsumer) {
     this.shuffleServerConf = conf;
+    this.flushMaxWaitTimeoutSec = 
conf.getLong(STORAGE_FLUSH_OPERATION_TIMEOUT_SEC);
     this.storageType =
         
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name());
     this.storageManager = storageManager;
@@ -83,6 +88,24 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
     }
   }
 
+  private void consumeEvent(ShuffleDataFlushEvent event) throws Exception {
+    if (flushMaxWaitTimeoutSec <= 0) {
+      eventConsumer.accept(event);
+      return;
+    }
+
+    Supplier<Void> supplier =
+        () -> {
+          try {
+            this.eventConsumer.accept(event);
+          } catch (Exception e) {
+            throw new RssException(e);
+          }
+          return null;
+        };
+    CompletableFutureUtils.withTimeoutCancel(supplier, flushMaxWaitTimeoutSec 
* 1000);
+  }
+
   /**
    * @param event
    * @param storage
@@ -95,7 +118,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
     try {
       readLock.lock();
       try {
-        eventConsumer.accept(event);
+        consumeEvent(event);
       } finally {
         readLock.unlock();
       }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a1a4f1f9e..fd2f5802c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -676,6 +676,13 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(10 * 60L)
           .withDescription("The storage remove resource operation timeout.");
 
+  public static final ConfigOption<Long> STORAGE_FLUSH_OPERATION_TIMEOUT_SEC =
+      ConfigOptions.key("rss.server.storage.flushOperationTimeoutSec")
+          .longType()
+          .defaultValue(-1L)
+          .withDescription(
+              "The storage flush max timeout second, this will not be 
activated by default");
+
   public static final ConfigOption<Boolean> SERVER_MERGE_ENABLE =
       ConfigOptions.key("rss.server.merge.enable")
           .booleanType()

Reply via email to