+1 for the proposal. This will be useful for joins that need to hold large amounts of data intermediately.
~ Yogi

On 9 May 2016 at 14:05, Chaitanya Chebolu <[email protected]> wrote:
> Hi,
>
> The Malhar library has an in-memory join operator, but it can't process
> large amounts of data because of checkpointing and memory bottlenecks. To
> avoid these, I am proposing an inner join operator that uses Managed State,
> which was recently added to Malhar.
>
> Details of the inner join operator, Managed State, and the design of an
> inner join operator using Managed State are given below.
>
> Inner Join Operator
> -------------------
> The join operator pairs tuples from two relational streams that match the
> join condition and emits the paired tuples.
>
> For example, let S1 and S2 be two input streams with the join condition
>
>     (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
>
> where a and b are tuples arriving on S1 and S2 respectively, and t1 is a
> time period.
>
> Given this join condition, the join operator needs to store tuples for t1
> time. Let JS1 and JS2 be the respective stores where these tuples are kept.
>
> Let (k1, v1) be a tuple arriving on stream S1. The join operation takes the
> following steps:
>
> 1. Insert (k1, v1) into the store JS1.
> 2. For key k1, get the values from the store JS2.
> 3. If step 2 returns a non-null set, apply the join conditions and emit
>    the matching pairs.
>
> The same procedure applies to a tuple arriving on stream S2, with JS1 and
> JS2 swapped in the steps above.
>
> Managed State
> -------------
> Managed State is an incrementally checkpointed, fault-tolerant key-value
> data structure. For more information about Managed State, please see:
>
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
>
> Abstract Join Operator
> ----------------------
> An abstract implementation of the join operator is available in the
> com.datatorrent.lib.join package. JoinStore, the store interface used by
> the join operator, is available in the same package.
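The three join steps above can be sketched with two in-memory maps standing in for JS1 and JS2. This is a minimal illustration only, not the Malhar operator or the Managed State API: the class InnerJoinSketch, the Tuple record, the processS1/processS2 methods and the "a|b" output format are all hypothetical, and expired tuples are never evicted from the stores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory illustration of the join steps; not the Malhar API.
class InnerJoinSketch {

  // A tuple with a join key, an event time and a payload (hypothetical type).
  record Tuple(String key, long time, String value) {}

  private final long t1; // join window: |a.time - b.time| < t1
  private final Map<String, List<Tuple>> js1 = new HashMap<>(); // store for S1
  private final Map<String, List<Tuple>> js2 = new HashMap<>(); // store for S2
  private final List<String> output = new ArrayList<>();

  InnerJoinSketch(long t1) {
    this.t1 = t1;
  }

  void processS1(Tuple a) {
    join(a, js1, js2, true);
  }

  // Same steps for S2, with JS1 and JS2 swapped.
  void processS2(Tuple b) {
    join(b, js2, js1, false);
  }

  private void join(Tuple t, Map<String, List<Tuple>> own,
                    Map<String, List<Tuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key(), k -> new ArrayList<>()).add(t);
    // Step 2: look up tuples with the same key in the other stream's store.
    List<Tuple> candidates = other.get(t.key());
    // Step 3: if any were found, apply the time condition and emit pairs.
    if (candidates != null) {
      for (Tuple c : candidates) {
        if (Math.abs(t.time() - c.time()) < t1) {
          Tuple a = fromS1 ? t : c; // always emit the S1 side first
          Tuple b = fromS1 ? c : t;
          output.add(a.value() + "|" + b.value());
        }
      }
    }
  }

  List<String> output() {
    return output;
  }

  public static void main(String[] args) {
    InnerJoinSketch join = new InnerJoinSketch(10);
    join.processS1(new Tuple("k1", 100, "a1"));
    join.processS2(new Tuple("k1", 105, "b1")); // within t1: joins with a1
    join.processS2(new Tuple("k1", 200, "b2")); // outside t1: no match
    join.processS1(new Tuple("k2", 101, "a2")); // no S2 tuple with key k2
    System.out.println(join.output());          // prints [a1|b1]
  }
}
```

Because both stores grow with every tuple held for t1, this in-memory version shows exactly where the memory and checkpoint pressure described above comes from.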
> For more details about the join operator, please see:
>
> https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
>
> Design of Join Operator Using Managed State
> -------------------------------------------
> We need to provide a concrete store backed by Managed State, along these
> lines:
>
>     public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
>     {
>     }
>
> Data in the Managed State store is kept as a map of byte[] to byte[].
>
> If the value is a list, inserting (k1, v1) into the store JS1 takes the
> following steps:
> (1) For key k1, get the existing value from the store JS1. (If the key is
>     not in memory, this requires searching all the time buckets.)
> (2) Convert that value to a list.
> (3) Add the value v1 to the list.
> (4) Convert the new list to a slice.
> (5) Insert (k1, new slice) into Managed State.
>
> Values as lists have been discussed here:
> https://issues.apache.org/jira/browse/APEXMALHAR-2026
>
> Based on the above JIRA, I am suggesting the SpillableMultiMap data
> structure for lists of values. Let's call this store
> SpillableMultiMapJoinStore.
>
> Based on the above, the store is chosen according to the key type:
>
> Key           Store
> -----------   --------------------------
> Primary       ManagedJoinStore
> Not primary   SpillableMultiMapJoinStore
>
> The join operator would expose the following additional properties:
>
> - isPrimaryKey - whether the key is primary or not.
> - noOfBuckets - the number of key buckets. This parameter is required by
>   Managed State.
>
> Please provide your suggestions.
>
> Regards,
> Chaitanya
>
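Steps (1) to (5) for list-valued keys amount to a read-modify-write of the whole list on every insert. The sketch below illustrates that cost under stated assumptions: a plain HashMap<String, byte[]> stands in for the byte[]-to-byte[] Managed State store, the length-prefixed encoding is arbitrary, time buckets are not modeled, and ListValuedStoreSketch with its insert/get methods is hypothetical rather than part of Malhar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a byte[] -> byte[] store; not the Managed State API.
class ListValuedStoreSketch {

  private final Map<String, byte[]> store = new HashMap<>();

  // Steps (1)-(5): read the serialized list, decode, append, re-encode, write back.
  // Returns the number of bytes rewritten by this single insert.
  int insert(String key, String value) {
    byte[] existing = store.get(key);          // (1) get the current value
    List<String> values = new ArrayList<>();
    if (existing != null) {
      values = decode(existing);               // (2) convert it to a list
    }
    values.add(value);                         // (3) add the new value
    byte[] slice = encode(values);             // (4) convert the list back to bytes
    store.put(key, slice);                     // (5) write the whole list back
    return slice.length;
  }

  List<String> get(String key) {
    byte[] existing = store.get(key);
    if (existing == null) {
      return new ArrayList<>();
    }
    return decode(existing);
  }

  // Encodes a list as [count] followed by [length][UTF-8 bytes] per element.
  static byte[] encode(List<String> values) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());
      for (String v : values) {
        byte[] b = v.getBytes(StandardCharsets.UTF_8);
        out.writeInt(b.length);
        out.write(b);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static List<String> decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int n = in.readInt();
      List<String> values = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        values.add(new String(b, StandardCharsets.UTF_8));
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    ListValuedStoreSketch js1 = new ListValuedStoreSketch();
    for (int i = 1; i <= 3; i++) {
      int written = js1.insert("k1", "v" + i);
      System.out.println("insert v" + i + " rewrote " + written + " bytes");
    }
    System.out.println(js1.get("k1")); // prints [v1, v2, v3]
  }
}
```

Running main reports 10, 16, then 22 bytes rewritten for the three inserts, so the write cost per insert grows with the number of values already stored under the key. That growth is what makes a multimap-style store attractive for non-primary keys; how SpillableMultiMap itself avoids it is not modeled here.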
