gemini-code-assist[bot] commented on code in PR #36165:
URL: https://github.com/apache/beam/pull/36165#discussion_r2350308521


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -26,22 +27,36 @@
 
 class WindmillStateUtil {
 
+  private static final ThreadLocal<SoftReference<ByteStringOutputStream>> threadLocalOutputStream =
+      new ThreadLocal<>();
+
   /** Encodes the given namespace and address as {@code &lt;namespace&gt;+&lt;address&gt;}. */
   @VisibleForTesting
   static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
     try {
       // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
       // a lot, and this leads to better performance results. See associated benchmarks.
-      ByteStringOutputStream stream = new ByteStringOutputStream();
+      ByteStringOutputStream stream = getByteStringOutputStream();
       // stringKey starts and ends with a slash.  We separate it from the
       // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the
       // ID comes from the user.
       namespace.appendTo(stream);
       stream.append('+');
       address.appendTo(stream);
-      return stream.toByteString();
+      return stream.toByteStringAndReset();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If an `IOException` occurs while writing to the stream, the cached 
`ByteStringOutputStream` will be left in a dirty state, potentially corrupting 
subsequent key encodings on the same thread. The stream should be reset within 
a `catch` block to ensure the cached instance is always clean. This change 
should be made in conjunction with removing the reset from 
`getByteStringOutputStream`.
   
   ```java
     static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
       ByteStringOutputStream stream = getByteStringOutputStream();
       try {
         // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
         // a lot, and this leads to better performance results. See associated benchmarks.
         // stringKey starts and ends with a slash.  We separate it from the
         // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the
         // ID comes from the user.
         namespace.appendTo(stream);
         stream.append('+');
         address.appendTo(stream);
         return stream.toByteStringAndReset();
       } catch (IOException e) {
         // Reset the stream on failure to not poison the thread-local cache.
         stream.toByteStringAndReset();
         throw new RuntimeException(e);
       }
     }
   ```
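A minimal, self-contained sketch of the failure mode described above, assuming `java.io.ByteArrayOutputStream` as a stand-in for Beam's `ByteStringOutputStream` (its `reset()` plays the role of `toByteStringAndReset()`). The class `CachedKeyEncoder` and the `Part` callback, which stands in for `namespace.appendTo` / `address.appendTo`, are hypothetical. Unlike the suggestion above, this sketch resets in a `finally` block, which also covers unchecked exceptions thrown from `appendTo`; with `ByteArrayOutputStream` the extra reset on the success path only zeroes a counter and allocates nothing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.SoftReference;

/**
 * Hypothetical encoder that reuses one softly referenced buffer per thread. The buffer is
 * always reset before encodeKey returns, so a failed encoding cannot leak bytes into the next key.
 */
final class CachedKeyEncoder {
  /** Hypothetical stand-in for StateNamespace.appendTo / StateTag.appendTo. */
  interface Part {
    void appendTo(ByteArrayOutputStream out) throws IOException;
  }

  private static final ThreadLocal<SoftReference<ByteArrayOutputStream>> CACHE =
      new ThreadLocal<>();

  private static ByteArrayOutputStream cachedStream() {
    SoftReference<ByteArrayOutputStream> ref = CACHE.get();
    ByteArrayOutputStream stream = ref == null ? null : ref.get();
    if (stream == null) {
      // First use on this thread, or the GC cleared the soft reference under memory pressure.
      stream = new ByteArrayOutputStream();
      CACHE.set(new SoftReference<>(stream));
    }
    return stream;
  }

  static byte[] encodeKey(Part namespace, Part address) {
    ByteArrayOutputStream stream = cachedStream();
    try {
      namespace.appendTo(stream);
      stream.write('+');
      address.appendTo(stream);
      // toByteArray() copies the bytes out before the finally block clears the buffer.
      return stream.toByteArray();
    } catch (IOException e) {
      // UncheckedIOException is a RuntimeException, matching the original wrap-and-rethrow.
      throw new UncheckedIOException(e);
    } finally {
      // Runs on success, on IOException, and on unchecked exceptions from appendTo.
      stream.reset();
    }
  }
}
```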



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -71,15 +71,29 @@ class WindmillSink<T> extends Sink<WindowedValue<T>> {
     this.context = context;
   }
 
+  private static ByteString encodeMetadata(
+      ByteStringOutputStream stream,
+      Coder<Collection<? extends BoundedWindow>> windowsCoder,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo paneInfo)
+      throws IOException {
+    try {
+      PaneInfoCoder.INSTANCE.encode(paneInfo, stream);
+      windowsCoder.encode(windows, stream, Coder.Context.OUTER);
+      return stream.toByteStringAndReset();
+    } catch (Throwable e) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Catching `Throwable` is generally discouraged as it can intercept `Error`s 
from which applications are not expected to recover. It would be better to 
catch `Exception` to be consistent with the `encode` method in 
`WindmillStreamWriter` (line 170) and to follow best practices.
   
   ```suggestion
       } catch (Exception e) {
   ```
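A sketch of the handler shape suggested above, assuming the `catch` block resets the stream and rethrows (the diff hunk is cut off before the handler body, so this is not the PR's code). `ByteArrayOutputStream` again stands in for `ByteStringOutputStream`, and `MetadataEncoder` with its `Encoder` callback, standing in for the `PaneInfoCoder` and `windowsCoder` calls, is hypothetical. Java's precise-rethrow rule lets `throw e` on an effectively final `catch (Exception e)` parameter compile under `throws IOException`, because the `try` body can only throw `IOException` or unchecked exceptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

final class MetadataEncoder {
  /** Hypothetical stand-in for PaneInfoCoder.INSTANCE.encode / windowsCoder.encode. */
  interface Encoder {
    void encode(ByteArrayOutputStream out) throws IOException;
  }

  static byte[] encodeMetadata(ByteArrayOutputStream stream, Encoder pane, Encoder windows)
      throws IOException {
    try {
      pane.encode(stream);
      windows.encode(stream);
      byte[] result = stream.toByteArray();
      stream.reset();
      return result;
    } catch (Exception e) {
      // Catches IOException and RuntimeException. Errors such as OutOfMemoryError are not
      // caught and propagate without the reset, which is the trade-off of not catching Throwable.
      stream.reset();
      throw e;
    }
  }
}
```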



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -26,22 +27,36 @@
 
 class WindmillStateUtil {
 
+  private static final ThreadLocal<SoftReference<ByteStringOutputStream>> threadLocalOutputStream =
+      new ThreadLocal<>();
+
   /** Encodes the given namespace and address as {@code &lt;namespace&gt;+&lt;address&gt;}. */
   @VisibleForTesting
   static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
     try {
       // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys
       // a lot, and this leads to better performance results. See associated benchmarks.
-      ByteStringOutputStream stream = new ByteStringOutputStream();
+      ByteStringOutputStream stream = getByteStringOutputStream();
       // stringKey starts and ends with a slash.  We separate it from the
       // StateTag ID by a '+' (which is guaranteed not to be in the stringKey) because the
       // ID comes from the user.
       namespace.appendTo(stream);
       stream.append('+');
       address.appendTo(stream);
-      return stream.toByteString();
+      return stream.toByteStringAndReset();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
+
+  private static ByteStringOutputStream getByteStringOutputStream() {
+    SoftReference<ByteStringOutputStream> refStream = threadLocalOutputStream.get();
+    ByteStringOutputStream stream = refStream == null ? null : refStream.get();
+    if (stream == null) {
+      stream = new ByteStringOutputStream();
+      threadLocalOutputStream.set(new SoftReference<>(stream));
+    }
+    stream.toByteStringAndReset();
+    return stream;
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The call to `stream.toByteStringAndReset()` on line 59 (inside `getByteStringOutputStream()`) creates a new `ByteString` object on every invocation, which is then immediately discarded. This adds unnecessary GC pressure, which this pull request aims to reduce. The responsibility for resetting the stream should lie with the caller (`encodeKey`), which should handle both the success and failure cases. This line should be removed.
   
   ```java
     private static ByteStringOutputStream getByteStringOutputStream() {
       SoftReference<ByteStringOutputStream> refStream = threadLocalOutputStream.get();
       ByteStringOutputStream stream = refStream == null ? null : refStream.get();
       if (stream == null) {
         stream = new ByteStringOutputStream();
         threadLocalOutputStream.set(new SoftReference<>(stream));
       }
       return stream;
     }
   ```
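A minimal sketch of the getter proposed above, assuming `ByteArrayOutputStream` as a stand-in for `ByteStringOutputStream` and a hypothetical class name `ThreadLocalStreams`. The getter only looks up, or lazily recreates, the calling thread's buffer and never clears it, so it allocates nothing on the common path; resetting after use is left to the caller. Because the `ThreadLocal` hands each thread its own instance, callers need no synchronization to share it.

```java
import java.io.ByteArrayOutputStream;
import java.lang.ref.SoftReference;

final class ThreadLocalStreams {
  private static final ThreadLocal<SoftReference<ByteArrayOutputStream>> CACHE =
      new ThreadLocal<>();

  /** Returns this thread's cached stream without touching its contents. */
  static ByteArrayOutputStream get() {
    SoftReference<ByteArrayOutputStream> ref = CACHE.get();
    ByteArrayOutputStream stream = ref == null ? null : ref.get();
    if (stream == null) {
      // Only allocates on first use per thread or after the GC cleared the soft reference.
      stream = new ByteArrayOutputStream();
      CACHE.set(new SoftReference<>(stream));
    }
    // No reset here: the caller clears the buffer once it has copied out the result.
    return stream;
  }
}
```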



-- 
This is an automated message from the Apache Git Service.