arunpandianp commented on code in PR #36742:
URL: https://github.com/apache/beam/pull/36742#discussion_r2516233164
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
@Override
public void close() {
stream.reset();
- if (releaseThreadLocal) {
+ if (refHolder != null) {
refHolder.inUse = false;
}
}
}
private static class RefHolder {
- public SoftReference<@Nullable ByteStringOutputStream> streamRef =
- new SoftReference<>(new ByteStringOutputStream());
+ public ByteStringOutputStream stream = new ByteStringOutputStream();
// Boolean is true when the thread local stream is already in use by the
current thread.
// Used to avoid reusing the same stream from nested calls if any.
public boolean inUse = false;
- }
- private static RefHolder getRefHolderFromThreadLocal() {
- @Nullable RefHolder refHolder = threadLocalRefHolder.get();
- if (refHolder == null) {
- refHolder = new RefHolder();
- threadLocalRefHolder.set(refHolder);
+ public @Nullable StreamHandle streamHandle = null;
+
+ public static RefHolder create() {
+ RefHolder refHolder = new RefHolder();
+ refHolder.streamHandle = new StreamHandle(refHolder);
+ return refHolder;
}
- return refHolder;
}
- private static ByteStringOutputStream getByteStringOutputStream(RefHolder
refHolder) {
- @Nullable
- ByteStringOutputStream stream = refHolder.streamRef == null ? null :
refHolder.streamRef.get();
- if (stream == null) {
- stream = new ByteStringOutputStream();
- refHolder.streamRef = new SoftReference<>(stream);
+ private static RefHolder getRefHolderFromThreadLocal() {
+ @Nullable SoftReference<RefHolder> refHolderSoftReference =
threadLocalRefHolder.get();
+ @Nullable RefHolder refHolder = null;
+ if (refHolderSoftReference != null) {
+ refHolder = refHolderSoftReference.get();
}
- return stream;
+ if (refHolderSoftReference == null || refHolder == null) {
+ refHolder = RefHolder.create();
+ threadLocalRefHolder.set(new SoftReference<>(refHolder));
Review Comment:
Wrapped RefHolder in a SoftReference.
The goal is for StreamHandle::stream to always return the same stream. If only
the stream were held through a SoftReference, stream() could return different
streams at different times whenever the caller does not keep a reference to
the previously returned stream between calls.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ThreadLocalByteStringOutputStream.java:
##########
@@ -74,38 +82,39 @@ public ByteStringOutputStream stream() {
@Override
public void close() {
stream.reset();
- if (releaseThreadLocal) {
+ if (refHolder != null) {
refHolder.inUse = false;
}
}
}
private static class RefHolder {
- public SoftReference<@Nullable ByteStringOutputStream> streamRef =
- new SoftReference<>(new ByteStringOutputStream());
+ public ByteStringOutputStream stream = new ByteStringOutputStream();
Review Comment:
This is needed after the change to wrap RefHolder in a SoftReference.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]