This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e5db1b3c63e [Dataflow Java Streaming] Reset state using finally blocks 
instead of catching Exception, in cases where catching only Exception could otherwise corrupt 
data structures if an OutOfMemoryError is thrown. (#37746)
e5db1b3c63e is described below

commit e5db1b3c63e2ab8f59ca9bb36b18201aab3cb1f3
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Mar 4 09:27:24 2026 +0000

    [Dataflow Java Streaming] Reset state using finally blocks instead of 
catching Exception, in cases where catching only Exception could otherwise corrupt data structures if 
an OutOfMemoryError is thrown. (#37746)
---
 .../beam/runners/dataflow/worker/WindmillSink.java | 54 +++++++++++++---------
 .../windmill/client/AbstractWindmillStream.java    | 39 +++++++++-------
 .../windmill/client/grpc/GrpcGetDataStream.java    | 14 +++---
 3 files changed, 61 insertions(+), 46 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
index 5cb3cb56d9e..2ed29125bd4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.util.Structs.getString;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
@@ -51,16 +52,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 class WindmillSink<T> extends Sink<WindowedValue<T>> {
 
-  private WindmillStreamWriter writer;
+  private final WindmillStreamWriter writer;
   private final Coder<T> valueCoder;
   private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
-  private StreamingModeExecutionContext context;
+  private final StreamingModeExecutionContext context;
   private static final Logger LOG = 
LoggerFactory.getLogger(WindmillSink.class);
 
   WindmillSink(
@@ -81,6 +78,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
       PaneInfo paneInfo,
       BeamFnApi.Elements.ElementMetadata metadata)
       throws IOException {
+    boolean resetNeeded = true;
     try {
       // element metadata is behind the experiment
       boolean elementMetadata = 
WindowedValues.WindowedValueCoder.isMetadataSupported();
@@ -92,10 +90,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         PaneInfoCoder.INSTANCE.encode(paneInfo, stream);
         windowsCoder.encode(windows, stream, Coder.Context.OUTER);
       }
-      return stream.toByteStringAndReset();
-    } catch (Exception e) {
-      stream.reset();
-      throw e;
+      ByteString result = stream.toByteStringAndReset();
+      resetNeeded = false;
+      return result;
+    } finally {
+      if (resetNeeded) {
+        stream.reset();
+      }
     }
   }
 
@@ -150,6 +151,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
     }
   }
 
+  @SuppressWarnings("rawtypes")
   public static class Factory implements SinkFactory {
 
     @Override
@@ -166,7 +168,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
       return new WindmillSink<>(
           getString(spec, "stream_id"),
           typedCoder,
-          (StreamingModeExecutionContext) executionContext);
+          checkNotNull((StreamingModeExecutionContext) executionContext));
     }
   }
 
@@ -198,17 +200,21 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         throw new IllegalStateException(
             "Expected output stream to be empty but had " + 
stream.toByteString());
       }
+      boolean resetNeeded = true;
       try {
         coder.encode(object, stream, Coder.Context.OUTER);
-        return stream.toByteStringAndReset();
-      } catch (Exception e) {
-        stream.reset();
-        throw e;
+        ByteString result = stream.toByteStringAndReset();
+        resetNeeded = false;
+        return result;
+      } finally {
+        if (resetNeeded) {
+          stream.reset();
+        }
       }
     }
 
     @Override
-    @SuppressWarnings("NestedInstanceOfConditions")
+    @SuppressWarnings({"rawtypes", "NestedInstanceOfConditions"})
     public long add(WindowedValue<T> data) throws IOException {
       ByteString key, value;
       ByteString id = ByteString.EMPTY;
@@ -220,13 +226,13 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
               stream, windowsCoder, data.getWindows(), data.getPaneInfo(), 
additionalMetadata);
       if (valueCoder instanceof KvCoder) {
         KvCoder kvCoder = (KvCoder) valueCoder;
-        KV kv = (KV) data.getValue();
+        KV kv = checkNotNull((KV) data.getValue());
         key = encode(kvCoder.getKeyCoder(), kv.getKey());
         Coder valueCoder = kvCoder.getValueCoder();
         // If ids are explicitly provided, use that instead of the 
windmill-generated id.
         // This is used when reading an UnboundedSource to deduplicate records.
         if (valueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
-          ValueWithRecordId valueAndId = (ValueWithRecordId) kv.getValue();
+          ValueWithRecordId valueAndId = checkNotNull((ValueWithRecordId) 
kv.getValue());
           value =
               encode(((ValueWithRecordIdCoder) valueCoder).getValueCoder(), 
valueAndId.getValue());
           id = ByteString.copyFrom(valueAndId.getId());
@@ -234,7 +240,7 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
           value = encode(valueCoder, kv.getValue());
         }
       } else {
-        key = context.getSerializedKey();
+        key = checkNotNull(context.getSerializedKey());
         value = encode(valueCoder, data.getValue());
       }
       if (key.size() > context.getMaxOutputKeyBytes()) {
@@ -291,8 +297,9 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         }
         byte[] rawId = null;
 
-        if (data.getRecordId() != null) {
-          rawId = data.getRecordId().getBytes(StandardCharsets.UTF_8);
+        @Nullable String recordId = data.getRecordId();
+        if (recordId != null) {
+          rawId = recordId.getBytes(StandardCharsets.UTF_8);
         } else {
           rawId = context.getCurrentRecordId();
         }
@@ -303,8 +310,9 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
         id = ByteString.copyFrom(rawId);
 
         byte[] rawOffset = null;
-        if (data.getRecordOffset() != null) {
-          rawOffset = Longs.toByteArray(data.getRecordOffset());
+        @Nullable Long recordOffset = data.getRecordOffset();
+        if (recordOffset != null) {
+          rawOffset = Longs.toByteArray(recordOffset);
         } else {
           rawOffset = context.getCurrentRecordOffset();
         }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
index ed99ae1bbd6..cc3f555a115 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
@@ -268,29 +268,36 @@ public abstract class AbstractWindmillStream<RequestT, 
ResponseT> implements Win
           debugMetrics.recordStart();
           streamHandler.streamDebugMetrics.recordStart();
           currentPhysicalStream = streamHandler;
-          currentPhysicalStreamForDebug.set(currentPhysicalStream);
-          requestObserver.reset(physicalStreamFactory.apply(new 
ResponseObserver(streamHandler)));
-          onFlushPending(true);
-          if (clientClosed) {
-            // The logical stream is half-closed so after flushing the 
remaining requests close the
-            // physical stream.
-            streamHandler.streamDebugMetrics.recordHalfClose();
-            requestObserver.onCompleted();
-          } else if (!halfClosePhysicalStreamAfter.isZero()) {
-            halfCloseFuture =
-                executor.schedule(
-                    () -> onHalfClosePhysicalStreamTimeout(streamHandler),
-                    halfClosePhysicalStreamAfter.getSeconds(),
-                    TimeUnit.SECONDS);
+          boolean resetCurrentPhysicalStream = true;
+          try {
+            currentPhysicalStreamForDebug.set(currentPhysicalStream);
+            requestObserver.reset(physicalStreamFactory.apply(new 
ResponseObserver(streamHandler)));
+            onFlushPending(true);
+            if (clientClosed) {
+              // The logical stream is half-closed so after flushing the 
remaining requests close
+              // the
+              // physical stream.
+              streamHandler.streamDebugMetrics.recordHalfClose();
+              requestObserver.onCompleted();
+            } else if (!halfClosePhysicalStreamAfter.isZero()) {
+              halfCloseFuture =
+                  executor.schedule(
+                      () -> onHalfClosePhysicalStreamTimeout(streamHandler),
+                      halfClosePhysicalStreamAfter.getSeconds(),
+                      TimeUnit.SECONDS);
+            }
+            resetCurrentPhysicalStream = false;
+          } finally {
+            if (resetCurrentPhysicalStream) {
+              clearCurrentPhysicalStream(true);
+            }
           }
           return;
         } catch (WindmillStreamShutdownException e) {
           logger.debug("Stream was shutdown while creating new stream.", e);
-          clearCurrentPhysicalStream(true);
           break;
         } catch (Exception e) {
           logger.error("Failed to create new stream, retrying: ", e);
-          clearCurrentPhysicalStream(true);
           debugMetrics.recordRestartReason("Failed to create new stream, 
retrying: " + e);
         }
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
index bd1c9eed408..9503893e2cf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
@@ -289,11 +289,11 @@ final class GrpcGetDataStream
         // Notify all waiters with requests in this batch as well as the sender
         // of the next batch (if one exists).
         batch.notifySent();
-      } catch (Exception e) {
-        LOG.debug("Batch failed to send on new stream", e);
+      } catch (Throwable t) {
         // Free waiters if the send() failed.
         batch.notifyFailed();
-        throw e;
+        LOG.debug("Batch failed to send on new stream", t);
+        throw t;
       }
     }
   }
@@ -535,12 +535,12 @@ final class GrpcGetDataStream
       // Notify all waiters with requests in this batch as well as the sender
       // of the next batch (if one exists).
       batch.notifySent();
-    } catch (Exception e) {
-      LOG.debug("Batch failed to send", e);
+    } catch (Throwable t) {
       // Free waiters if the send() failed.
       batch.notifyFailed();
-      // Propagate the exception to the calling thread.
-      throw e;
+      LOG.debug("Batch failed to send", t);
+      // Propagate the exception/error to the calling thread.
+      throw t;
     }
   }
 

Reply via email to