+1.

This is a common use case in batch scenarios, and it would be great to
have it for streams as well, using Managed State and the SpillableMultiMap
structure.

Regards,
Mohit

On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <[email protected]> wrote:

> +1 for the Proposal.
>
> This will be useful for joins that need to hold a large amount of
> intermediate data.
>
> ~ Yogi
>
> On 9 May 2016 at 14:05, Chaitanya Chebolu <[email protected]>
> wrote:
>
> > Hi,
> >
> >    The Malhar library has an in-memory join operator, but it can't process
> > large amounts of data because of checkpoint and memory bottlenecks. To
> > avoid these, I am proposing an inner join operator using Managed State,
> > which was recently added to Malhar.
> >
> > Details of the inner join operator, Managed State, and the design of the
> > inner join operator using Managed State are given below:
> >
> > Inner Join Operator
> > --------------------------
> > The join operator pairs tuples from two relational streams that match the
> > join condition and emits the paired tuples.
> >
> > For example, let's say S1 and S2 are two input streams and the join
> > condition is
> > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > where a and b are tuples arriving on S1 and S2 respectively, and t1 is the
> > time period.
> >
> > Based on the above join condition, the join operator needs to store tuples
> > for "t1" time. Let's say JS1 and JS2 are the respective stores where these
> > tuples are kept.
> >
> > Let (k1, v1) be a tuple arriving on stream S1. The join operation then
> > takes the following steps:
> >
> >    1. Insert (k1, v1) into the store JS1.
> >    2. For Key k1, get the values from store JS2.
> >    3. If step 2 returns a non-null set, apply the join conditions.
> >
> > A similar procedure is applied if the tuple arrives on stream S2, swapping
> > JS1 and JS2 in the steps above.
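
The steps above can be sketched in plain Java as follows. This is only an
illustration under stated assumptions, not the proposed operator: the class
and method names are hypothetical, in-memory HashMaps stand in for the stores
JS1 and JS2, and expiry of tuples older than t1 is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of the symmetric join steps.
class SymmetricJoinSketch {
    // A tuple value together with its time field.
    static final class Timed {
        final String value;
        final long time;
        Timed(String value, long time) { this.value = value; this.time = time; }
    }

    private final Map<String, List<Timed>> js1 = new HashMap<>(); // stands in for JS1
    private final Map<String, List<Timed>> js2 = new HashMap<>(); // stands in for JS2
    private final long t1; // time period from the join condition

    SymmetricJoinSketch(long t1) { this.t1 = t1; }

    // Tuple arriving on S1: insert into JS1, probe JS2.
    List<String> onS1(String key, Timed a) { return process(key, a, js1, js2, true); }

    // Tuple arriving on S2: the same steps with JS1 and JS2 swapped.
    List<String> onS2(String key, Timed b) { return process(key, b, js2, js1, false); }

    private List<String> process(String key, Timed in, Map<String, List<Timed>> own,
                                 Map<String, List<Timed>> other, boolean fromS1) {
        // Step 1: insert the incoming tuple into its own store.
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(in);
        // Step 2: look up the same key in the opposite store.
        List<Timed> candidates = other.get(key);
        List<String> joined = new ArrayList<>();
        // Step 3: if anything was found, apply the time part of the join condition.
        if (candidates != null) {
            for (Timed c : candidates) {
                if (Math.abs(in.time - c.time) < t1) {
                    // Always emit the S1 value first, then the S2 value.
                    joined.add(fromS1 ? in.value + "|" + c.value : c.value + "|" + in.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        SymmetricJoinSketch join = new SymmetricJoinSketch(10);
        System.out.println(join.onS1("k1", new Timed("a1", 100)));
        System.out.println(join.onS2("k1", new Timed("b1", 105)));
    }
}
```

Because every tuple is inserted before probing the opposite store, each
matching pair is emitted exactly once, whichever side arrives first.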
> >
> > Managed State
> > --------------------
> > Managed State is an incremental checkpointing, fault-tolerant key-value
> > data structure. For more information about Managed State, please see the
> > link below:
> >
> > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> >
> > Abstract Join Operator
> > --------------------------------
> > An abstract implementation of the join operator is available in the
> > com.datatorrent.lib.join package. JoinStore is the store interface used by
> > the join operator and is available in the same package.
> > For more details about the join operator, please see the link below:
> >
> > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> >
> > Design of Join Operator Using Managed State
> > -------------------------------------------------------------
> > We need to provide a concrete store using Managed State, which would look
> > like this:
> >
> > public class ManagedJoinStore extends ManagedTimeStateImpl implements
> > JoinStore
> > {
> > }
> >
> > Data in the Managed State store would be in the form of a map of byte[] to
> > byte[].
> >
> > If the value is a list, then inserting (k1, v1) into the store would take
> > the following steps:
> > (1) For the key k1, get the value from store JS2. (Here we need to search
> > all the time buckets if the key is not present in memory.)
> > (2) Convert the above value to a List.
> > (3) Add the value v1 to the above list.
> > (4) Convert the new list to a slice.
> > (5) Insert (k1, new slice) into the Managed State.
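
A rough sketch of this read-modify-write cycle is shown below. It is an
assumption-laden illustration rather than Managed State code: a HashMap keyed
by ByteBuffer stands in for the byte[]-to-byte[] store, the list encoding
(a count followed by UTF strings) is made up for the example, and the class
name is hypothetical. The sketch reads the existing value for the key from the
same store it then writes back to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a byte[] -> byte[] store holding list values.
class ListValueStoreSketch {
    // ByteBuffer keys give content-based equality, which raw byte[] keys lack.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();

    void put(String key, String value) {
        ByteBuffer k = toKey(key);
        List<String> values = decode(store.get(k)); // steps 1-2: fetch and convert to a list
        values.add(value);                          // step 3: append the new value
        store.put(k, encode(values));               // steps 4-5: re-serialize and write back
    }

    List<String> get(String key) {
        return decode(store.get(toKey(key)));
    }

    private static ByteBuffer toKey(String key) {
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    // Made-up encoding: element count, then each element as a UTF string.
    static byte[] encode(List<String> values) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(values.size());
            for (String v : values) {
                out.writeUTF(v);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A missing key (null bytes) decodes to an empty, mutable list.
    static List<String> decode(byte[] raw) {
        List<String> values = new ArrayList<>();
        if (raw == null) {
            return values;
        }
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ListValueStoreSketch s = new ListValueStoreSketch();
        s.put("k1", "v1");
        s.put("k1", "v2");
        System.out.println(s.get("k1"));
    }
}
```

Note that every insert deserializes and re-serializes the whole list for the
key, so the cost of an insert grows with the number of values already stored.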
> >
> > Values as a list have been discussed here:
> > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> >
> > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > structure for lists of values. Let this store be SpillableMultiMapJoinStore.
> >
> > Based on the above details, the store is chosen according to the key type.
> >
> > Key            Store
> > ------------------------------------------
> > Primary        ManagedJoinStore
> > Not Primary    SpillableMultiMapJoinStore
> >
> > The join operator would expose the following additional properties:
> >
> >    - isPrimaryKey - whether the key is primary or not.
> >    - noOfBuckets - Number of key buckets. This parameter is required for
> >    Managed State.
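
To illustrate how isPrimaryKey could drive the choice of store, here is a
small sketch. Everything in it is hypothetical: KeyedStore, SingleValueStore
and MultiValueStore are simplified in-memory stand-ins for JoinStore,
ManagedJoinStore and SpillableMultiMapJoinStore, and noOfBuckets is not
modelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: pick a store implementation from the isPrimaryKey flag.
class StoreSelectionSketch {
    interface KeyedStore {
        void put(String key, String value);
        List<String> get(String key);
    }

    // Primary key: at most one value per key, so a later put replaces the earlier one.
    static class SingleValueStore implements KeyedStore {
        private final Map<String, String> data = new HashMap<>();

        public void put(String key, String value) {
            data.put(key, value);
        }

        public List<String> get(String key) {
            String v = data.get(key);
            return v == null ? Collections.<String>emptyList() : Collections.singletonList(v);
        }
    }

    // Non-primary key: every value for a key is kept, multimap style.
    static class MultiValueStore implements KeyedStore {
        private final Map<String, List<String>> data = new HashMap<>();

        public void put(String key, String value) {
            data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        public List<String> get(String key) {
            return data.getOrDefault(key, Collections.<String>emptyList());
        }
    }

    static KeyedStore create(boolean isPrimaryKey) {
        return isPrimaryKey ? new SingleValueStore() : new MultiValueStore();
    }

    public static void main(String[] args) {
        KeyedStore primary = create(true);
        primary.put("k", "v1");
        primary.put("k", "v2");
        KeyedStore nonPrimary = create(false);
        nonPrimary.put("k", "v1");
        nonPrimary.put("k", "v2");
        System.out.println(primary.get("k") + " " + nonPrimary.get("k"));
    }
}
```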
> >
> > Please provide your suggestions.
> >
> > Regards,
> > Chaitanya
> >
>
