Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/345#discussion_r71473044 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java --- @@ -0,0 +1,198 @@ +package org.apache.apex.malhar.lib.window.impl; + +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.Spillable; +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.apex.malhar.lib.window.WindowedStorage; +import org.apache.commons.lang3.tuple.ImmutablePair; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * Created by david on 7/15/16. 
+ */ +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V> +{ + @NotNull + private SpillableStateStore store; + private SpillableComplexComponentImpl sccImpl; + private long bucket; + @NotNull + private String identifier; + @NotNull + private Serde<Window, Slice> windowSerde; + @NotNull + private Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde; + @NotNull + private Serde<K, Slice> keySerde; + @NotNull + private Serde<V, Slice> valueSerde; + + protected Spillable.SpillableByteMap<ImmutablePair<Window, K>, V> internValues; + protected Spillable.SpillableByteArrayListMultimap<Window, K> internKeys; + + public SpillableWindowedKeyedStorage() + { + } + + public SpillableWindowedKeyedStorage(long bucket, String identifier, + Serde<Window, Slice> windowSerde, Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde) + { + this.bucket = bucket; + this.identifier = identifier; + this.windowSerde = windowSerde; + this.windowKeyPairSerde = windowKeyPairSerde; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + public void setStore(SpillableStateStore store) + { + this.store = store; + } + + public void setBucket(long bucket) + { + this.bucket = bucket; + } + + public void setIdentifier(String identifier) + { + this.identifier = identifier; + } + + public void setWindowSerde(Serde<Window, Slice> windowSerde) + { + this.windowSerde = windowSerde; + } + + public void setWindowKeyPairSerde(Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde) + { + this.windowKeyPairSerde = windowKeyPairSerde; + } + + public void setValueSerde(Serde<V, Slice> valueSerde) + { + this.valueSerde = valueSerde; + } + + @Override + public boolean containsWindow(Window window) + { + return internKeys.containsKey(window); + } + + @Override + public long size() + { + return internKeys.size(); + } + + @Override + public void remove(Window window) + { + List<K> keys = 
internKeys.get(window); + for (K key : keys) { + internValues.remove(new ImmutablePair<>(window, key)); + } + internKeys.removeAll(window); + } + + @Override + public void migrateWindow(Window fromWindow, Window toWindow) + { + List<K> keys = internKeys.get(fromWindow); + internValues.remove(toWindow); + for (K key : keys) { + internKeys.put(toWindow, key); + ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key); + ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key); + + V value = internValues.get(oldKey); + internValues.remove(oldKey); + internValues.put(newKey, value); + } + internKeys.removeAll(fromWindow); + } + + + @Override + public void setup(Context.OperatorContext context) + { + if (store == null) { + // provide a default managed state store + store = new ManagedStateSpillableStateStore(); + } + sccImpl = new SpillableComplexComponentImpl(store); + internValues = sccImpl.newSpillableByteMap((identifier + "#values").getBytes(), bucket, windowKeyPairSerde, valueSerde); --- End diff -- There is currently a constraint on the identifiers: they all have to be the same length in bytes. You also want the identifiers to be small, since every key stored in managed state is prefixed with its identifier. In this case you can simply omit the identifier; if you don't specify one, a single-byte identifier will be automatically generated for each data structure.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---