Hi,

The Malhar library has an in-memory join operator, but it cannot process large amounts of data because of checkpointing and memory bottlenecks. To avoid these, I am proposing an inner join operator based on Managed State, which was recently added to Malhar.
Details of the Inner Join operator, Managed State, and the design of an Inner Join operator using Managed State are given below.

Inner Join Operator
--------------------------
The Join operator pairs tuples from two relational streams that match the join condition and emits the paired tuples.

For example, let S1 and S2 be two input streams with the join condition

    (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1

where a and b are tuples arriving on S1 and S2 respectively, and t1 is the time period.

Based on this join condition, the join operator needs to store tuples for a duration of t1. Let JS1 and JS2 be the stores in which the tuples from S1 and S2 are kept. Let (k1, v1) be a tuple arriving on stream S1. The join operation consists of the following steps:

1. Insert (k1, v1) into the store JS1.
2. For key k1, get the values from store JS2.
3. If step 2 returns a non-null set, apply the join conditions and emit the matching pairs.

The same procedure applies to a tuple arriving on stream S2, with JS1 and JS2 swapped in the steps above.

Managed State
--------------------
Managed State is a fault-tolerant key-value data structure with incremental checkpointing. For more information about Managed State, please have a look at the link below:
https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed

Abstract Join Operator
--------------------------------
An abstract implementation of the Join operator is available in the com.datatorrent.lib.join package. JoinStore, the store interface used by the join operator, is available in the same package.
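To make the three join steps from the Inner Join Operator section concrete, here is a minimal in-memory Java sketch (Java 16+ for records). It does not use Malhar's JoinStore or Managed State. The class InnerJoinSketch and the Tuple and Pair records are hypothetical names used only for illustration. Plain HashMaps stand in for JS1 and JS2, and event times are assumed to be longs in the same unit as t1. Evicting tuples older than t1 is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory sketch of the three inner join steps.
 * Plain HashMaps stand in for JS1 and JS2; this is not Malhar's JoinStore.
 */
class InnerJoinSketch {

  /** A tuple with a join key, a payload and an event time (illustrative type). */
  record Tuple(String key, String value, long time) {}

  /** An output pair: left comes from S1, right comes from S2. */
  record Pair(Tuple left, Tuple right) {}

  private final Map<String, List<Tuple>> js1 = new HashMap<>(); // store for S1
  private final Map<String, List<Tuple>> js2 = new HashMap<>(); // store for S2
  private final long t1; // join time window, same unit as Tuple.time

  InnerJoinSketch(long t1) {
    this.t1 = t1;
  }

  /** Handles a tuple on S1: insert into JS1, probe JS2. */
  List<Pair> processS1(Tuple a) {
    return process(a, js1, js2, true);
  }

  /** Handles a tuple on S2: the same steps with JS1 and JS2 swapped. */
  List<Pair> processS2(Tuple b) {
    return process(b, js2, js1, false);
  }

  private List<Pair> process(Tuple t, Map<String, List<Tuple>> own,
      Map<String, List<Tuple>> other, boolean fromS1) {
    // Step 1: insert (k1, v1) into this stream's store.
    own.computeIfAbsent(t.key(), k -> new ArrayList<>()).add(t);

    // Step 2: get the values for k1 from the other stream's store.
    // Looking up by key already enforces S1.a.key = S2.b.key.
    List<Tuple> candidates = other.get(t.key());

    // Step 3: if something was found, apply |S1.a.time - S2.b.time| < t1.
    List<Pair> out = new ArrayList<>();
    if (candidates != null) {
      for (Tuple c : candidates) {
        if (Math.abs(t.time() - c.time()) < t1) {
          out.add(fromS1 ? new Pair(t, c) : new Pair(c, t));
        }
      }
    }
    return out;
  }
}
```

Because the S1 and S2 handlers share one method with the stores passed in swapped order, the symmetric procedure for S2 needs no duplicated logic. Pairs are always oriented as (S1 tuple, S2 tuple) regardless of which stream triggered the match.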
For more details about the join operator, please have a look at the link below:
https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join

Design of Join Operator Using Managed State
-------------------------------------------------------------
We need to provide a concrete store based on Managed State, which would look like this:

    public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore { }

Data in Managed State is stored as a map from byte[] to byte[]. If the value is a list, inserting (k1, v1) into the store takes the following steps:

(1) For the key k1, get the existing value from the store being inserted into (JS1 for a tuple on S1). If the key is not present in memory, this requires searching all the time buckets.
(2) Convert that value to a List.
(3) Add the value v1 to the list.
(4) Convert the new list to a Slice.
(5) Insert (k1, new slice) into Managed State.

Values as lists have been discussed here:
https://issues.apache.org/jira/browse/APEXMALHAR-2026

Based on that JIRA, I suggest the SpillableMultiMap data structure for lists of values; let us call the resulting store SpillableMultiMapJoinStore.

The store is therefore chosen according to the key type. With a primary key, each key has at most one value, so ManagedJoinStore is enough; otherwise a key can map to several values and SpillableMultiMapJoinStore is used:

Key            Store
------------------------------------------
Primary        ManagedJoinStore
Not Primary    SpillableMultiMapJoinStore

The Join operator would expose the following additional properties:
- isPrimaryKey - whether the key is primary or not.
- noOfBuckets - number of key buckets. This parameter is required for Managed State.

Please provide your suggestions.

Regards,
Chaitanya
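For illustration, the five list-valued insertion steps from the design section can be sketched in Java as follows. This is a minimal sketch with several assumptions. ListValueStore is a hypothetical class. A HashMap with ByteBuffer-wrapped keys stands in for Managed State, because byte[] itself uses identity-based equals. Values are Strings encoded with DataOutputStream. Neither Slice nor time buckets are modeled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of steps (1)-(5) on a bytes-to-bytes store.
 * A HashMap stands in for Managed State; Slice and time buckets are not modeled.
 */
class ListValueStore {
  // byte[] has identity equality, so keys are wrapped in ByteBuffer (content equality).
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  void insert(String k1, String v1) {
    ByteBuffer key = ByteBuffer.wrap(k1.getBytes(StandardCharsets.UTF_8));
    // (1) Get the current serialized value for k1 (null if absent).
    byte[] current = store.get(key);
    // (2) Convert it to a List.
    List<String> values = current == null ? new ArrayList<>() : decode(current);
    // (3) Add v1 to the list.
    values.add(v1);
    // (4) Serialize the whole list again, then (5) write it back under k1.
    store.put(key, encode(values));
  }

  List<String> get(String k1) {
    byte[] bytes = store.get(ByteBuffer.wrap(k1.getBytes(StandardCharsets.UTF_8)));
    return bytes == null ? List.of() : decode(bytes);
  }

  /** Encodes a list as [count][utf-8 string]*. */
  private static byte[] encode(List<String> values) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bos);
      out.writeInt(values.size());
      for (String v : values) {
        out.writeUTF(v);
      }
      out.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Decodes the format written by encode. */
  private static List<String> decode(byte[] bytes) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
      int n = in.readInt();
      List<String> values = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        values.add(in.readUTF());
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch every insert decodes and re-encodes the entire list for the key, so its cost grows with the number of values stored under that key. That read-modify-write overhead is presumably what the SpillableMultiMap structure discussed in APEXMALHAR-2026 is meant to avoid.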
