scwhittle commented on code in PR #32922:
URL: https://github.com/apache/beam/pull/32922#discussion_r1820519685


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/TerminatingExecutors.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.util;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.dataflow.worker.WorkerUncaughtExceptionHandler;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.JvmRuntime;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Utility class for {@link java.util.concurrent.ExecutorService}s that will 
terminate the JVM on
+ * uncaught exceptions.
+ *
+ * @implNote Ensures that all threads produced by the {@link ExecutorService}s 
have a {@link
+ *     WorkerUncaughtExceptionHandler} attached to prevent hidden/silent 
exceptions and errors.
+ */
+public final class TerminatingExecutors {
+  private TerminatingExecutors() {}
+
+  public static TerminatingExecutorService newSingleThreadExecutor(
+      ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
+    return new TerminatingExecutorService(
+        
Executors.newSingleThreadExecutor(terminatingThreadFactory(threadFactoryBuilder,
 logger)));
+  }
+
+  public static TerminatingScheduledExecutorService 
newSingleThreadScheduledExecutor(
+      ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
+    return new TerminatingScheduledExecutorService(
+        Executors.newSingleThreadScheduledExecutor(
+            terminatingThreadFactory(threadFactoryBuilder, logger)));
+  }
+
+  public static TerminatingExecutorService newCachedThreadPool(
+      ThreadFactoryBuilder threadFactoryBuilder, Logger logger) {
+    return new TerminatingExecutorService(
+        
Executors.newCachedThreadPool(terminatingThreadFactory(threadFactoryBuilder, 
logger)));
+  }
+
+  public static TerminatingExecutorService newFixedThreadPool(
+      int numThreads, ThreadFactoryBuilder threadFactoryBuilder, Logger 
logger) {
+    return new TerminatingExecutorService(
+        Executors.newFixedThreadPool(
+            numThreads, terminatingThreadFactory(threadFactoryBuilder, 
logger)));
+  }
+
+  public static TerminatingExecutorService newSingleThreadedExecutorForTesting(
+      JvmRuntime jvmRuntime, ThreadFactoryBuilder threadFactoryBuilder, Logger 
logger) {
+    return new TerminatingExecutorService(
+        Executors.newSingleThreadExecutor(

Review Comment:
   I think it depends on the use case. There can be cases where the futures are 
being examined and having the exceptions manually handled by retrying etc.  If 
we are using schedule and not ignoring the future result then that seems like a 
case we wouldn't want a failure.  If we are using execute or otherwise ignoring 
the future it seems like we would want a failure.
   
   I think Arun's advice to use ExecutorService where we can makes sense.  We 
can look at the remaining scheduledexecutorservice cases to see if we want to 
fail or if the errors are already handled via examining the futures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to