scwhittle commented on code in PR #36313:
URL: https://github.com/apache/beam/pull/36313#discussion_r2451227990
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java:
##########
@@ -84,20 +85,15 @@ protected StateTag.StateBinder binderForNamespace(StateNamespace namespace, Stat
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
StateTag<BagState<T>> resolvedAddress =
isSystemTable ? StateTags.makeSystemTagInternal(address) : address;
+ InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress);
Review Comment:
How about naming this encodedKey?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -225,24 +225,23 @@ public long getWeight() {
/** Entry in the state cache that stores a map of values. */
private static class StateCacheEntry implements Weighted {
- private final HashMap<NamespacedTag<?>, WeightedValue<?>> values;
+ private final IdentityHashMap<InternedByteString, WeightedValue<?>> values;
Review Comment:
If in some cases we are not using a consistent WindmillStateTagUtil, then we
could have InternedByteStrings that are equal but not the same object.
It also seems that IdentityHashMap uses linear probing, whereas HashMap can
convert a bucket into a tree when it holds too many entries, and Stack Overflow
suggests the switch may be a performance regression.
How about instead making InternedByteString intern (cache) its hash code and
changing equals to compare by identity first?
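A minimal sketch of that suggestion, assuming a plain byte[] in place of protobuf's ByteString so it compiles with only the JDK (CachedHashBytes is a hypothetical name, not the PR's InternedByteString): the hash is computed once at construction, and equals checks identity before comparing contents. Because equality still falls back to the contents, an ordinary HashMap works even when two equal keys are distinct objects.

```java
import java.util.Arrays;

/**
 * Hypothetical key that caches its hash code and short-circuits equals on
 * identity. A byte[] stands in for protobuf's ByteString.
 */
final class CachedHashBytes {
  private final byte[] bytes;
  private final int hash; // computed once, at construction

  CachedHashBytes(byte[] bytes) {
    this.bytes = bytes.clone(); // defensive copy keeps the cached hash valid
    this.hash = Arrays.hashCode(this.bytes);
  }

  @Override
  public int hashCode() {
    return hash; // no recomputation on every map lookup
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true; // fast path: interned instances are usually the same object
    }
    if (!(o instanceof CachedHashBytes)) {
      return false;
    }
    CachedHashBytes other = (CachedHashBytes) o;
    // Cheap hash comparison first, then the full byte-by-byte comparison.
    return hash == other.hash && Arrays.equals(bytes, other.bytes);
  }
}
```

With a HashMap, looking up a distinct but equal instance finds the entry; with an IdentityHashMap the same lookup misses, which is the failure mode described above.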
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -43,7 +46,7 @@ private WindmillStateTagUtil() {}
/** Encodes the given namespace and address as {@code <namespace>+<address>}. */
@VisibleForTesting
- ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+ InternedByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
Review Comment:
Add a comment about the interning. I didn't notice at first that it was weak and
was concerned about dynamic StateTags like those in
runners/google_cloud_dataflow/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/StateTag.java
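For context on why weak interning matters here, a minimal JDK-only sketch of a weak interner (WeakInterner is a hypothetical name, not the PR's code): entries are held only weakly, so a key derived from a dynamically created StateTag can be garbage-collected once nothing else references it, instead of accumulating forever as it would in a strong intern pool. Guava's Interners.newWeakInterner() offers this behavior for production use.

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;

/** Hypothetical weak interner: returns a canonical instance per equal value. */
final class WeakInterner<T> {
  // WeakHashMap holds its keys weakly; the value is also a WeakReference so
  // the map never keeps its own key strongly reachable.
  private final Map<T, WeakReference<T>> pool = new WeakHashMap<>();

  /** Returns the canonical instance equal to candidate, registering it if absent. */
  synchronized T intern(T candidate) {
    WeakReference<T> ref = pool.get(candidate);
    T existing = ref == null ? null : ref.get();
    if (existing != null) {
      return existing;
    }
    pool.put(candidate, new WeakReference<>(candidate));
    return candidate;
  }
}
```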
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -225,24 +225,23 @@ public long getWeight() {
/** Entry in the state cache that stores a map of values. */
private static class StateCacheEntry implements Weighted {
- private final HashMap<NamespacedTag<?>, WeightedValue<?>> values;
+ private final IdentityHashMap<InternedByteString, WeightedValue<?>> values;
Review Comment:
This seems like it might allow collisions between state types. For example, if we
have a StateTag<Value> and a StateTag<Bag> for the same "tag", I believe the
ByteString for both would be the same and they could collide in the cache.
Can you add a test covering that?
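A hypothetical illustration of the concern: encode below stands in for an encoding of <namespace>+<tag id> that, by assumption, does not include the state type, so a value tag and a bag tag with the same id produce the same key. Pairing the encoded key with a state kind (TypedStateKey and Kind are illustrative names, not Beam's types) keeps them apart. Whether the real encoding includes the type is exactly what the requested test would check.

```java
import java.util.Objects;

/**
 * Hypothetical cache key that pairs the encoded <namespace>+<tag id> with the
 * kind of state, so a value tag and a bag tag with the same id stay distinct.
 */
final class TypedStateKey {
  /** Illustrative subset of state kinds; not Beam's actual type hierarchy. */
  enum Kind {
    VALUE,
    BAG
  }

  /** Stand-in for an encoding that, by assumption, ignores the state kind. */
  static String encode(String namespace, String tagId) {
    return namespace + "+" + tagId;
  }

  private final Kind kind;
  private final String encoded;

  TypedStateKey(Kind kind, String encoded) {
    this.kind = Objects.requireNonNull(kind);
    this.encoded = Objects.requireNonNull(encoded);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TypedStateKey)) {
      return false;
    }
    TypedStateKey other = (TypedStateKey) o;
    return kind == other.kind && encoded.equals(other.encoded);
  }

  @Override
  public int hashCode() {
    return Objects.hash(kind, encoded);
  }
}
```

Keying a map by encode("global", "count") alone stores a single entry for both a value write and a bag write; keying by TypedStateKey stores two.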
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java:
##########
@@ -145,4 +148,21 @@ private static ByteStringOutputStream getByteStringOutputStream(RefHolder refHol
public static WindmillStateTagUtil instance() {
return INSTANCE;
}
+
+ @AutoValue
+ /*StateTags are Interned across keys to reduce memory usage and GC pressure */
+ public abstract static class InternedByteString {
Review Comment:
Should this be a separate class? It seems like we could use it elsewhere,
and it would make it clearer that the interning is static and does not depend
on the WindmillStateTagUtil instance.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -412,6 +380,11 @@ public boolean supportMapStateViaMultimapState() {
}
public <T extends State> Optional<T> get(StateNamespace namespace, StateTag<T> address) {
+ return get(namespace, WindmillStateTagUtil.instance().encodeKey(namespace, address));
Review Comment:
Can this method be removed?
That would make callers think about caching.
If it is used by tests, a helper method could be added to the test instead.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java:
##########
@@ -420,11 +393,11 @@ public <T extends State> Optional<T> get(StateNamespace namespace, StateTag<T> a
localCache.computeIfAbsent(
new StateId(forKey, stateFamily, namespace),
stateCache::getIfPresent));
- return stateCacheEntry.flatMap(entry -> entry.get(namespace, address));
+ return stateCacheEntry.flatMap(entry -> entry.get(encodedAddress));
Review Comment:
Could we avoid using Optional here, to save an allocation?
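A minimal sketch of the allocation-free alternative, assuming a two-level map in place of the real StateCacheEntry and local cache (NullableLookupCache and getOrNull are hypothetical names): a miss returns null instead of an Optional. Optional.empty() is a shared singleton, so the avoidable allocations are mainly the Optional created on a hit and any capturing lambda or bound method reference passed to flatMap or computeIfAbsent; whether the JIT's escape analysis removes them depends on inlining.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical two-level cache whose lookup returns null on a miss. */
final class NullableLookupCache<K, V> {
  private final Map<K, Map<String, V>> entries = new HashMap<>();

  void put(K entryKey, String stateKey, V value) {
    entries.computeIfAbsent(entryKey, k -> new HashMap<>()).put(stateKey, value);
  }

  /** Returns the cached value, or null if absent; no Optional is created on either path. */
  V getOrNull(K entryKey, String stateKey) {
    Map<String, V> entry = entries.get(entryKey);
    return entry == null ? null : entry.get(stateKey);
  }
}
```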
--
This is an automated message from the Apache Git Service.