arunpandianp commented on code in PR #36742:
URL: https://github.com/apache/beam/pull/36742#discussion_r2516233164


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
     @Override
     public void close() {
       stream.reset();
-      if (releaseThreadLocal) {
+      if (refHolder != null) {
         refHolder.inUse = false;
       }
     }
   }
 
   private static class RefHolder {
 
-    public SoftReference<@Nullable ByteStringOutputStream> streamRef =
-        new SoftReference<>(new ByteStringOutputStream());
+    public ByteStringOutputStream stream = new ByteStringOutputStream();
 
     // Boolean is true when the thread local stream is already in use by the 
current thread.
     // Used to avoid reusing the same stream from nested calls if any.
     public boolean inUse = false;
-  }
 
-  private static RefHolder getRefHolderFromThreadLocal() {
-    @Nullable RefHolder refHolder = threadLocalRefHolder.get();
-    if (refHolder == null) {
-      refHolder = new RefHolder();
-      threadLocalRefHolder.set(refHolder);
+    public @Nullable StreamHandle streamHandle = null;
+
+    public static RefHolder create() {
+      RefHolder refHolder = new RefHolder();
+      refHolder.streamHandle = new StreamHandle(refHolder);
+      return refHolder;
     }
-    return refHolder;
   }
 
-  private static ByteStringOutputStream getByteStringOutputStream(RefHolder 
refHolder) {
-    @Nullable
-    ByteStringOutputStream stream = refHolder.streamRef == null ? null : 
refHolder.streamRef.get();
-    if (stream == null) {
-      stream = new ByteStringOutputStream();
-      refHolder.streamRef = new SoftReference<>(stream);
+  private static RefHolder getRefHolderFromThreadLocal() {
+    @Nullable SoftReference<RefHolder> refHolderSoftReference = 
threadLocalRefHolder.get();
+    @Nullable RefHolder refHolder = null;
+    if (refHolderSoftReference != null) {
+      refHolder = refHolderSoftReference.get();
     }
-    return stream;
+    if (refHolderSoftReference == null || refHolder == null) {
+      refHolder = RefHolder.create();
+      threadLocalRefHolder.set(new SoftReference<>(refHolder));

Review Comment:
   Wrapped RefHolder with a SoftRefHolder. 
   
   Want to make StreamHandle::stream always return the same stream. Making only 
the stream a SoftReference will mean the stream() could return different 
streams at different times, if the caller is not keeping a reference to the 
returned stream in the middle.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
     @Override
     public void close() {
       stream.reset();
-      if (releaseThreadLocal) {
+      if (refHolder != null) {
         refHolder.inUse = false;
       }
     }
   }
 
   private static class RefHolder {
 
-    public SoftReference<@Nullable ByteStringOutputStream> streamRef =
-        new SoftReference<>(new ByteStringOutputStream());
+    public ByteStringOutputStream stream = new ByteStringOutputStream();

Review Comment:
   Need this after the change to wrap RefHolder with SoftRefHolder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to