zhengbuqian commented on code in PR #23491:
URL: https://github.com/apache/beam/pull/23491#discussion_r1036786010
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java:
##########
@@ -463,6 +476,121 @@ public InMemoryBag<T> copy() {
     }
   }
+  /** An {@link InMemoryState} implementation of {@link MultimapState}. */
+  public static final class InMemoryMultimap<K, V>
+      implements MultimapState<K, V>, InMemoryState<InMemoryMultimap<K, V>> {
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    private Multimap<Object, V> contents = ArrayListMultimap.create();
+    private Map<Object, K> structuralKeysMapping = Maps.newHashMap();
+
+    public InMemoryMultimap(Coder<K> keyCoder, Coder<V> valueCoder) {
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }
+
+    @Override
+    public void clear() {
+      contents = ArrayListMultimap.create();
+      structuralKeysMapping = Maps.newHashMap();
+    }
+
+    @Override
+    public void put(K key, V value) {
+      Object structuralKey = keyCoder.structuralValue(key);
+      structuralKeysMapping.put(structuralKey, key);
+      contents.put(structuralKey, value);
+    }
+
+    @Override
+    public void remove(K key) {
+      Object structuralKey = keyCoder.structuralValue(key);
+      structuralKeysMapping.remove(structuralKey);
+      contents.removeAll(structuralKey);
+    }
+
+    @Override
+    public ReadableState<Iterable<V>> get(K key) {
+      return CollectionViewState.of(contents.get(keyCoder.structuralValue(key)));
Review Comment:
Done, updated ParDoTest to include those as well.
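
For readers of the hunk above, a minimal sketch of why both maps are keyed by `keyCoder.structuralValue(key)` rather than by the key itself: some key types, `byte[]` for example, use identity-based `equals`/`hashCode`, so two equal-content keys would otherwise land in different entries. This is not Beam code. `StructuralKeyMultimapSketch` and its `structuralValue` function are hypothetical stand-ins (the function plays the role of the coder's `structuralValue`), and plain `java.util` collections replace the Guava multimap.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the reviewed class; illustrates structural keys only. */
final class StructuralKeyMultimapSketch<K, V> {
  // Assumed stand-in for the coder's structuralValue: maps a key to an object
  // whose equals/hashCode compare by content.
  private final Function<K, Object> structuralValue;
  private final Map<Object, List<V>> contents = new HashMap<>();
  // Remembers one original key per structural key so keys can be handed back.
  private final Map<Object, K> structuralKeysMapping = new HashMap<>();

  StructuralKeyMultimapSketch(Function<K, Object> structuralValue) {
    this.structuralValue = structuralValue;
  }

  void put(K key, V value) {
    Object structuralKey = structuralValue.apply(key);
    structuralKeysMapping.put(structuralKey, key);
    contents.computeIfAbsent(structuralKey, unused -> new ArrayList<>()).add(value);
  }

  void remove(K key) {
    Object structuralKey = structuralValue.apply(key);
    structuralKeysMapping.remove(structuralKey);
    contents.remove(structuralKey);
  }

  List<V> get(K key) {
    List<V> values = contents.get(structuralValue.apply(key));
    return values == null ? Collections.emptyList() : Collections.unmodifiableList(values);
  }

  List<K> keys() {
    return new ArrayList<>(structuralKeysMapping.values());
  }

  public static void main(String[] args) {
    // ByteBuffer compares by content; the clone guards against later mutation of the array.
    StructuralKeyMultimapSketch<byte[], String> state =
        new StructuralKeyMultimapSketch<>(k -> ByteBuffer.wrap(k.clone()));
    state.put("a".getBytes(StandardCharsets.UTF_8), "first");
    state.put("a".getBytes(StandardCharsets.UTF_8), "second");
    // Two distinct byte[] instances with equal content share one entry.
    System.out.println(state.get("a".getBytes(StandardCharsets.UTF_8)));
    System.out.println(state.keys().size());
  }
}
```

With a raw `HashMap<byte[], ...>` the two `put` calls would create two separate entries, which is the behavior the structural-key indirection avoids.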