Hi Chaitanya,
I am NOT suggesting that you use the TimeSlicedBucketedState interface.
I don't see the need for a JoinStore abstraction.
There will be a SpillableArrayListMultimap implementation on which you can
set "ManagedTimeUnifiedStateImpl" as the persistent store.
The SpillableArrayListMultimap API is sufficient for this use case.
You can use this SpillableArrayListMultimap implementation directly in
the Join operator. Here is a simple example:
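class InnerJoinOperator
{
  // Multimap of key -> list of values, persisted through Managed State.
  SpillableArrayListMultimap stream1Data =
      new SpillableArrayListMultimap(ManagedTimeUnifiedStateImpl);

  // Called for every tuple arriving on input port 1.
  port1.process(tuple) {
    stream1Data.put(tuple.getKey(), tuple.getVal());
  }
}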
Chandni
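
To make the sketch above concrete without depending on the spillable
classes that are still in development, here is a minimal in-memory
stand-in. ListMultimapStandIn and InnerJoinSketch are hypothetical names,
and a plain HashMap takes the place of Managed State. It only illustrates
that put and get on a list multimap are enough for both sides of the join.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a list multimap; not the Malhar class.
class ListMultimapStandIn<K, V>
{
  private final Map<K, List<V>> data = new HashMap<>();

  void put(K key, V value)
  {
    data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<V> get(K key)
  {
    return data.getOrDefault(key, Collections.emptyList());
  }
}

// Hypothetical operator skeleton: one multimap per input stream.
class InnerJoinSketch
{
  private final ListMultimapStandIn<String, String> stream1Data = new ListMultimapStandIn<>();
  private final ListMultimapStandIn<String, String> stream2Data = new ListMultimapStandIn<>();

  // Tuple arriving on port 1: store it, then pair it with stored stream 2 values.
  List<String> processPort1(String key, String value)
  {
    stream1Data.put(key, value);
    return pair(value, stream2Data.get(key), true);
  }

  // Tuple arriving on port 2: the same steps with the stores swapped.
  List<String> processPort2(String key, String value)
  {
    stream2Data.put(key, value);
    return pair(value, stream1Data.get(key), false);
  }

  // Emits "left,right" strings regardless of which side the incoming value is on.
  private static List<String> pair(String incoming, List<String> stored, boolean incomingIsLeft)
  {
    List<String> joined = new ArrayList<>();
    for (String other : stored) {
      joined.add(incomingIsLeft ? incoming + "," + other : other + "," + incoming);
    }
    return joined;
  }
}
```

The operator itself never needs more than put(key, value) and get(key),
which is why a separate store abstraction looks unnecessary here.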
On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <
[email protected]> wrote:
> Hi Chandni,
>
> I think you are suggesting the interface "TimeSlicedBucketedState". I
> feel it is tightly coupled with Managed State.
> In the "TimeSlicedBucketedState" abstraction, the bucketId parameter relates
> to Managed State, and it is not needed for the join operator.
>
> Regards,
> Chaitanya
>
> On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <[email protected]>
> wrote:
>
> > Chaitanya,
> >
> > SpillableArrayListMultimap will give you a similar abstraction.
> >
> > Why do we need another abstraction, "JoinStore"?
> >
> > Chandni
> >
> > On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <
> > [email protected]> wrote:
> >
> > > Chandni,
> > >
> > > JoinStore is an interface that consists of the following methods:
> > > 1) boolean put(Object key, long time, Object value) => Insert the (key,
> > > value) pair into the store.
> > > 2) List<> getTuples(Object key) => Return the list of values in the store
> > > to which the specified key is mapped.
> > >
> > > JoinStore is pluggable into the join operator. The join operator exposes
> > > the following properties:
> > > JoinStore leftStore, rightStore.
> > >
> > > By default, leftStore and rightStore would be join stores that use
> > > spillable data structures over ManagedState. A user who wants to integrate
> > > a different store has to implement JoinStore and set it on the join
> > > operator.
> > >
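
The interface described above could be sketched roughly as follows. This
is an illustration under assumptions, not the interface that ships in
com.datatorrent.lib.join: the names JoinStoreSketch, InMemoryJoinStore and
PluggableJoinOperator are hypothetical, and List<Object> is assumed
because the element type is not given above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The two methods listed above, written out as an interface (element type assumed).
interface JoinStoreSketch
{
  boolean put(Object key, long time, Object value);

  List<Object> getTuples(Object key);
}

// Hypothetical in-memory implementation, for illustration only.
class InMemoryJoinStore implements JoinStoreSketch
{
  private final Map<Object, List<Object>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value)
  {
    // The time argument is ignored in this sketch.
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<Object> getTuples(Object key)
  {
    List<Object> values = data.get(key);
    // Return a copy so callers cannot modify the stored list.
    return values == null ? new ArrayList<>() : new ArrayList<>(values);
  }
}

// Hypothetical operator exposing leftStore and rightStore as pluggable properties.
class PluggableJoinOperator
{
  private JoinStoreSketch leftStore = new InMemoryJoinStore();
  private JoinStoreSketch rightStore = new InMemoryJoinStore();

  void setLeftStore(JoinStoreSketch store)
  {
    this.leftStore = store;
  }

  void setRightStore(JoinStoreSketch store)
  {
    this.rightStore = store;
  }

  JoinStoreSketch getLeftStore()
  {
    return leftStore;
  }

  JoinStoreSketch getRightStore()
  {
    return rightStore;
  }
}
```

A user with a different backing store would implement the interface and
pass the instance to setLeftStore or setRightStore.
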
> > >
> > > Tim,
> > >
> > > I am not planning a different implementation.
> > > Here, I proposed to plug in the SpillableArrayListMultiMap/SpillableMap
> > > over managed state. I think this is what you are developing. Please
> > > correct me if I am wrong.
> > >
> > > Regards,
> > > Chaitanya
> > >
> > > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <
> > > [email protected]> wrote:
> > >
> > > > Chaitanya,
> > > >
> > > > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> > > > that are in development, or separate implementations? If you are
> > > > planning on using separate implementations, can you please explain why
> > > > they are needed?
> > > >
> > > > Thanks,
> > > > Tim
> > > >
> > > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <
> > > > [email protected]> wrote:
> > > >
> > > > > Hi Chandni,
> > > > >
> > > > > I am suggesting two stores, chosen by key type:
> > > > > (1) If the key is a primary key, the store would be ManagedJoinStore
> > > > > or a join store using SpillableMap.
> > > > > (2) If the key is not a primary key, the store would be a join store
> > > > > using SpillableArrayListMultiMap.
> > > > >
> > > > > Regards,
> > > > > Chaitanya
> > > > >
> > > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <
> > > [email protected]
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Chaitanya,
> > > > > >
> > > > > > In the last discussion, we decided that the Join operator needs the
> > > > > > spillable data structures, specifically SpillableArrayListMultiMap.
> > > > > >
> > > > > > Tim created ticket APEXMALHAR-2070 to address this. The pull request
> > > > > > for an in-memory implementation already exists:
> > > > > > https://github.com/apache/incubator-apex-malhar/pull/262
> > > > > >
> > > > > > As commented on the ticket, Tim is working on the implementation of
> > > > > > SpillableArrayListMultiMap that uses ManagedState.
> > > > > >
> > > > > > The ManagedJoinStore seems to me like a duplicate of these spillable
> > > > > > data structures. I think we should avoid creating multiple things that
> > > > > > provide the same basic functionality.
> > > > > >
> > > > > > Can you please help review the spillable data structures pull request
> > > > > > and point out what you will need for Join that is missing there?
> > > > > >
> > > > > > Thanks,
> > > > > > Chandni
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <
> > > > > > [email protected]> wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > Please go through this design and share your suggestions.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Chaitanya
> > > > > > >
> > > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <
> > > > [email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1.
> > > > > > > >
> > > > > > > > This is a common use case in batch scenarios, and it will be great
> > > > > > > > to have it for streams as well, using managed state and the
> > > > > > > > spillableMultiMap structure.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Mohit
> > > > > > > >
> > > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <
> > > > > > > > [email protected]
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > +1 for the Proposal.
> > > > > > > > >
> > > > > > > > > This will be useful for joins that need to hold a large amount of
> > > > > > > > > intermediate data.
> > > > > > > > >
> > > > > > > > > ~ Yogi
> > > > > > > > >
> > > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu <
> > > > > [email protected]
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi ,
> > > > > > > > > >
> > > > > > > > > > The Malhar library has an in-memory join operator, which can't
> > > > > > > > > > process large amounts of data because of checkpointing and memory
> > > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> > > > > > > > > > that uses Managed State, which was recently added to Malhar.
> > > > > > > > > >
> > > > > > > > > > Details of the Inner Join operator, Managed State, and the design
> > > > > > > > > > of the Inner Join operator using Managed State are given below:
> > > > > > > > > >
> > > > > > > > > > Inner Join Operator
> > > > > > > > > > --------------------------
> > > > > > > > > > The Join operator pairs tuples from two relational streams that
> > > > > > > > > > match the join condition and emits the paired tuples.
> > > > > > > > > >
> > > > > > > > > > For example, let's say S1 and S2 are two input streams and the join
> > > > > > > > > > condition is
> > > > > > > > > > (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> > > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and t1
> > > > > > > > > > is the time period.
> > > > > > > > > >
> > > > > > > > > > Based on the above join condition, the join operator needs to store
> > > > > > > > > > tuples for "t1" time. Let's say JS1 and JS2 are the respective
> > > > > > > > > > stores where these tuples are kept.
> > > > > > > > > >
> > > > > > > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation
> > > > > > > > > > takes the following steps:
> > > > > > > > > >
> > > > > > > > > > 1. Insert (k1, v1) into the store JS1.
> > > > > > > > > > 2. For key k1, get the values from store JS2.
> > > > > > > > > > 3. If step 2 returns a non-null set, apply the join conditions.
> > > > > > > > > >
> > > > > > > > > > A similar procedure applies if the tuple arrives on stream S2,
> > > > > > > > > > swapping JS1 and JS2 in the steps above.
> > > > > > > > > >
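
The three steps and the time condition can be sketched as below. This is
only an illustration: TimeBoundedJoinSketch and TimedValue are hypothetical
names, plain HashMaps stand in for JS1 and JS2, and removal of tuples
older than t1 is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the join steps, with in-memory maps standing in for JS1 and JS2.
class TimeBoundedJoinSketch
{
  // A stored tuple: its value and its time.
  static final class TimedValue
  {
    final String value;
    final long time;

    TimedValue(String value, long time)
    {
      this.value = value;
      this.time = time;
    }
  }

  private final Map<String, List<TimedValue>> js1 = new HashMap<>();
  private final Map<String, List<TimedValue>> js2 = new HashMap<>();
  private final long t1;

  TimeBoundedJoinSketch(long t1)
  {
    this.t1 = t1;
  }

  List<String> onStream1(String key, String value, long time)
  {
    return process(js1, js2, key, value, time);
  }

  // Same procedure with JS1 and JS2 swapped.
  List<String> onStream2(String key, String value, long time)
  {
    return process(js2, js1, key, value, time);
  }

  private List<String> process(Map<String, List<TimedValue>> own,
      Map<String, List<TimedValue>> other, String key, String value, long time)
  {
    // Step 1: insert the tuple into its own store.
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(new TimedValue(value, time));
    // Step 2: look up the same key in the other store.
    List<TimedValue> candidates = other.get(key);
    List<String> joined = new ArrayList<>();
    if (candidates != null) {
      // Step 3: keep only pairs that satisfy |a.time - b.time| < t1.
      for (TimedValue candidate : candidates) {
        if (Math.abs(time - candidate.time) < t1) {
          joined.add(value + "|" + candidate.value);
        }
      }
    }
    return joined;
  }
}
```

Each emitted string lists the incoming value first, followed by the
matching stored value from the other stream.
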
> > > > > > > > > > Managed State
> > > > > > > > > > --------------------
> > > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> > > > > > > > > > key-value data structure. For more information about Managed
> > > > > > > > > > State, please have a look at the link below:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> > > > > > > > > >
> > > > > > > > > > Abstract Join Operator
> > > > > > > > > > --------------------------------
> > > > > > > > > > An abstract implementation of the Join operator is available in the
> > > > > > > > > > com.datatorrent.lib.join package. JoinStore is the store interface
> > > > > > > > > > used by the join operator and is available in the same package.
> > > > > > > > > > For more details about the join operator, please have a look at the
> > > > > > > > > > link below:
> > > > > > > > > >
> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> > > > > > > > > >
> > > > > > > > > > Design of Join Operator Using Managed State
> > > > > > > > > > -------------------------------------------------------------
> > > > > > > > > > We need to provide a concrete store using Managed State, which
> > > > > > > > > > would look like this:
> > > > > > > > > >
> > > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl
> > > > > > > > > >     implements JoinStore
> > > > > > > > > > {
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > Data in the Managed store would be in the form of a map of byte[]
> > > > > > > > > > to byte[].
> > > > > > > > > >
> > > > > > > > > > If the value is a list, then inserting (k1, v1) into the store
> > > > > > > > > > would take the following steps:
> > > > > > > > > > (1) For the key k1, get the current value from the store. (Here we
> > > > > > > > > > need to search all the time buckets if the key is not present in
> > > > > > > > > > memory.)
> > > > > > > > > > (2) Convert the above value to a List.
> > > > > > > > > > (3) Add the value v1 to the above list.
> > > > > > > > > > (4) Convert the new list to a slice.
> > > > > > > > > > (5) Insert (k1, new slice) into the Managed State.
> > > > > > > > > >
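
The five steps above amount to a read-modify-write of a serialized list.
A minimal sketch follows, under several assumptions: ListValueStoreSketch
is a hypothetical name, a HashMap stands in for the Managed State store,
plain byte[] stands in for a slice, and values are strings encoded with a
simple length-prefixed format chosen only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a byte[]-to-byte[] store holding list values.
class ListValueStoreSketch
{
  // ByteBuffer keys are used because byte[] has identity-based equals and hashCode.
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();

  void append(String key, String value)
  {
    ByteBuffer k = toKey(key);
    // Steps 1 and 2: read the current bytes and decode them into a list.
    List<String> values = decode(store.get(k));
    // Step 3: add the new value.
    values.add(value);
    // Steps 4 and 5: encode the list and write it back under the same key.
    store.put(k, encode(values));
  }

  List<String> get(String key)
  {
    return decode(store.get(toKey(key)));
  }

  private static ByteBuffer toKey(String key)
  {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  // Encodes the list as a count followed by each string.
  static byte[] encode(List<String> values)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(values.size());
      for (String v : values) {
        out.writeUTF(v);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Decodes bytes written by encode; a missing value yields an empty list.
  static List<String> decode(byte[] raw)
  {
    List<String> values = new ArrayList<>();
    if (raw == null) {
      return values;
    }
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
      int count = in.readInt();
      for (int i = 0; i < count; i++) {
        values.add(in.readUTF());
      }
      return values;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Note that each append decodes and re-encodes the whole list, so the cost
of an insert grows with the number of values already stored under the key.
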
> > > > > > > > > > Storing values as a list has been discussed here:
> > > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> > > > > > > > > >
> > > > > > > > > > Based on the above JIRA, I am suggesting the SpillableMultiMap data
> > > > > > > > > > structure for lists of values. Let this store be
> > > > > > > > > > SpillableMultiMapJoinStore.
> > > > > > > > > >
> > > > > > > > > > Based on the above details, the store is chosen based on key type.
> > > > > > > > > >
> > > > > > > > > > Key            Store
> > > > > > > > > > --------------------------------------------
> > > > > > > > > > Primary        ManagedJoinStore
> > > > > > > > > > Not Primary    SpillableMultiMapJoinStore
> > > > > > > > > >
> > > > > > > > > > The following additional properties would be exposed by the Join
> > > > > > > > > > operator:
> > > > > > > > > >
> > > > > > > > > > - isPrimaryKey - whether the key is primary or not.
> > > > > > > > > > - noOfBuckets - number of key buckets. This parameter is required
> > > > > > > > > >   for Managed State.
> > > > > > > > > >
> > > > > > > > > > Please provide your suggestions.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Chaitanya