scwhittle commented on code in PR #36165:
URL: https://github.com/apache/beam/pull/36165#discussion_r2355460176
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -18,30 +18,65 @@
package org.apache.beam.runners.dataflow.worker.windmill.state;
import java.io.IOException;
+import java.lang.ref.SoftReference;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
class WindmillStateUtil {
+ private static final ThreadLocal<@Nullable SoftReference<@Nullable
ByteStringOutputStream>>
+ threadLocalOutputStream = new ThreadLocal<>();
+ // True when threadLocalOutputStream is already in use by the current thread.
+ // Used to avoid reusing the same stream from nested calls if any.
+ private static final ThreadLocal<@Nullable Boolean>
threadLocalOutputStreamInUse =
+ new ThreadLocal<>();
+
/** Encodes the given namespace and address as {@code
<namespace>+<address>}. */
@VisibleForTesting
static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+ // Use ByteStringOutputStream rather than concatenation and String.format.
We build these keys
+ // a lot, and this leads to better performance results. See associated
benchmarks.
+ ByteStringOutputStream stream;
+ boolean releaseThreadLocal = false;
+ if (Boolean.TRUE.equals(threadLocalOutputStreamInUse.get())) {
+ stream = new ByteStringOutputStream();
+ } else {
+ stream = getByteStringOutputStreamFromThreadLocal();
+ threadLocalOutputStreamInUse.set(true);
+ releaseThreadLocal = true;
+ }
try {
- // Use ByteStringOutputStream rather than concatenation and
String.format. We build these keys
- // a lot, and this leads to better performance results. See associated
benchmarks.
- ByteStringOutputStream stream = new ByteStringOutputStream();
// stringKey starts and ends with a slash. We separate it from the
// StateTag ID by a '+' (which is guaranteed not to be in the stringKey)
because the
// ID comes from the user.
namespace.appendTo(stream);
stream.append('+');
address.appendTo(stream);
- return stream.toByteString();
+ return stream.toByteStringAndReset();
} catch (IOException e) {
throw new RuntimeException(e);
+ } finally {
+ if (stream.size() > 0) {
+ stream.toByteStringAndReset();
Review Comment:
We could add a reset or clear method that is cheap when the stream is empty, and perhaps
remove the size check. It could also be used in the Windmill sink exception handling.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateUtil.java:
##########
@@ -18,30 +18,65 @@
package org.apache.beam.runners.dataflow.worker.windmill.state;
import java.io.IOException;
+import java.lang.ref.SoftReference;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
class WindmillStateUtil {
+ private static final ThreadLocal<@Nullable SoftReference<@Nullable
ByteStringOutputStream>>
+ threadLocalOutputStream = new ThreadLocal<>();
+ // True when threadLocalOutputStream is already in use by the current thread.
+ // Used to avoid reusing the same stream from nested calls if any.
+ private static final ThreadLocal<@Nullable Boolean>
threadLocalOutputStreamInUse =
Review Comment:
Instead of two thread-locals, have a single ThreadLocal containing a holder
object with a boolean and a SoftReference to the ByteStringOutputStream.
That also means you can get the holder once and then update it, instead of
going through the ThreadLocal again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]