Hey Tim,

As I understand it, the Join operator needs a common abstraction over SpillableMap and SpillableArrayListMultimap.
I suggested using SpillableComplexComponent. Is this correct?

Thanks,
Chandni

On Wed, May 18, 2016 at 1:34 AM, Chandni Singh <[email protected]> wrote:

Hi Chaitanya,

I am NOT suggesting that you use the interface TimeSlicedBucketedState.

I don't see the need for a JoinStore abstraction.

There will be a SpillableArrayListMultimap implementation on which you can set "ManagedTimeUnifiedStateImpl" as the persistent store. The API of SpillableArrayListMultimap is sufficient for this use case.

You can use this SpillableArrayListMultimap implementation directly in the Join operator. Here is a simple (pseudocode) example:

    class InnerJoinOperator
    {
      SpillableArrayListMultiMap stream1Data =
          new SpillableArrayListMultiMap(new ManagedTimeUnifiedStateImpl());

      port1.process(tuple) {
        stream1Data.put(tuple.getKey(), tuple.getVal());
      }
    }

Chandni

On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <[email protected]> wrote:

Hi Chandni,

I think you are referring to the interface "TimeSlicedBucketedState". I feel it is tightly coupled with Managed State: in the "TimeSlicedBucketedState" abstraction, the bucketId parameter relates to Managed State and is not needed for the join operator.

Regards,
Chaitanya

On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <[email protected]> wrote:

Chaitanya,

SpillableArrayListMultimap gives you a similar abstraction.

Why do we need another abstraction, "JoinStore"?

Chandni

On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <[email protected]> wrote:

Chandni,

JoinStore is an interface with the following methods:

1) boolean put(Object key, long time, Object value): inserts the (key, value) pair into the store.
2) List<> getTuples(Object key): returns the list of values in the store to which the specified key is mapped.
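The JoinStore contract described in this message can be sketched as a small Java interface with an in-memory implementation. The names JoinStoreSketch and InMemoryJoinStore, the generic key/value types, and returning an empty list instead of null are assumptions for illustration; this is not the actual com.datatorrent.lib.join.JoinStore API, and the time argument is accepted but unused here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface mirroring the two methods proposed in the thread;
// NOT the real com.datatorrent.lib.join.JoinStore API.
interface JoinStoreSketch<K, V> {
  // Insert (key, value) observed at the given event time; returns true on success.
  boolean put(K key, long time, V value);

  // Return all values currently stored for the key (empty list if none).
  List<V> getTuples(K key);
}

// Purely in-memory stand-in; a real store would spill to Managed State.
class InMemoryJoinStore<K, V> implements JoinStoreSketch<K, V> {
  private final Map<K, List<V>> data = new HashMap<>();

  @Override
  public boolean put(K key, long time, V value) {
    // time is ignored here; a time-bucketed store would use it for expiry.
    data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    return true;
  }

  @Override
  public List<V> getTuples(K key) {
    List<V> values = data.get(key);
    // Read-only view so callers cannot mutate the store's internal list.
    return values == null ? Collections.<V>emptyList() : Collections.unmodifiableList(values);
  }
}
```

A Managed State backed version would keep the same two methods but persist values to spillable storage instead of a HashMap.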
JoinStore is pluggable into the join operator. The join operator exposes the following properties:

    JoinStore leftStore, rightStore;

By default, leftStore and rightStore would be JoinStores built on spillable data structures over ManagedState. To integrate a different store, the user implements JoinStore and sets it on the Join operator.

Tim,

I am not planning a different implementation. I proposed plugging SpillableArrayListMultiMap/SpillableMap over Managed State into the operator; I think this is what you are developing. Please correct me if I am wrong.

Regards,
Chaitanya

On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <[email protected]> wrote:

Chaitanya,

Are you planning to use the SpillableMap and SpillableArrayListMultiMap that are in development, or separate implementations? If you are planning to use separate implementations, can you please explain why they are needed?

Thanks,
Tim

On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <[email protected]> wrote:

Hi Chandni,

I am suggesting two stores, chosen by key type:

(1) If the key is a primary key, the store would be ManagedJoinStore or a JoinStore using SpillableMap.
(2) If the key is not a primary key, the store would be a JoinStore using SpillableArrayListMultiMap.
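The key-type split proposed here comes down to one value per key versus many values per key. A minimal in-memory sketch, assuming a hypothetical KeyedStore interface: SingleValueStore stands in for the SpillableMap-based store and MultiValueStore for the SpillableArrayListMultiMap-based one, with the default chosen by an isPrimaryKey flag. Overwriting on a repeated primary key is an assumption; with a true primary key that case should not arise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store abstraction used only for this sketch.
interface KeyedStore<K, V> {
  void put(K key, V value);

  List<V> get(K key);
}

// Primary key: each key identifies at most one tuple, so a plain map suffices
// (the SpillableMap case). Overwriting on a repeated key is an assumption.
class SingleValueStore<K, V> implements KeyedStore<K, V> {
  private final Map<K, V> data = new HashMap<>();

  public void put(K key, V value) {
    data.put(key, value);
  }

  public List<V> get(K key) {
    V v = data.get(key);
    return v == null ? Collections.<V>emptyList() : Collections.singletonList(v);
  }
}

// Non-primary key: many tuples may share a key, so values accumulate
// (the SpillableArrayListMultiMap case).
class MultiValueStore<K, V> implements KeyedStore<K, V> {
  private final Map<K, List<V>> data = new HashMap<>();

  public void put(K key, V value) {
    data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  public List<V> get(K key) {
    return data.getOrDefault(key, Collections.<V>emptyList());
  }
}

class JoinStores {
  // Default store selection driven by the proposed isPrimaryKey property.
  static <K, V> KeyedStore<K, V> forKeyType(boolean isPrimaryKey) {
    return isPrimaryKey ? new SingleValueStore<K, V>() : new MultiValueStore<K, V>();
  }
}
```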
Regards,
Chaitanya

On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <[email protected]> wrote:

Hi Chaitanya,

In the last discussion, we decided that the Join operator needs the spillable data structures, specifically SpillableArrayListMultiMap.

Tim created ticket APEXMALHAR-2070 to address this. A pull request for an in-memory implementation already exists:
https://github.com/apache/incubator-apex-malhar/pull/262

As commented on the ticket, Tim is working on an implementation of SpillableArrayListMultiMap that uses ManagedState.

ManagedJoinStore looks to me like a duplicate of these spillable data structures. I think we should avoid creating multiple things that provide the same basic functionality.

Can you please help review the spillable data structures pull request and point out what Join needs that is missing there?

Thanks,
Chandni

On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <[email protected]> wrote:

Hi All,

Please go through this design and share your suggestions.

Regards,
Chaitanya

On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <[email protected]> wrote:

+1.
This is one of the common use cases in batch scenarios, and it would be great to have it in streaming as well, using Managed State and the spillableMultiMap structure.

Regards,
Mohit

On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <[email protected]> wrote:

+1 for the proposal.

This will be useful for joins that need to hold a large amount of data in intermediate state.

~ Yogi

On 9 May 2016 at 14:05, Chaitanya Chebolu <[email protected]> wrote:

Hi,

The Malhar library has an in-memory join operator, which can't process large amounts of data because of checkpointing and memory bottlenecks. To avoid these, I am proposing an inner join operator using Managed State, which was recently added to Malhar.

Details of the inner join operator, Managed State, and the design of an inner join operator using Managed State are given below:

Inner Join Operator
--------------------------
The Join operator pairs tuples from two relational streams that match the join condition and emits the paired tuples.
For example, let S1 and S2 be two input streams with the join condition

    (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1

where a and b are tuples arriving on S1 and S2 respectively, and t1 is the time period.

Based on this join condition, the join operator needs to store tuples for time t1. Let JS1 and JS2 be the respective stores in which these tuples are kept.

Let (k1, v1) be a tuple arriving on stream S1. The join is performed as follows:

1. Insert (k1, v1) into the store JS1.
2. For key k1, get the values from the store JS2.
3. If step 2 returns a non-empty set, apply the join conditions to each value and emit the matching pairs.

The same procedure applies to a tuple on stream S2, with JS1 and JS2 swapped.

Managed State
--------------------
Managed State is an incrementally checkpointed, fault-tolerant key-value data structure.
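The three join steps above can be sketched in memory as follows. The two HashMaps stand in for the stores JS1 and JS2, which in this proposal would be backed by Managed State; TimedTuple, TimeWindowInnerJoin and the "left|right" output strings are hypothetical names chosen for the example. Expiring tuples older than t1 is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative tuple: join key, payload and event time.
class TimedTuple {
  final String key;
  final String value;
  final long time;

  TimedTuple(String key, String value, long time) {
    this.key = key;
    this.value = value;
    this.time = time;
  }
}

// In-memory sketch of the time-windowed inner join; not Malhar's operator.
class TimeWindowInnerJoin {
  private final long window; // t1: pairs need |S1.a.time - S2.b.time| < t1
  private final Map<String, List<TimedTuple>> js1 = new HashMap<>(); // store for S1
  private final Map<String, List<TimedTuple>> js2 = new HashMap<>(); // store for S2

  TimeWindowInnerJoin(long window) {
    this.window = window;
  }

  List<String> processS1(TimedTuple a) {
    return process(a, js1, js2, true);
  }

  List<String> processS2(TimedTuple b) {
    return process(b, js2, js1, false);
  }

  private List<String> process(TimedTuple t, Map<String, List<TimedTuple>> own,
                               Map<String, List<TimedTuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
    List<String> out = new ArrayList<>();
    // Step 2: fetch tuples with the same key from the other stream's store.
    List<TimedTuple> candidates = other.get(t.key);
    if (candidates == null) {
      return out;
    }
    // Step 3: apply the time condition; emit pairs with the S1 value first.
    for (TimedTuple c : candidates) {
      if (Math.abs(t.time - c.time) < window) {
        out.add(fromS1 ? t.value + "|" + c.value : c.value + "|" + t.value);
      }
    }
    return out;
  }
}
```

Each arriving tuple is stored before probing, so a later tuple on the other stream can still find it; a real operator would also evict entries outside the t1 window so the stores do not grow without bound.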
For more information about Managed State, please have a look at:
https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed

Abstract Join Operator
--------------------------------
An abstract implementation of the Join operator is available in the com.datatorrent.lib.join package. JoinStore, the store interface used by the join operator, is in the same package. For more details about the join operator, please have a look at:
https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join

Design of Join Operator Using Managed State
-------------------------------------------------------------
We need to provide a concrete store using Managed State, along these lines:

    public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
    {
    }

Data in the Managed State store is a map from byte[] to byte[].
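Because Managed State maps byte[] to byte[], keys and values must be serialized, and keys need content-based equality. Java arrays compare by identity, so a raw byte[] key would never match a later lookup. The sketch below wraps keys in java.nio.ByteBuffer, which compares by content; it plays the role that a slice plays in the insertion steps that follow. ByteKeyValueStore is a hypothetical name, and this is not Managed State's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Minimal byte[] -> byte[] store used only to illustrate the data model.
// Raw byte[] keys use identity equals/hashCode, so they are wrapped in
// ByteBuffer, whose equals/hashCode depend on the buffer contents.
class ByteKeyValueStore {
  private final Map<ByteBuffer, byte[]> data = new HashMap<>();

  void put(byte[] key, byte[] value) {
    // Copy so later mutation of the caller's arrays cannot corrupt the map.
    data.put(ByteBuffer.wrap(key.clone()), value.clone());
  }

  byte[] get(byte[] key) {
    byte[] v = data.get(ByteBuffer.wrap(key));
    return v == null ? null : v.clone();
  }

  // Helper: serialize a String key or value as UTF-8 bytes.
  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```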
If the value is a list, inserting (k1, v1) into the store JS1 takes the following steps:

(1) For key k1, get the current value from JS1. (If the key is not in memory, this requires searching all the time buckets.)
(2) Convert that value to a List.
(3) Add v1 to the list.
(4) Convert the new list to a slice.
(5) Insert (k1, new slice) into Managed State.

Storing values as lists has been discussed here:
https://issues.apache.org/jira/browse/APEXMALHAR-2026

Based on the above JIRA, I am suggesting a SpillableMultiMap data structure for lists of values. Let's call this store SpillableMultiMapJoinStore.

Based on the above, the store is chosen by key type:

    Key            Store
    -----------    --------------------------
    Primary        ManagedJoinStore
    Not Primary    SpillableMultiMapJoinStore

The Join operator would expose the following additional properties:

 - isPrimaryKey: whether the key is a primary key.
 - noOfBuckets: number of key buckets. This parameter is required by Managed State.
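Steps (1) to (5) describe a read-modify-write of a serialized list. A minimal sketch over an in-memory byte[] map, assuming String values and a simple length-prefixed encoding; ListValueAppender and its encoding are illustrative only, not Managed State's or Malhar's serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of steps (1)-(5): appending to a list value kept in a byte[] -> byte[] store.
class ListValueAppender {
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  void append(String key, String value) {
    ByteBuffer k = ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    // (1) Read the current serialized value for the key (null if absent).
    byte[] current = store.get(k);
    // (2) Deserialize it into a list.
    List<String> values = current == null ? new ArrayList<String>() : decode(current);
    // (3) Add the new value.
    values.add(value);
    // (4) Re-serialize the whole list and (5) write it back under the same key.
    store.put(k, encode(values));
  }

  List<String> get(String key) {
    byte[] current = store.get(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)));
    return current == null ? new ArrayList<String>() : decode(current);
  }

  // Encoding: element count, then each string via writeUTF.
  private static byte[] encode(List<String> values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());
      for (String v : values) {
        out.writeUTF(v);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static List<String> decode(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int n = in.readInt();
      List<String> values = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        values.add(in.readUTF());
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Every append deserializes and rewrites the whole list, so the cost per insert grows with the number of values for the key; this is one way to read the motivation for a spillable multimap, which presumably avoids rewriting the entire list on each insert.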
Please provide your suggestions.

Regards,
Chaitanya
