scwhittle commented on code in PR #36218:
URL: https://github.com/apache/beam/pull/36218#discussion_r2378172633
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -106,6 +108,23 @@ public MultimapUserState(
.setWindow(stateKey.getMultimapKeysUserState().getWindow())
.setKey(stateKey.getMultimapKeysUserState().getKey());
this.userStateRequest = userStateRequestBuilder.build();
+
+ StateRequest.Builder entriesStateRequestBuilder =
StateRequest.newBuilder();
Review Comment:
Am I correct that this object is constructed fresh each time we have a
ProcessBundle request for a DoFn that uses multimap state? If so, we might want
to optimize by moving more of these to be lazily initialized when used.
We could just leave an optimization comment for now.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
Review Comment:
Could we use ImmutableMap.copyOf(pendingAdds)?
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -2917,6 +2917,72 @@ public void processElement(
pipeline.run();
}
+ @Test
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class,
UsesMultimapState.class})
+ public void testMultimapStateEntries() {
+ final String stateId = "foo:";
+ final String countStateId = "count";
+ DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>> fn =
+ new DoFn<KV<String, KV<String, Integer>>, KV<String, Integer>>() {
+
+ @StateId(stateId)
+ private final StateSpec<MultimapState<String, Integer>>
multimapState =
+ StateSpecs.multimap(StringUtf8Coder.of(), VarIntCoder.of());
+
+ @StateId(countStateId)
+ private final StateSpec<CombiningState<Integer, int[], Integer>>
countState =
+ StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
Sum.ofIntegers());
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext c,
+ @Element KV<String, KV<String, Integer>> element,
+ @StateId(stateId) MultimapState<String, Integer> state,
+ @StateId(countStateId) CombiningState<Integer, int[], Integer>
count,
+ OutputReceiver<KV<String, Integer>> r) {
+ // Empty before we process any elements.
+ if (count.read() == 0) {
+ assertThat(state.entries().read(), emptyIterable());
+ }
+ assertEquals(count.read().intValue(),
Iterables.size(state.entries().read()));
+
+ KV<String, Integer> value = element.getValue();
+ state.put(value.getKey(), value.getValue());
+ count.add(1);
+
+ if (count.read() >= 4) {
+ // Those should be evaluated only when ReadableState.read is
called.
+ ReadableState<Iterable<Entry<String, Integer>>> entriesView =
state.entries();
Review Comment:
Can entriesView and containsBadView be validated, or else removed?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
+ // Make a deep copy of pendingAdds so this iterator represents a snapshot
of state at the time
+ // it was created.
+ Map<K, List<V>> pendingAddsNow = new HashMap<>();
+ for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+ pendingAddsNow.put(
+ entry.getValue().getKey(), new ArrayList<>()); //
entry.getValue().getValue());
+ for (V value : entry.getValue().getValue()) {
+ List<V> values = pendingAddsNow.get(entry.getValue().getKey());
+ values.add(value);
+ }
+ }
+ return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
+ @Override
+ public PrefetchableIterator<Map.Entry<K, V>> createIterator() {
+ return new PrefetchableIterator<Map.Entry<K, V>>() {
+ PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
+ PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ persistedEntries,
+ entry ->
+ Iterables.transform(
+ entry.getValue(),
+ value ->
Maps.immutableEntry(entry.getKey(), value)))))
+ .iterator();
+ Iterator<Map.Entry<K, V>> pendingAddsNowIterator;
+ boolean hasNext;
Review Comment:
Rename to hasNextReady? The current name is confusing: hasNext == false doesn't
mean there is no next element, only that we haven't computed it yet.
Alternatively, you could remove this variable, mark nextEntry @Nullable, and
document that a non-null value is the already-computed next element to return.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
+ // Make a deep copy of pendingAdds so this iterator represents a snapshot
of state at the time
+ // it was created.
+ Map<K, List<V>> pendingAddsNow = new HashMap<>();
+ for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+ pendingAddsNow.put(
+ entry.getValue().getKey(), new ArrayList<>()); //
entry.getValue().getValue());
+ for (V value : entry.getValue().getValue()) {
+ List<V> values = pendingAddsNow.get(entry.getValue().getKey());
+ values.add(value);
+ }
+ }
+ return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
+ @Override
+ public PrefetchableIterator<Map.Entry<K, V>> createIterator() {
+ return new PrefetchableIterator<Map.Entry<K, V>>() {
+ PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
+ PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ persistedEntries,
+ entry ->
+ Iterables.transform(
+ entry.getValue(),
+ value ->
Maps.immutableEntry(entry.getKey(), value)))))
+ .iterator();
+ Iterator<Map.Entry<K, V>> pendingAddsNowIterator;
+ boolean hasNext;
+ Map.Entry<K, V> nextEntry;
+
+ @Override
+ public boolean isReady() {
+ return persistedEntriesIterator.isReady();
+ }
+
+ @Override
+ public void prefetch() {
+ if (!isReady()) {
+ persistedEntriesIterator.prefetch();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (hasNext) {
+ return true;
+ }
+
+ while (persistedEntriesIterator.hasNext()) {
+ Map.Entry<K, V> nextPersistedEntry =
persistedEntriesIterator.next();
+ Object nextKeyStructuralValue =
+ mapKeyCoder.structuralValue(nextPersistedEntry.getKey());
+ if (!pendingRemovesNow.contains(nextKeyStructuralValue)) {
+ nextEntry =
+ Maps.immutableEntry(nextPersistedEntry.getKey(),
nextPersistedEntry.getValue());
+ hasNext = true;
+ return true;
+ }
+ }
+
+ if (pendingAddsNowIterator == null) {
+ pendingAddsNowIterator =
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue(),
+ value ->
Maps.immutableEntry(entry.getKey(), value))))
+ .iterator();
+ }
+ while (pendingAddsNowIterator.hasNext()) {
+ nextEntry = pendingAddsNowIterator.next();
+ hasNext = true;
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public Map.Entry<K, V> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ hasNext = false;
+ return nextEntry;
Review Comment:
Should clear nextEntry here to release any memory it may hold.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
+ // Make a deep copy of pendingAdds so this iterator represents a snapshot
of state at the time
+ // it was created.
+ Map<K, List<V>> pendingAddsNow = new HashMap<>();
Review Comment:
Use ImmutableMap.Builder?
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapUserStateTest.java:
##########
@@ -179,6 +184,66 @@ public void testKeys() throws Exception {
assertThrows(IllegalStateException.class, () -> userState.keys());
}
+ @Test
+ public void testEntries() throws Exception {
+ FakeBeamFnStateClient fakeClient =
+ new FakeBeamFnStateClient(
+ ImmutableMap.of(
+ createMultimapEntriesStateKey(),
+ KV.of(
+ KvCoder.of(ByteArrayCoder.of(),
IterableCoder.of(StringUtf8Coder.of())),
+ asList(KV.of(A1, asList("V1", "V2")), KV.of(A2,
asList("V3"))))));
+ MultimapUserState<byte[], String> userState =
+ new MultimapUserState<>(
+ Caches.noop(),
+ fakeClient,
+ "instructionId",
+ createMultimapKeyStateKey(),
+ ByteArrayCoder.of(),
+ StringUtf8Coder.of());
+
+ assertArrayEquals(A1, userState.entries().iterator().next().getKey());
+ assertThat(
+ StreamSupport.stream(userState.entries().spliterator(), false)
+ .map(entry -> KV.of(ByteString.copyFrom(entry.getKey()),
entry.getValue()))
+ .collect(Collectors.toList()),
+ containsInAnyOrder(
+ KV.of(ByteString.copyFrom(A1), "V1"),
+ KV.of(ByteString.copyFrom(A1), "V2"),
+ KV.of(ByteString.copyFrom(A2), "V3")));
+
+ userState.put(A1, "V4");
+ assertThat(
Review Comment:
Should we keep a handle to a previous userState.entries() result, or to an
iterator we partially iterated, and verify that it is still valid and doesn't
contain the new adds/removes?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
+ // Make a deep copy of pendingAdds so this iterator represents a snapshot
of state at the time
+ // it was created.
+ Map<K, List<V>> pendingAddsNow = new HashMap<>();
+ for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+ pendingAddsNow.put(
+ entry.getValue().getKey(), new ArrayList<>()); //
entry.getValue().getValue());
+ for (V value : entry.getValue().getValue()) {
+ List<V> values = pendingAddsNow.get(entry.getValue().getKey());
+ values.add(value);
+ }
+ }
+ return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
+ @Override
+ public PrefetchableIterator<Map.Entry<K, V>> createIterator() {
+ return new PrefetchableIterator<Map.Entry<K, V>>() {
+ PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
+ PrefetchableIterables.concat(
+ Iterables.concat(
Review Comment:
Would
PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
PrefetchableIterables.maybePrefetchable(Iterables.transform(
persistedEntries,
entry ->
Iterables.transform(
entry.getValue(),
value ->
Maps.immutableEntry(entry.getKey(), value))))
.iterator();
work? It seems like a single iterable, so you shouldn't need the two concats. If
I'm missing something, it at least seems like you could keep just
PrefetchableIterables.concat and drop the other one.
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
Review Comment:
Use ImmutableSet.copyOf(pendingRemoves.keySet()) here?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java:
##########
@@ -235,6 +254,122 @@ public K next() {
};
}
+ @SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/21068)
+ })
+ /*
+ * Returns an Iterable containing all <K, V> entries in this multimap.
+ */
+ public PrefetchableIterable<Map.Entry<K, V>> entries() {
+ checkState(
+ !isClosed,
+ "Multimap user state is no longer usable because it is closed for %s",
+ keysStateRequest.getStateKey());
+ if (isCleared) {
+ Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>(pendingAdds);
+ return PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ pendingAddsNow.entrySet(),
+ entry ->
+ Iterables.transform(
+ entry.getValue().getValue(),
+ value ->
Maps.immutableEntry(entry.getValue().getKey(), value)))));
+ }
+
+ Set<Object> pendingRemovesNow = new HashSet<>(pendingRemoves.keySet());
+ // Make a deep copy of pendingAdds so this iterator represents a snapshot
of state at the time
+ // it was created.
+ Map<K, List<V>> pendingAddsNow = new HashMap<>();
+ for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
+ pendingAddsNow.put(
+ entry.getValue().getKey(), new ArrayList<>()); //
entry.getValue().getValue());
+ for (V value : entry.getValue().getValue()) {
+ List<V> values = pendingAddsNow.get(entry.getValue().getKey());
+ values.add(value);
+ }
+ }
+ return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
+ @Override
+ public PrefetchableIterator<Map.Entry<K, V>> createIterator() {
+ return new PrefetchableIterator<Map.Entry<K, V>>() {
+ PrefetchableIterator<Map.Entry<K, V>> persistedEntriesIterator =
+ PrefetchableIterables.concat(
+ Iterables.concat(
+ Iterables.transform(
+ persistedEntries,
+ entry ->
+ Iterables.transform(
+ entry.getValue(),
+ value ->
Maps.immutableEntry(entry.getKey(), value)))))
+ .iterator();
+ Iterator<Map.Entry<K, V>> pendingAddsNowIterator;
+ boolean hasNext;
+ Map.Entry<K, V> nextEntry;
+
+ @Override
+ public boolean isReady() {
+ return persistedEntriesIterator.isReady();
+ }
+
+ @Override
+ public void prefetch() {
+ if (!isReady()) {
+ persistedEntriesIterator.prefetch();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (hasNext) {
+ return true;
+ }
+
+ while (persistedEntriesIterator.hasNext()) {
+ Map.Entry<K, V> nextPersistedEntry =
persistedEntriesIterator.next();
+ Object nextKeyStructuralValue =
+ mapKeyCoder.structuralValue(nextPersistedEntry.getKey());
+ if (!pendingRemovesNow.contains(nextKeyStructuralValue)) {
+ nextEntry =
+ Maps.immutableEntry(nextPersistedEntry.getKey(),
nextPersistedEntry.getValue());
+ hasNext = true;
+ return true;
+ }
+ }
+
+ if (pendingAddsNowIterator == null) {
Review Comment:
Do we need to merge the persisted and local adds? Does the iterator have any
guarantees on the ordering of elements for a key?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]