Hi,

   The Malhar library has an in-memory join operator, which cannot process
large amounts of data because of checkpointing and memory bottlenecks. To
avoid these, I am proposing an inner join operator based on Managed State,
which was recently added to Malhar.

Details of the inner join operator, Managed State, and the design of an
inner join operator using Managed State are given below:

Inner Join Operator
--------------------------
The join operator pairs tuples from two relational streams that match the
join condition and emits the paired tuples.

For example, let S1 and S2 be two input streams with the join condition

   (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1

where a and b are tuples arriving on S1 and S2 respectively, and t1 is the
time period (join window).
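As an illustration only, the join condition above can be written as a Java
predicate. The Tuple class, its fields and the JoinCondition name are
hypothetical; times are assumed to be epoch milliseconds.

```java
import java.util.Objects;

public class JoinCondition {
    // Hypothetical tuple: a join key plus an event time in milliseconds.
    static final class Tuple {
        final String key;
        final long time;

        Tuple(String key, long time) {
            this.key = key;
            this.time = time;
        }
    }

    // True when a (from S1) and b (from S2) satisfy
    // (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1.
    static boolean matches(Tuple a, Tuple b, long t1) {
        return Objects.equals(a.key, b.key) && Math.abs(a.time - b.time) < t1;
    }

    public static void main(String[] args) {
        long t1 = 1000; // example window: 1 second
        System.out.println(matches(new Tuple("k1", 5000), new Tuple("k1", 5600), t1)); // true
        System.out.println(matches(new Tuple("k1", 5000), new Tuple("k1", 6000), t1)); // false: difference equals t1
        System.out.println(matches(new Tuple("k1", 5000), new Tuple("k2", 5100), t1)); // false: keys differ
    }
}
```

Note that the comparison is strict, so two tuples exactly t1 apart do not
join.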

Based on the above join condition, the join operator needs to retain each
tuple for a period of t1. Let JS1 and JS2 be the stores in which tuples
from S1 and S2, respectively, are kept.

Let (k1, v1) be a tuple arriving on stream S1. The join operation consists
of the following steps:

   1. Insert (k1, v1) into the store JS1.
   2. For the key k1, get the values from the store JS2.
   3. If step 2 returns a non-empty set, apply the join condition to each
   returned value and emit the matching pairs.

The same procedure applies to a tuple arriving on S2, with JS1 and JS2
swapped in the steps above.
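A minimal in-memory sketch of these three steps follows, assuming plain
HashMaps as stand-ins for JS1 and JS2 (the real stores would be backed by
Managed State). The class name, the Tuple type and the "s1|s2" output
format are hypothetical, and the removal of tuples older than t1 is omitted
for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InnerJoinSketch {
    // Hypothetical tuple: join key, payload and event time (ms).
    static final class Tuple {
        final String key;
        final String value;
        final long time;

        Tuple(String key, String value, long time) {
            this.key = key;
            this.value = value;
            this.time = time;
        }
    }

    // In-memory stand-ins for JS1 and JS2: key -> tuples retained for that key.
    private final Map<String, List<Tuple>> js1 = new HashMap<>();
    private final Map<String, List<Tuple>> js2 = new HashMap<>();
    private final long t1;

    InnerJoinSketch(long t1) { this.t1 = t1; }

    List<String> onS1(Tuple t) { return process(t, js1, js2, true); }

    List<String> onS2(Tuple t) { return process(t, js2, js1, false); } // same steps, stores swapped

    private List<String> process(Tuple t, Map<String, List<Tuple>> own,
                                 Map<String, List<Tuple>> other, boolean fromS1) {
        // Step 1: insert the tuple into its own store.
        own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
        // Step 2: look up the same key in the opposite store.
        List<Tuple> candidates = other.get(t.key);
        List<String> out = new ArrayList<>();
        // Step 3: apply the time condition and emit pairs, always as "s1|s2".
        if (candidates != null) {
            for (Tuple c : candidates) {
                if (Math.abs(t.time - c.time) < t1) {
                    out.add(fromS1 ? t.value + "|" + c.value : c.value + "|" + t.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        InnerJoinSketch join = new InnerJoinSketch(1000);
        System.out.println(join.onS1(new Tuple("k1", "a1", 5000))); // [] (JS2 is still empty)
        System.out.println(join.onS2(new Tuple("k1", "b1", 5400))); // [a1|b1]
        System.out.println(join.onS2(new Tuple("k1", "b2", 7000))); // [] (outside the window)
    }
}
```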

Managed State
--------------------
Managed State is an incrementally checkpointed, fault-tolerant key-value
data structure. For more information about Managed State, please see the
link below:

https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed

Abstract Join Operator
--------------------------------
An abstract implementation of the join operator is available in the
com.datatorrent.lib.join package. JoinStore, the store interface used by
the join operator, is available in the same package.
For more details about the join operator, please see the link below:
https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join

Design of Join Operator Using Managed State
-------------------------------------------------------------
We need to provide a concrete store using Managed State, which would look
like the following:

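import com.datatorrent.lib.join.JoinStore;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;

public class ManagedJoinStore extends ManagedTimeStateImpl implements JoinStore
{
  // JoinStore methods to be implemented on top of Managed State
}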

Data in Managed State would be stored as a map of byte[] keys to byte[]
values (held as Slice objects).

If the value is a list, inserting (k1, v1) into the store requires the
following steps:
(1) For the key k1, get the existing value from the store being inserted
into (JS1 for a tuple on S1). If the key is not present in memory, all the
time buckets have to be searched.
(2) Convert the above value to a List.
(3) Add the value v1 to the list.
(4) Convert the updated list to a Slice.
(5) Insert (k1, new slice) into Managed State.
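The read-modify-write cycle in steps (1) to (5) can be sketched as follows.
This is an illustration under stated assumptions: a HashMap keyed by
ByteBuffer stands in for Managed State (byte[] uses identity equality in
Java, which is why a wrapper such as Slice is needed), values are plain
Strings, and the DataOutputStream encoding is a hypothetical choice, not the
serializer Malhar would use. Each insert re-reads and rewrites the entire
list for k1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListValueStoreSketch {
    // Stand-in for Managed State: byte[] keys to byte[] values, keys wrapped for value equality.
    private final Map<ByteBuffer, byte[]> state = new HashMap<>();

    private static ByteBuffer key(String k) {
        return ByteBuffer.wrap(k.getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical encoding: element count followed by each element as modified UTF-8.
    static byte[] serialize(List<String> list) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(list.size());
            for (String v : list) out.writeUTF(v);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static List<String> deserialize(byte[] data) {
        List<String> list = new ArrayList<>();
        if (data == null) return list; // key not present yet
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) list.add(in.readUTF());
            return list;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void insert(String k, String v) {
        ByteBuffer kb = key(k);
        byte[] old = state.get(kb);          // (1) get the existing value
        List<String> list = deserialize(old); // (2) convert it to a List
        list.add(v);                          // (3) add the new value
        byte[] updated = serialize(list);     // (4) convert the list back to bytes
        state.put(kb, updated);               // (5) write the whole list back
    }

    List<String> get(String k) {
        return deserialize(state.get(key(k)));
    }

    public static void main(String[] args) {
        ListValueStoreSketch store = new ListValueStoreSketch();
        store.insert("k1", "v1");
        store.insert("k1", "v2");
        System.out.println(store.get("k1")); // [v1, v2]
    }
}
```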

Storing values as a list has been discussed here:
https://issues.apache.org/jira/browse/APEXMALHAR-2026

Based on the above JIRA, I am suggesting a SpillableMultiMap data structure
for the list of values. Let the corresponding store be called
SpillableMultiMapJoinStore.
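The proposal does not specify how SpillableMultiMap lays out its data.
Purely to illustrate why a multimap-style store can avoid rewriting the
whole list on every insert, here is one possible layout; it is an
assumption, not the design from APEXMALHAR-2026. A HashMap stands in for a
flat key-value store, and the "#size" / "#<i>" key scheme is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiMapLayoutSketch {
    // Stand-in for a flat key-value store such as Managed State.
    private final Map<String, String> kv = new HashMap<>();

    private int size(String key) {
        return Integer.parseInt(kv.getOrDefault(key + "#size", "0"));
    }

    // Hypothetical layout: "<key>#size" holds the value count, "<key>#<i>" holds the i-th value.
    // Appending writes two small entries instead of re-serializing the whole list.
    void put(String key, String value) {
        int n = size(key);
        kv.put(key + "#" + n, value);
        kv.put(key + "#size", Integer.toString(n + 1));
    }

    // Reading still visits every value stored for the key.
    List<String> get(String key) {
        int n = size(key);
        List<String> values = new ArrayList<>(n);
        for (int i = 0; i < n; i++) values.add(kv.get(key + "#" + i));
        return values;
    }

    public static void main(String[] args) {
        MultiMapLayoutSketch m = new MultiMapLayoutSketch();
        m.put("k1", "v1");
        m.put("k1", "v2");
        System.out.println(m.get("k1")); // [v1, v2]
    }
}
```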

Based on the above, the store is chosen by key type: a primary key has at
most one value per key, while a non-primary key can have many.

   Key              Store
   ---------------------------------------------
   Primary          ManagedJoinStore
   Not primary      SpillableMultiMapJoinStore

The following additional properties would be exposed by the join operator:

   - isPrimaryKey - whether the join key is a primary key.
   - noOfBuckets - the number of key buckets. This parameter is required by
   Managed State.
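A small sketch of how these two properties might drive the operator's
setup follows. Everything here is hypothetical: the class name, the
accessors, returning the store as a class-name string, and the bucketing
scheme (a non-negative hash of the key modulo noOfBuckets), which is an
assumption rather than how Managed State assigns buckets.

```java
public class JoinStoreSelectionSketch {
    // Hypothetical operator properties mirroring the proposal.
    private boolean isPrimaryKey = false;
    private int noOfBuckets = 1;

    public boolean isPrimaryKey() { return isPrimaryKey; }

    public void setPrimaryKey(boolean primaryKey) { this.isPrimaryKey = primaryKey; }

    public int getNoOfBuckets() { return noOfBuckets; }

    public void setNoOfBuckets(int noOfBuckets) {
        if (noOfBuckets <= 0) {
            throw new IllegalArgumentException("noOfBuckets must be positive");
        }
        this.noOfBuckets = noOfBuckets;
    }

    // Store choice from the table above: one value per key vs. many values per key.
    String storeClassName() {
        return isPrimaryKey ? "ManagedJoinStore" : "SpillableMultiMapJoinStore";
    }

    // Assumed bucketing: floorMod keeps the bucket id in [0, noOfBuckets) even for negative hashes.
    long bucketFor(Object key) {
        return Math.floorMod(key.hashCode(), noOfBuckets);
    }

    public static void main(String[] args) {
        JoinStoreSelectionSketch op = new JoinStoreSelectionSketch();
        op.setPrimaryKey(true);
        op.setNoOfBuckets(8);
        System.out.println(op.storeClassName()); // ManagedJoinStore
        System.out.println(op.bucketFor("k1"));  // 6, since "k1".hashCode() is 3366
    }
}
```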

Please provide your suggestions.

Regards,
Chaitanya
