This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d42111cd019 Merge pull request #37748: Where a Throwable is being 
caught, take care to rethrow OutOfMemoryError
d42111cd019 is described below

commit d42111cd0199e9ecee3cbc3211d54fab0aec76ff
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Mar 4 16:27:11 2026 +0000

    Merge pull request #37748: Where a Throwable is being caught, take care to 
rethrow OutOfMemoryError
---
 .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 18 ++++++++----------
 .../org/apache/beam/runners/direct/DirectRunner.java   |  5 ++---
 .../runners/dataflow/worker/BatchDataflowWorker.java   |  4 ++--
 .../dataflow/worker/GroupAlsoByWindowFnRunner.java     |  6 ++----
 .../dataflow/worker/GroupAlsoByWindowsParDoFn.java     |  2 ++
 .../runners/dataflow/worker/WorkerCustomSources.java   |  6 ++++++
 .../worker/logging/DataflowWorkerLoggingHandler.java   |  2 +-
 .../beam/runners/dataflow/worker/util/JfrInterop.java  |  2 ++
 .../runners/dataflow/worker/util/MemoryMonitor.java    |  2 +-
 .../util/common/worker/BatchingShuffleEntryReader.java |  2 +-
 .../worker/util/common/worker/WorkProgressUpdater.java |  4 ++++
 .../work/processing/StreamingCommitFinalizer.java      |  2 ++
 .../fnexecution/artifact/ArtifactStagingService.java   |  2 ++
 ...ReferenceCountingExecutableStageContextFactory.java |  2 ++
 .../environment/EmbeddedEnvironmentFactory.java        |  2 ++
 .../beam/sdk/fn/data/BeamFnDataOutboundAggregator.java |  2 ++
 .../org/apache/beam/sdk/options/ValueProvider.java     |  2 ++
 .../apache/beam/sdk/transforms/ApproximateUnique.java  |  2 ++
 .../beam/sdk/transforms/display/DisplayData.java       |  2 ++
 .../java/org/apache/beam/sdk/util/MoreFutures.java     |  4 ++++
 .../org/apache/beam/fn/harness/FnApiDoFnRunner.java    |  2 ++
 .../SplittablePairWithRestrictionDoFnRunner.java       |  2 ++
 .../SplittableSplitAndSizeRestrictionsDoFnRunner.java  |  2 ++
 .../SplittableTruncateSizedRestrictionsDoFnRunner.java |  2 ++
 .../beam/fn/harness/control/ProcessBundleHandler.java  |  2 ++
 .../beam/fn/harness/logging/BeamFnLoggingClient.java   |  2 ++
 .../beam/fn/harness/state/FnApiTimerBundleTracker.java |  2 ++
 27 files changed, 65 insertions(+), 22 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 05186ba5adb..a255467fc59 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -176,9 +176,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     // This can contain user code. Wrap it in case it throws an exception.
     try {
       invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider());
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
+    } catch (Exception e) {
+      throw wrapUserCodeException(e);
     }
   }
 
@@ -215,8 +214,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     // This can contain user code. Wrap it in case it throws an exception.
     try {
       invoker.invokeProcessElement(new DoFnProcessContext(elem));
-    } catch (Exception ex) {
-      throw wrapUserCodeException(ex);
+    } catch (Exception e) {
+      throw wrapUserCodeException(e);
     }
   }
 
@@ -225,9 +224,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     // This can contain user code. Wrap it in case it throws an exception.
     try {
       invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider());
-    } catch (Throwable t) {
-      // Exception in user code.
-      throw wrapUserCodeException(t);
+    } catch (Exception e) {
+      throw wrapUserCodeException(e);
     }
   }
 
@@ -237,8 +235,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         new OnWindowExpirationArgumentProvider<>(window, timestamp, key));
   }
 
-  private RuntimeException wrapUserCodeException(Throwable t) {
-    throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+  private RuntimeException wrapUserCodeException(Exception e) {
+    throw UserCodeException.wrapIf(!isSystemDoFn(), e);
   }
 
   private boolean isSystemDoFn() {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7db6f61fbfd..6997b590e01 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -223,10 +223,9 @@ public class DirectRunner extends 
PipelineRunner<DirectPipelineResult> {
           result.waitUntilFinish();
         } catch (UserCodeException userException) {
           throw new PipelineExecutionException(userException.getCause());
+        } catch (RuntimeException | OutOfMemoryError e) {
+          throw e;
         } catch (Throwable t) {
-          if (t instanceof RuntimeException) {
-            throw (RuntimeException) t;
-          }
           throw new RuntimeException(t);
         }
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 7407c97619b..feaf5e74139 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -276,11 +276,11 @@ public class BatchDataflowWorker implements Closeable {
       executeWork(worker, progressUpdater);
       workItemStatusClient.reportSuccess();
       return true;
-
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable e) {
       workItemStatusClient.reportError(e);
       return false;
-
     } finally {
       if (worker != null) {
         try {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index 37e04316e23..4845bb0c98e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -92,11 +92,9 @@ public class GroupAlsoByWindowFnRunner<InputT, OutputT> 
implements DoFnRunner<In
     // This can contain user code. Wrap it in case it throws an exception.
     try {
       fn.processElement(elem.getValue(), options, stepContext, 
sideInputReader, outputManager);
+    } catch (RuntimeException ex) {
+      throw ex;
     } catch (Exception ex) {
-      if (ex instanceof RuntimeException) {
-        throw (RuntimeException) ex;
-      }
-
       throw new RuntimeException(ex);
     }
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
index 9f5e96297e4..882dd497e3f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
@@ -172,6 +172,8 @@ public class GroupAlsoByWindowsParDoFn<InputT, K, V, W 
extends BoundedWindow> im
         output -> {
           try {
             receiver.process(output);
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             throw new RuntimeException(t);
           }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 0181e647ac7..9585b1c42bd 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -680,6 +680,8 @@ public class WorkerCustomSources {
         if (fractionConsumed != null) {
           progress.setFractionConsumed(fractionConsumed);
         }
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable t) {
         LOG.warn("Error estimating fraction consumed from reader {}", reader, 
t);
       }
@@ -689,6 +691,8 @@ public class WorkerCustomSources {
         if (parallelism != null) {
           progress.setConsumedParallelism(parallelism);
         }
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable t) {
         LOG.warn("Error estimating consumed parallelism from reader {}", 
reader, t);
       }
@@ -698,6 +702,8 @@ public class WorkerCustomSources {
         if (parallelism != null) {
           progress.setRemainingParallelism(parallelism);
         }
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable t) {
         LOG.warn("Error estimating remaining parallelism from reader {}", 
reader, t);
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 864887f9bd3..14af9851080 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -377,7 +377,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
       if (manager != null) {
         manager.error(message, e, code);
       }
-    } catch (Throwable t) {
+    } catch (Exception ex) {
       // Failed to report logging failure. No meaningful action left.
     }
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
index 936b310071f..3669e2dccae 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
@@ -59,6 +59,8 @@ class JfrInterop {
       recordingClose = jfrClass.getMethod("close");
 
       jfrConfig = configClass.getMethod("getConfiguration", 
String.class).invoke(null, "profile");
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw new RuntimeException(t);
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
index 80d32fc6a98..e0288e3433e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
@@ -529,7 +529,7 @@ public class MemoryMonitor implements Runnable, 
StatusDataProvider {
 
     // Clearing this list should "release" some memory that will be needed to 
dump the heap.
     // We could try to reallocate it again if we later notice memory pressure 
has subsided,
-    // but that is risk. Further, leaving this released may help with the 
memory pressure.
+    // but that is risky. Further, leaving this released may help with the 
memory pressure.
     reservedForDumpingHeap = null;
 
     try {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
index 31dea074aa6..f6918267f0b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
@@ -129,7 +129,7 @@ public final class BatchingShuffleEntryReader implements 
ShuffleEntryReader {
         nextStartPosition = batch.nextStartPosition;
         entries = batch.entries.listIterator();
         currentBatch = batch;
-      } catch (RuntimeException e) {
+      } catch (RuntimeException | OutOfMemoryError e) {
         throw e;
       } catch (Throwable t) {
         throw new RuntimeException(t);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
index f04e9a47afe..5b9c828e16c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
@@ -292,6 +292,8 @@ public abstract class WorkProgressUpdater {
           checkpointState = CheckpointState.CHECKPOINT_SUCCESSFUL;
           return true;
         }
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable e) {
         LOG.warn("Error trying to checkpoint the worker: ", e);
       }
@@ -305,6 +307,8 @@ public abstract class WorkProgressUpdater {
     LOG.debug("Updating progress on work item {}", workString());
     try {
       reportProgressHelper();
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (InterruptedException e) {
       LOG.info("Cancelling workitem execution: {}", workString(), e);
       worker.abort();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
index d663b4fca27..7116aed3a2b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
@@ -74,6 +74,8 @@ final class StreamingCommitFinalizer {
             () -> {
               try {
                 finalizeCommit.run();
+              } catch (OutOfMemoryError oom) {
+                throw oom;
               } catch (Throwable t) {
                 LOG.error("Source checkpoint finalization failed:", t);
               }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
index c64d8ce7733..0e38abb1b78 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
@@ -583,6 +583,8 @@ public class ArtifactStagingService
       if (completionFuture.isCompletedExceptionally()) {
         try {
           completionFuture.get();
+        } catch (OutOfMemoryError oom) {
+          throw oom;
         } catch (Throwable th) {
           responseObserver.onError(th);
           return;
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
index 45b1b1942c4..4bee41a630b 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
@@ -180,6 +180,8 @@ public class ReferenceCountingExecutableStageContextFactory
         if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
           try {
             wrapper.closeActual();
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             LOG.error("Unable to close ExecutableStageContext.", t);
           }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index 470692e7510..3f3bd02dd8a 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -139,6 +139,8 @@ public class EmbeddedEnvironmentFactory implements 
EnvironmentFactory {
         () -> {
           try {
             fnHarness.get();
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             // Print stacktrace to stderr. Could be useful if underlying error 
not surfaced earlier
             t.printStackTrace();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index aaa27ef7445..109a057585a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -268,6 +268,8 @@ public class BeamFnDataOutboundAggregator {
       synchronized (flushLock) {
         flushInternal();
       }
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw new RuntimeException(t);
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 9cfedd42c77..896eacb28ff 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -269,6 +269,8 @@ public interface ValueProvider<T> extends Serializable {
           return result.get();
         }
         return defaultValue;
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable e) {
         throw new RuntimeException("Unable to load runtime value.", e);
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index fbf134e4cd2..2ee4eeba6cb 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -384,6 +384,8 @@ public class ApproximateUnique {
       try {
         heap.add(hash(input, coder));
         return heap;
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable e) {
         throw new RuntimeException(e);
       }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 966145f0836..3be59e55615 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -781,6 +781,8 @@ public class DisplayData implements Serializable {
       } catch (PopulateDisplayDataException e) {
         // Don't re-wrap exceptions recursively.
         throw e;
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable e) {
         String msg =
             String.format(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index 441e604af3a..40e624db144 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -103,6 +103,8 @@ public class MoreFutures {
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new CompletionException(e);
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             throw new CompletionException(t);
           }
@@ -132,6 +134,8 @@ public class MoreFutures {
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new CompletionException(e);
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             throw new CompletionException(t);
           }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 052ff324490..3893c0f405e 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1321,6 +1321,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     }
     try {
       consumer.accept(output);
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw UserCodeException.wrap(t);
     }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
index 3bfb914a4a9..e83db404cb7 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
@@ -226,6 +226,8 @@ public class SplittablePairWithRestrictionDoFnRunner<
   private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer, 
WindowedValue<T> output) {
     try {
       consumer.accept(output);
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw UserCodeException.wrap(t);
     }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
index e42cbdaf643..fac83485d22 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
@@ -316,6 +316,8 @@ public class SplittableSplitAndSizeRestrictionsDoFnRunner<
   private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer, 
WindowedValue<T> output) {
     try {
       consumer.accept(output);
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw UserCodeException.wrap(t);
     }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
index 6c300295eb6..56c109da84b 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
@@ -764,6 +764,8 @@ public class SplittableTruncateSizedRestrictionsDoFnRunner<
     }
     try {
       consumer.accept(output);
+    } catch (OutOfMemoryError oom) {
+      throw oom;
     } catch (Throwable t) {
       throw UserCodeException.wrap(t);
     }
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index b8ad51816a7..f773e794579 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -1188,6 +1188,8 @@ public class ProcessBundleHandler {
         for (ThrowingRunnable teardownFunction : 
Lists.reverse(this.getTearDownFunctions())) {
           try {
             teardownFunction.run();
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable e) {
             LOG.warn(
                 "Exceptions are thrown from DoFn.teardown method when trying 
to discard "
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 99cfeb5f799..99a7ee64832 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -265,6 +265,8 @@ public class BeamFnLoggingClient implements LoggingClient {
             throw new RuntimeException(e);
           }
         }
+      } catch (OutOfMemoryError oom) {
+        throw oom;
       } catch (Throwable t) {
         thrown = t;
         throw new RuntimeException(t);
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
index e5e0e6d29ea..b9bc79968c3 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
@@ -198,6 +198,8 @@ public class FnApiTimerBundleTracker<K> {
             if (timerFamilyOrId != null && timer != null) {
               
getTimersReceiverFromTimerIdFn.apply(timerFamilyOrId).accept(timer);
             }
+          } catch (OutOfMemoryError oom) {
+            throw oom;
           } catch (Throwable t) {
             throw UserCodeException.wrap(t);
           }

Reply via email to