This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d42111cd019 Merge pull request #37748: Where a Throwable is being caught, take care to rethrow OutOfMemoryError
d42111cd019 is described below
commit d42111cd0199e9ecee3cbc3211d54fab0aec76ff
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Mar 4 16:27:11 2026 +0000
Merge pull request #37748: Where a Throwable is being caught, take care to rethrow OutOfMemoryError
---
.../org/apache/beam/runners/core/SimpleDoFnRunner.java | 18 ++++++++----------
.../org/apache/beam/runners/direct/DirectRunner.java | 5 ++---
.../runners/dataflow/worker/BatchDataflowWorker.java | 4 ++--
.../dataflow/worker/GroupAlsoByWindowFnRunner.java | 6 ++----
.../dataflow/worker/GroupAlsoByWindowsParDoFn.java | 2 ++
.../runners/dataflow/worker/WorkerCustomSources.java | 6 ++++++
.../worker/logging/DataflowWorkerLoggingHandler.java | 2 +-
.../beam/runners/dataflow/worker/util/JfrInterop.java | 2 ++
.../runners/dataflow/worker/util/MemoryMonitor.java | 2 +-
.../util/common/worker/BatchingShuffleEntryReader.java | 2 +-
.../worker/util/common/worker/WorkProgressUpdater.java | 4 ++++
.../work/processing/StreamingCommitFinalizer.java | 2 ++
.../fnexecution/artifact/ArtifactStagingService.java | 2 ++
...ReferenceCountingExecutableStageContextFactory.java | 2 ++
.../environment/EmbeddedEnvironmentFactory.java | 2 ++
.../beam/sdk/fn/data/BeamFnDataOutboundAggregator.java | 2 ++
.../org/apache/beam/sdk/options/ValueProvider.java | 2 ++
.../apache/beam/sdk/transforms/ApproximateUnique.java | 2 ++
.../beam/sdk/transforms/display/DisplayData.java | 2 ++
.../java/org/apache/beam/sdk/util/MoreFutures.java | 4 ++++
.../org/apache/beam/fn/harness/FnApiDoFnRunner.java | 2 ++
.../SplittablePairWithRestrictionDoFnRunner.java | 2 ++
.../SplittableSplitAndSizeRestrictionsDoFnRunner.java | 2 ++
.../SplittableTruncateSizedRestrictionsDoFnRunner.java | 2 ++
.../beam/fn/harness/control/ProcessBundleHandler.java | 2 ++
.../beam/fn/harness/logging/BeamFnLoggingClient.java | 2 ++
.../beam/fn/harness/state/FnApiTimerBundleTracker.java | 2 ++
27 files changed, 65 insertions(+), 22 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 05186ba5adb..a255467fc59 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -176,9 +176,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider());
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
+ } catch (Exception e) {
+ throw wrapUserCodeException(e);
}
}
@@ -215,8 +214,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeProcessElement(new DoFnProcessContext(elem));
- } catch (Exception ex) {
- throw wrapUserCodeException(ex);
+ } catch (Exception e) {
+ throw wrapUserCodeException(e);
}
}
@@ -225,9 +224,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
// This can contain user code. Wrap it in case it throws an exception.
try {
invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider());
- } catch (Throwable t) {
- // Exception in user code.
- throw wrapUserCodeException(t);
+ } catch (Exception e) {
+ throw wrapUserCodeException(e);
}
}
@@ -237,8 +235,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
new OnWindowExpirationArgumentProvider<>(window, timestamp, key));
}
- private RuntimeException wrapUserCodeException(Throwable t) {
- throw UserCodeException.wrapIf(!isSystemDoFn(), t);
+ private RuntimeException wrapUserCodeException(Exception e) {
+ throw UserCodeException.wrapIf(!isSystemDoFn(), e);
}
private boolean isSystemDoFn() {
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7db6f61fbfd..6997b590e01 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -223,10 +223,9 @@ public class DirectRunner extends
PipelineRunner<DirectPipelineResult> {
result.waitUntilFinish();
} catch (UserCodeException userException) {
throw new PipelineExecutionException(userException.getCause());
+ } catch (RuntimeException | OutOfMemoryError e) {
+ throw e;
} catch (Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
throw new RuntimeException(t);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index 7407c97619b..feaf5e74139 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -276,11 +276,11 @@ public class BatchDataflowWorker implements Closeable {
executeWork(worker, progressUpdater);
workItemStatusClient.reportSuccess();
return true;
-
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
workItemStatusClient.reportError(e);
return false;
-
} finally {
if (worker != null) {
try {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index 37e04316e23..4845bb0c98e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -92,11 +92,9 @@ public class GroupAlsoByWindowFnRunner<InputT, OutputT>
implements DoFnRunner<In
// This can contain user code. Wrap it in case it throws an exception.
try {
fn.processElement(elem.getValue(), options, stepContext,
sideInputReader, outputManager);
+ } catch (RuntimeException ex) {
+ throw ex;
} catch (Exception ex) {
- if (ex instanceof RuntimeException) {
- throw (RuntimeException) ex;
- }
-
throw new RuntimeException(ex);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
index 9f5e96297e4..882dd497e3f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java
@@ -172,6 +172,8 @@ public class GroupAlsoByWindowsParDoFn<InputT, K, V, W
extends BoundedWindow> im
output -> {
try {
receiver.process(output);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw new RuntimeException(t);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 0181e647ac7..9585b1c42bd 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -680,6 +680,8 @@ public class WorkerCustomSources {
if (fractionConsumed != null) {
progress.setFractionConsumed(fractionConsumed);
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
LOG.warn("Error estimating fraction consumed from reader {}", reader,
t);
}
@@ -689,6 +691,8 @@ public class WorkerCustomSources {
if (parallelism != null) {
progress.setConsumedParallelism(parallelism);
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
LOG.warn("Error estimating consumed parallelism from reader {}",
reader, t);
}
@@ -698,6 +702,8 @@ public class WorkerCustomSources {
if (parallelism != null) {
progress.setRemainingParallelism(parallelism);
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
LOG.warn("Error estimating remaining parallelism from reader {}",
reader, t);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 864887f9bd3..14af9851080 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -377,7 +377,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
if (manager != null) {
manager.error(message, e, code);
}
- } catch (Throwable t) {
+ } catch (Exception ex) {
// Failed to report logging failure. No meaningful action left.
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
index 936b310071f..3669e2dccae 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/JfrInterop.java
@@ -59,6 +59,8 @@ class JfrInterop {
recordingClose = jfrClass.getMethod("close");
jfrConfig = configClass.getMethod("getConfiguration",
String.class).invoke(null, "profile");
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw new RuntimeException(t);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
index 80d32fc6a98..e0288e3433e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitor.java
@@ -529,7 +529,7 @@ public class MemoryMonitor implements Runnable,
StatusDataProvider {
// Clearing this list should "release" some memory that will be needed to
dump the heap.
// We could try to reallocate it again if we later notice memory pressure
has subsided,
- // but that is risk. Further, leaving this released may help with the
memory pressure.
+ // but that is risky. Further, leaving this released may help with the
memory pressure.
reservedForDumpingHeap = null;
try {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
index 31dea074aa6..f6918267f0b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReader.java
@@ -129,7 +129,7 @@ public final class BatchingShuffleEntryReader implements
ShuffleEntryReader {
nextStartPosition = batch.nextStartPosition;
entries = batch.entries.listIterator();
currentBatch = batch;
- } catch (RuntimeException e) {
+ } catch (RuntimeException | OutOfMemoryError e) {
throw e;
} catch (Throwable t) {
throw new RuntimeException(t);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
index f04e9a47afe..5b9c828e16c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
@@ -292,6 +292,8 @@ public abstract class WorkProgressUpdater {
checkpointState = CheckpointState.CHECKPOINT_SUCCESSFUL;
return true;
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
LOG.warn("Error trying to checkpoint the worker: ", e);
}
@@ -305,6 +307,8 @@ public abstract class WorkProgressUpdater {
LOG.debug("Updating progress on work item {}", workString());
try {
reportProgressHelper();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (InterruptedException e) {
LOG.info("Cancelling workitem execution: {}", workString(), e);
worker.abort();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
index d663b4fca27..7116aed3a2b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
@@ -74,6 +74,8 @@ final class StreamingCommitFinalizer {
() -> {
try {
finalizeCommit.run();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
LOG.error("Source checkpoint finalization failed:", t);
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
index c64d8ce7733..0e38abb1b78 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/ArtifactStagingService.java
@@ -583,6 +583,8 @@ public class ArtifactStagingService
if (completionFuture.isCompletedExceptionally()) {
try {
completionFuture.get();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable th) {
responseObserver.onError(th);
return;
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
index 45b1b1942c4..4bee41a630b 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java
@@ -180,6 +180,8 @@ public class ReferenceCountingExecutableStageContextFactory
if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
try {
wrapper.closeActual();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
LOG.error("Unable to close ExecutableStageContext.", t);
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index 470692e7510..3f3bd02dd8a 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -139,6 +139,8 @@ public class EmbeddedEnvironmentFactory implements
EnvironmentFactory {
() -> {
try {
fnHarness.get();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
// Print stacktrace to stderr. Could be useful if underlying error
not surfaced earlier
t.printStackTrace();
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
index aaa27ef7445..109a057585a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java
@@ -268,6 +268,8 @@ public class BeamFnDataOutboundAggregator {
synchronized (flushLock) {
flushInternal();
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw new RuntimeException(t);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
index 9cfedd42c77..896eacb28ff 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java
@@ -269,6 +269,8 @@ public interface ValueProvider<T> extends Serializable {
return result.get();
}
return defaultValue;
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
throw new RuntimeException("Unable to load runtime value.", e);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
index fbf134e4cd2..2ee4eeba6cb 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateUnique.java
@@ -384,6 +384,8 @@ public class ApproximateUnique {
try {
heap.add(hash(input, coder));
return heap;
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
throw new RuntimeException(e);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index 966145f0836..3be59e55615 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -781,6 +781,8 @@ public class DisplayData implements Serializable {
} catch (PopulateDisplayDataException e) {
// Don't re-wrap exceptions recursively.
throw e;
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
String msg =
String.format(
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
index 441e604af3a..40e624db144 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java
@@ -103,6 +103,8 @@ public class MoreFutures {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw new CompletionException(t);
}
@@ -132,6 +134,8 @@ public class MoreFutures {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw new CompletionException(t);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 052ff324490..3893c0f405e 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1321,6 +1321,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
}
try {
consumer.accept(output);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw UserCodeException.wrap(t);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
index 3bfb914a4a9..e83db404cb7 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunner.java
@@ -226,6 +226,8 @@ public class SplittablePairWithRestrictionDoFnRunner<
private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer,
WindowedValue<T> output) {
try {
consumer.accept(output);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw UserCodeException.wrap(t);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
index e42cbdaf643..fac83485d22 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableSplitAndSizeRestrictionsDoFnRunner.java
@@ -316,6 +316,8 @@ public class SplittableSplitAndSizeRestrictionsDoFnRunner<
private <T> void outputTo(FnDataReceiver<WindowedValue<T>> consumer,
WindowedValue<T> output) {
try {
consumer.accept(output);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw UserCodeException.wrap(t);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
index 6c300295eb6..56c109da84b 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java
@@ -764,6 +764,8 @@ public class SplittableTruncateSizedRestrictionsDoFnRunner<
}
try {
consumer.accept(output);
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw UserCodeException.wrap(t);
}
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index b8ad51816a7..f773e794579 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -1188,6 +1188,8 @@ public class ProcessBundleHandler {
for (ThrowingRunnable teardownFunction :
Lists.reverse(this.getTearDownFunctions())) {
try {
teardownFunction.run();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable e) {
LOG.warn(
"Exceptions are thrown from DoFn.teardown method when trying
to discard "
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 99cfeb5f799..99a7ee64832 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -265,6 +265,8 @@ public class BeamFnLoggingClient implements LoggingClient {
throw new RuntimeException(e);
}
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
thrown = t;
throw new RuntimeException(t);
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
index e5e0e6d29ea..b9bc79968c3 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiTimerBundleTracker.java
@@ -198,6 +198,8 @@ public class FnApiTimerBundleTracker<K> {
if (timerFamilyOrId != null && timer != null) {
getTimersReceiverFromTimerIdFn.apply(timerFamilyOrId).accept(timer);
}
+ } catch (OutOfMemoryError oom) {
+ throw oom;
} catch (Throwable t) {
throw UserCodeException.wrap(t);
}