Chaitanya,

SpillableArrayListMultimap will give you a similar abstraction.
Why do we need another abstraction, "Join Store"?

Chandni

On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <[email protected]> wrote:

> Chandni,
>
> JoinStore is an interface and consists of the following methods:
> 1) boolean put(Object key, long time, Object value) => Inserts the (key, value)
>    pair into the store.
> 2) List<> getTuples(Object key) => Returns the list of values in the store to
>    which the specified key is mapped.
>
> JoinStore is pluggable into the join operator. The join operator exposes the
> following properties:
> JoinStore leftStore, rightStore.
>
> By default, leftStore and rightStore would be the Join Store using spillable
> data structures over ManagedState. A user who wants to integrate a different
> store has to implement JoinStore and set it on the join operator.
>
>
> Tim,
>
> I am not planning a different implementation.
> Here, I proposed plugging in the SpillableArrayListMultiMap/SpillableMap
> over managed state. I think this is what you are developing. Please correct
> me if I am wrong.
>
> Regards,
> Chaitanya
>
> On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <[email protected]> wrote:
>
> > Chaitanya,
> >
> > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> > that are in development, or separate implementations? If you are planning
> > on using separate implementations, can you please explain why they are
> > needed?
> >
> > Thanks,
> > Tim
> >
> > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <[email protected]> wrote:
> >
> > > Hi Chandni,
> > >
> > > I am suggesting two stores, categorized by key type:
> > > (1) If the key is a primary key, the store would be ManagedJoinStore or a
> > >     join store using SpillableMap.
> > > (2) If the key is not a primary key, the store would be a join store
> > >     using SpillableArrayListMultiMap.
> > >
> > > Regards,
> > > Chaitanya
> > >
> > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <[email protected]> wrote:
> > >
> > > > Hi Chaitanya,
> > > >
> > > > In the last discussion, we decided that the Join operator needs the
> > > > spillable data structures, specifically SpillableArrayListMultiMap.
> > > >
> > > > Tim had created a ticket, APEXMALHAR-2070, to address this. The pull
> > > > request for an in-memory implementation already exists:
> > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > > >
> > > > As commented on the ticket, Tim is working on the implementation of
> > > > SpillableArrayListMultiMap which uses ManagedState.
> > > >
> > > > The ManagedJoinStore seems to me like a duplicate of these spillable
> > > > data structures. I think we should avoid creating multiple things that
> > > > provide the same basic functionality.
> > > >
> > > > Can you please help review the spillable data structures pull request
> > > > and point out what you will need for Join that is missing there?
> > > >
> > > > Thanks,
> > > > Chandni
> > > >
> > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <[email protected]> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Please go through this design and share your suggestions.
> > > > >
> > > > > Regards,
> > > > > Chaitanya
> > > > >
> > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <[email protected]> wrote:
> > > > >
> > > > > > +1.
> > > > > >
> > > > > > This is one of the common use cases in batch scenarios, and it would
> > > > > > be great to have this in a stream as well, using managed state and the
> > > > > > spillableMultiMap structure.
> > > > > >
> > > > > > Regards,
> > > > > > Mohit
> > > > > >
> > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <[email protected]> wrote:
> > > > > >
> > > > > > > +1 for the proposal.
> > > > > > >
> > > > > > > This will be useful for joins that need to hold a large amount of
> > > > > > > intermediate data.
> > > > > > >
> > > > > > > ~ Yogi
> > > > > > >
> > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > The Malhar library has an in-memory join operator, which can't
> > > > > > > > process large amounts of data because of checkpoint and memory
> > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> > > > > > > > using Managed State, which was recently added to Malhar.
> > > > > > > >
> > > > > > > > Details of the Inner Join operator, Managed State, and the design for
> > > > > > > > an Inner Join operator using Managed State are given below:
> > > > > > > >
> > > > > > > > Inner Join Operator
> > > > > > > > --------------------------
> > > > > > > > The Join operator pairs tuples from two relational streams that match
> > > > > > > > the join condition and emits the paired tuples.
> > > > > > > >
> > > > > > > > For example, let's say S1 and S2 are two input streams and the join
> > > > > > > > condition is
> > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1 is
> > > > > > > > the time period.
> > > > > > > >
> > > > > > > > Based on the above join condition, the join operator needs to store
> > > > > > > > tuples for "t1" time. Let's say JS1 and JS2 are the respective stores
> > > > > > > > where these tuples are kept.
> > > > > > > >
> > > > > > > > Let (k1, v1) be the tuple arriving on stream S1. The join operation
> > > > > > > > takes the following steps:
> > > > > > > >
> > > > > > > > 1. Insert (k1, v1) into the store JS1.
> > > > > > > > 2. For key k1, get the values from store JS2.
> > > > > > > > 3. If step 2 returns a non-null set, apply the join conditions.
> > > > > > > >
> > > > > > > > The same procedure applies if the tuple arrives on stream S2, with
> > > > > > > > JS1 and JS2 swapped in the steps above.
> > > > > > > >
> > > > > > > > Managed State
> > > > > > > > --------------------
> > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> > > > > > > > key-value data structure. For more information about Managed State,
> > > > > > > > please have a look at the link below:
> > > > > > > >
> > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > > > >
> > > > > > > > Abstract Join Operator
> > > > > > > > --------------------------------
> > > > > > > > An abstract implementation of the Join operator is available in the
> > > > > > > > com.datatorrent.lib.join package. JoinStore is the store interface
> > > > > > > > used by the join operator and is available in the same package.
> > > > > > > > For more details about the join operator, please have a look at the
> > > > > > > > link below:
> > > > > > > >
> > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > > > >
> > > > > > > > Design of Join Operator Using Managed State
> > > > > > > > -------------------------------------------------------------
> > > > > > > > We need to provide a concrete store using Managed State, which would
> > > > > > > > look like this:
> > > > > > > >
> > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
> > > > > > > > {
> > > > > > > > }
> > > > > > > >
> > > > > > > > Data in the Managed store would be in the form of a Map of byte[] to
> > > > > > > > byte[].
> > > > > > > >
> > > > > > > > If the value is a list, then inserting (k1, v1) into the store would
> > > > > > > > take the following steps:
> > > > > > > > (1) For the key k1, get the value from store JS1. (Here we need to
> > > > > > > >     search all the time buckets if the key is not present in memory.)
> > > > > > > > (2) Convert the above value to a List.
> > > > > > > > (3) Add the value v1 to the above list.
> > > > > > > > (4) Convert the new list to a slice.
> > > > > > > > (5) Insert (k1, new slice) into the Managed State.
> > > > > > > >
> > > > > > > > Values as a list have been discussed here:
> > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > > > >
> > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > > > > > > structure for lists of values. Let this store be
> > > > > > > > SpillableMultiMapJoinStore.
> > > > > > > >
> > > > > > > > Based on the above details, the store is chosen by key type.
> > > > > > > >
> > > > > > > > Key            Store
> > > > > > > > ------------------------------------------
> > > > > > > > Primary        ManagedJoinStore
> > > > > > > > Not Primary    SpillableMultiMapJoinStore
> > > > > > > >
> > > > > > > > The Join operator would expose the following additional properties:
> > > > > > > >
> > > > > > > > - isPrimaryKey - whether the key is primary or not.
> > > > > > > > - noOfBuckets - number of key buckets. This parameter is required by
> > > > > > > >   Managed State.
> > > > > > > >
> > > > > > > > Please provide your suggestions.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Chaitanya
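
For readers following the thread, the three-step join procedure from the original proposal can be sketched as below. This is only an illustration under stated assumptions, not the Malhar implementation: the JoinStore interface uses the two method shapes described in the thread (with List<Object> standing in for the raw List<>), while InMemoryJoinStore, Event, and JoinSketch are hypothetical names invented for this example. The in-memory store is a stand-in for a spillable structure over ManagedState, and it returns an empty list rather than null when a key is absent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Interface shape as described in the thread (not copied from the Malhar sources).
interface JoinStore {
  boolean put(Object key, long time, Object value);
  List<Object> getTuples(Object key);
}

// Hypothetical in-memory stand-in for a spillable store over ManagedState.
// It ignores the time argument; a real store would use it for time bucketing.
class InMemoryJoinStore implements JoinStore {
  private final Map<Object, List<Object>> map = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value) {
    return map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<Object> getTuples(Object key) {
    List<Object> found = map.get(key);
    return found == null ? Collections.<Object>emptyList() : found;
  }
}

// Hypothetical tuple type carrying the join key and the event time.
final class Event {
  final String key;
  final long time;
  final String payload;

  Event(String key, long time, String payload) {
    this.key = key;
    this.time = time;
    this.payload = payload;
  }
}

class JoinSketch {
  // Handles one incoming tuple: store it on its own side, probe the opposite
  // side by key, and keep the matches with |a.time - b.time| < t1.
  static List<String> onTuple(Event e, JoinStore own, JoinStore other, long t1) {
    own.put(e.key, e.time, e);                      // step 1: insert into own store
    List<String> joined = new ArrayList<>();
    for (Object o : other.getTuples(e.key)) {       // step 2: look up the other store
      Event match = (Event) o;
      if (Math.abs(e.time - match.time) < t1) {     // step 3: apply the time condition
        joined.add(e.payload + "+" + match.payload);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    JoinStore js1 = new InMemoryJoinStore();
    JoinStore js2 = new InMemoryJoinStore();
    // Tuple on S1: JS2 is still empty, so nothing is paired.
    System.out.println(onTuple(new Event("k1", 100, "a1"), js1, js2, 10));
    // Tuple on S2: the stores are swapped, and a1 is within the window.
    System.out.println(onTuple(new Event("k1", 105, "b1"), js2, js1, 10));
    // Tuple on S1 again: b1 is outside the window, so nothing is paired.
    System.out.println(onTuple(new Event("k1", 200, "a2"), js1, js2, 10));
  }
}
```

Because the operator only talks to the JoinStore interface, swapping the in-memory map for a spillable multimap would not change onTuple, which is the pluggability point argued for in the thread.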
