This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 828914840df [Dataflow Java Streaming] Ensure that OutOfMemoryError 
triggers JVM shutdown instead of being retried. This matches MemoryMonitor 
detection of gc thrashing and prevents workers from remaining in broken state 
that may be fixed by restarting (#37749)
828914840df is described below

commit 828914840df9d5a4c181ac4de6d0a1618278e504
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Mar 5 10:31:18 2026 +0000

    [Dataflow Java Streaming] Ensure that OutOfMemoryError triggers JVM 
shutdown instead of being retried. This matches MemoryMonitor detection of gc 
thrashing and prevents workers from remaining in broken state that may be fixed 
by restarting (#37749)
---
 .../work/processing/StreamingWorkScheduler.java    |  21 ++--
 .../processing/failures/WorkFailureProcessor.java  | 118 ++++++++++++---------
 .../failures/WorkFailureProcessorTest.java         |  22 ++--
 3 files changed, 95 insertions(+), 66 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 242e4a5f0db..bb936831d6e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -278,13 +278,20 @@ public class StreamingWorkScheduler {
       recordProcessingStats(commitRequest, workItem, executeWorkResult);
       LOG.debug("Processing done for work token: {}", workItem.getWorkToken());
     } catch (Throwable t) {
-      workFailureProcessor.logAndProcessFailure(
-          computationId,
-          ExecutableWork.create(work, retry -> processWork(computationState, 
retry)),
-          t,
-          invalidWork ->
-              computationState.completeWorkAndScheduleNextWorkForKey(
-                  invalidWork.getShardedKey(), invalidWork.id()));
+      // OutOfMemoryErrors that are caught will be rethrown and trigger JVM 
termination.
+      try {
+        workFailureProcessor.logAndProcessFailure(
+            computationId,
+            ExecutableWork.create(work, retry -> processWork(computationState, 
retry)),
+            t,
+            invalidWork ->
+                computationState.completeWorkAndScheduleNextWorkForKey(
+                    invalidWork.getShardedKey(), invalidWork.id()));
+      } catch (OutOfMemoryError oom) {
+        throw oom;
+      } catch (Throwable t2) {
+        throw new RuntimeException(t2);
+      }
     } finally {
       // Update total processing time counters. Updating in finally clause 
ensures that
       // work items causing exceptions are also accounted in time spent.
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
index 0f0513b81c7..18c8e9b8d83 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessor.java
@@ -107,14 +107,20 @@ public final class WorkFailureProcessor {
       String computationId,
       ExecutableWork executableWork,
       Throwable t,
-      Consumer<Work> onInvalidWork) {
-    if (shouldRetryLocally(computationId, executableWork.work(), t)) {
-      // Try again after some delay and at the end of the queue to avoid a 
tight loop.
-      executeWithDelay(retryLocallyDelayMs, executableWork);
-    } else {
-      // Consider the item invalid. It will eventually be retried by Windmill 
if it still needs to
-      // be processed.
-      onInvalidWork.accept(executableWork.work());
+      Consumer<Work> onInvalidWork)
+      throws Throwable {
+    switch (evaluateRetry(computationId, executableWork.work(), t)) {
+      case DO_NOT_RETRY:
+        // Consider the item invalid. It will eventually be retried by 
Windmill if it still needs to
+        // be processed.
+        onInvalidWork.accept(executableWork.work());
+        break;
+      case RETRY_LOCALLY:
+        // Try again after some delay and at the end of the queue to avoid a 
tight loop.
+        executeWithDelay(retryLocallyDelayMs, executableWork);
+        break;
+      case RETHROW_THROWABLE:
+        throw t;
     }
   }
 
@@ -131,7 +137,13 @@ public final class WorkFailureProcessor {
         executableWork, executableWork.work().getSerializedWorkItemSize());
   }
 
-  private boolean shouldRetryLocally(String computationId, Work work, 
Throwable t) {
+  private enum RetryEvaluation {
+    DO_NOT_RETRY,
+    RETRY_LOCALLY,
+    RETHROW_THROWABLE,
+  }
+
+  private RetryEvaluation evaluateRetry(String computationId, Work work, 
Throwable t) {
     @Nullable final Throwable cause = t.getCause();
     Throwable parsedException = (t instanceof UserCodeException && cause != 
null) ? cause : t;
     if (KeyTokenInvalidException.isKeyTokenInvalidException(parsedException)) {
@@ -140,53 +152,59 @@ public final class WorkFailureProcessor {
               + "Work will not be retried locally.",
           computationId,
           work.getWorkItem().getShardingKey());
-    } else if 
(WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
+      return RetryEvaluation.DO_NOT_RETRY;
+    }
+    if 
(WorkItemCancelledException.isWorkItemCancelledException(parsedException)) {
       LOG.debug(
           "Execution of work for computation '{}' on sharding key '{}' failed. 
"
               + "Work will not be retried locally.",
           computationId,
           work.getWorkItem().getShardingKey());
-    } else {
-      LastExceptionDataProvider.reportException(parsedException);
-      LOG.debug("Failed work: {}", work);
-      Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), 
clock.get());
-      if (!failureTracker.trackFailure(computationId, work.getWorkItem(), 
parsedException)) {
-        LOG.error(
-            "Execution of work for computation '{}' on sharding key '{}' 
failed with uncaught exception, "
-                + "and Windmill indicated not to retry locally.",
-            computationId,
-            work.getWorkItem().getShardingKey(),
-            parsedException);
-      } else if (isOutOfMemoryError(parsedException)) {
-        String heapDump = tryToDumpHeap();
-        LOG.error(
-            "Execution of work for computation '{}' for sharding key '{}' 
failed with out-of-memory. "
-                + "Work will not be retried locally. Heap dump {}.",
-            computationId,
-            work.getWorkItem().getShardingKey(),
-            heapDump,
-            parsedException);
-      } else if 
(elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
-        LOG.error(
-            "Execution of work for computation '{}' for sharding key '{}' 
failed with uncaught exception, "
-                + "and it will not be retried locally because the elapsed time 
since start {} "
-                + "exceeds {}.",
-            computationId,
-            work.getWorkItem().getShardingKey(),
-            elapsedTimeSinceStart,
-            MAX_LOCAL_PROCESSING_RETRY_DURATION,
-            parsedException);
-      } else {
-        LOG.error(
-            "Execution of work for computation '{}' on sharding key '{}' 
failed with uncaught exception. "
-                + "Work will be retried locally.",
-            computationId,
-            work.getWorkItem().getShardingKey(),
-            parsedException);
-        return true;
-      }
+      return RetryEvaluation.DO_NOT_RETRY;
     }
 
-    return false;
+    LastExceptionDataProvider.reportException(parsedException);
+    LOG.debug("Failed work: {}", work);
+    Duration elapsedTimeSinceStart = new Duration(work.getStartTime(), 
clock.get());
+    if (isOutOfMemoryError(parsedException)) {
+      String heapDump = tryToDumpHeap();
+      LOG.error(
+          "Execution of work for computation '{}' for sharding key '{}' failed 
with out-of-memory. "
+              + "Work will not be retried locally. Heap dump {}.",
+          computationId,
+          work.getWorkItem().getShardingKey(),
+          heapDump,
+          parsedException);
+      return RetryEvaluation.RETHROW_THROWABLE;
+    }
+
+    if (!failureTracker.trackFailure(computationId, work.getWorkItem(), 
parsedException)) {
+      LOG.error(
+          "Execution of work for computation '{}' on sharding key '{}' failed 
with uncaught exception, "
+              + "and Windmill indicated not to retry locally.",
+          computationId,
+          work.getWorkItem().getShardingKey(),
+          parsedException);
+      return RetryEvaluation.DO_NOT_RETRY;
+    }
+    if 
(elapsedTimeSinceStart.isLongerThan(MAX_LOCAL_PROCESSING_RETRY_DURATION)) {
+      LOG.error(
+          "Execution of work for computation '{}' for sharding key '{}' failed 
with uncaught exception, "
+              + "and it will not be retried locally because the elapsed time 
since start {} "
+              + "exceeds {}.",
+          computationId,
+          work.getWorkItem().getShardingKey(),
+          elapsedTimeSinceStart,
+          MAX_LOCAL_PROCESSING_RETRY_DURATION,
+          parsedException);
+      return RetryEvaluation.DO_NOT_RETRY;
+    }
+    LOG.error(
+        "Execution of work for computation '{}' on sharding key '{}' failed 
with uncaught exception. "
+            + "Work will be retried locally.",
+        computationId,
+        work.getWorkItem().getShardingKey(),
+        parsedException);
+    return RetryEvaluation.RETRY_LOCALLY;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
index 41f2230f4a8..68a11895fa1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
@@ -18,6 +18,7 @@
 package 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.Mockito.mock;
 
 import java.util.HashSet;
@@ -105,7 +106,7 @@ public class WorkFailureProcessorTest {
   }
 
   @Test
-  public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() {
+  public void logAndProcessFailure_doesNotRetryKeyTokenInvalidException() 
throws Throwable {
     Set<Work> executedWork = new HashSet<>();
     ExecutableWork work = createWork(executedWork::add);
     WorkFailureProcessor workFailureProcessor =
@@ -119,7 +120,7 @@ public class WorkFailureProcessorTest {
   }
 
   @Test
-  public void logAndProcessFailure_doesNotRetryWhenWorkItemCancelled() {
+  public void logAndProcessFailure_doesNotRetryWhenWorkItemCancelled() throws 
Throwable {
     Set<Work> executedWork = new HashSet<>();
     ExecutableWork work = createWork(executedWork::add);
     WorkFailureProcessor workFailureProcessor =
@@ -142,15 +143,18 @@ public class WorkFailureProcessorTest {
     WorkFailureProcessor workFailureProcessor =
         createWorkFailureProcessor(streamingEngineFailureReporter());
     Set<Work> invalidWork = new HashSet<>();
-    workFailureProcessor.logAndProcessFailure(
-        DEFAULT_COMPUTATION_ID, work, new OutOfMemoryError(), 
invalidWork::add);
+    assertThrows(
+        OutOfMemoryError.class,
+        () ->
+            workFailureProcessor.logAndProcessFailure(
+                DEFAULT_COMPUTATION_ID, work, new OutOfMemoryError(), 
invalidWork::add));
 
     assertThat(executedWork).isEmpty();
-    assertThat(invalidWork).containsExactly(work.work());
   }
 
   @Test
-  public void 
logAndProcessFailure_doesNotRetryWhenFailureReporterMarksAsNonRetryable() {
+  public void 
logAndProcessFailure_doesNotRetryWhenFailureReporterMarksAsNonRetryable()
+      throws Throwable {
     Set<Work> executedWork = new HashSet<>();
     ExecutableWork work = createWork(executedWork::add);
     WorkFailureProcessor workFailureProcessor =
@@ -164,7 +168,7 @@ public class WorkFailureProcessorTest {
   }
 
   @Test
-  public void logAndProcessFailure_doesNotRetryAfterLocalRetryTimeout() {
+  public void logAndProcessFailure_doesNotRetryAfterLocalRetryTimeout() throws 
Throwable {
     Set<Work> executedWork = new HashSet<>();
     ExecutableWork veryOldWork =
         createWork(() -> Instant.now().minus(Duration.standardDays(30)), 
executedWork::add);
@@ -180,7 +184,7 @@ public class WorkFailureProcessorTest {
 
   @Test
   public void 
logAndProcessFailure_retriesOnUncaughtUnhandledException_streamingEngine()
-      throws InterruptedException {
+      throws Throwable {
     CountDownLatch runWork = new CountDownLatch(1);
     ExecutableWork work = createWork(ignored -> runWork.countDown());
     WorkFailureProcessor workFailureProcessor =
@@ -195,7 +199,7 @@ public class WorkFailureProcessorTest {
 
   @Test
   public void 
logAndProcessFailure_retriesOnUncaughtUnhandledException_streamingAppliance()
-      throws InterruptedException {
+      throws Throwable {
     CountDownLatch runWork = new CountDownLatch(1);
     ExecutableWork work = createWork(ignored -> runWork.countDown());
     WorkFailureProcessor workFailureProcessor =

Reply via email to