Thanks, Chandni and Tim, for the valuable input. I will use SpillableComplexComponent as the common abstraction.
I have created a JIRA for this task: https://issues.apache.org/jira/browse/APEXMALHAR-2100

Regards,
Chaitanya

On Fri, May 20, 2016 at 11:39 AM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:

Hi Chandni,

That is correct. Here is some explanation of the motivation for that interface and how to use it:

Goals:

The goal of SpillableComplexComponent is to provide an interface for creating Spillable data structures. It is essentially the interface for a factory class that produces Spillable data structures. Having a factory interface makes it very easy to plug different backends into operators.

Going into more detail for your use case, SpillableComplexComponent has two factory methods, newSpillableByteMap and newSpillableByteArrayListMultimap. These factory methods return implementations of the SpillableByteMap and SpillableArrayListMultimap interfaces.

Usage:

Setting the backend on an operator:

myOperator.setStore(new InMemorySpillableComplexComponent());
//myOperator.setStore(new ManagedStateSpillableComplexComponent());
//myOperator.setStore(new HbaseSpillableComplexComponent());

Using the factory in an operator:

setup() {
  map = store.newSpillableByteMap();
}

As you can see, you set the factory on an operator, and the operator then uses the factory to create a Spillable data structure. The operator is agnostic to the store that manages the data for Spillable data structures. If you want the data to be stored in managed state, simply set a ManagedState implementation of SpillableComplexComponent on the operator; if you want the data to be stored in Cassandra, simply set a Cassandra implementation of SpillableComplexComponent.
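A minimal, self-contained Java sketch of the factory arrangement described above. The interfaces are deliberately simplified stand-ins: SimpleSpillableMap, SimpleSpillableComponent, InMemorySimpleComponent and DemoJoinOperator are hypothetical names, and the real Malhar SpillableComplexComponent and SpillableByteMap have different, richer signatures (serdes, bucket ids, operator lifecycle). The point is only that the operator depends on the factory interface, not on a concrete backend.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for a spillable map; the real Malhar interface differs.
interface SimpleSpillableMap<K, V> {
  void put(K key, V value);

  V get(K key);
}

// Simplified stand-in for the factory interface (hypothetical signature).
interface SimpleSpillableComponent {
  <K, V> SimpleSpillableMap<K, V> newSpillableMap();
}

// In-memory backend: keeps everything on the heap and never spills.
class InMemorySimpleComponent implements SimpleSpillableComponent {
  @Override
  public <K, V> SimpleSpillableMap<K, V> newSpillableMap() {
    Map<K, V> backing = new HashMap<>();
    return new SimpleSpillableMap<K, V>() {
      @Override
      public void put(K key, V value) {
        backing.put(key, value);
      }

      @Override
      public V get(K key) {
        return backing.get(key);
      }
    };
  }
}

// Hypothetical operator: it only knows the factory interface, so any backend can be plugged in.
class DemoJoinOperator {
  private SimpleSpillableComponent store;
  private SimpleSpillableMap<String, String> map;

  void setStore(SimpleSpillableComponent store) {
    this.store = store;
  }

  void setup() {
    // The backend chosen in setStore decides where the data actually lives.
    map = store.newSpillableMap();
  }

  void process(String key, String value) {
    map.put(key, value);
  }

  String lookup(String key) {
    return map.get(key);
  }
}
```

Plugging in another backend means passing a different SimpleSpillableComponent implementation to setStore; DemoJoinOperator itself does not change.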
The code using the spillable data structures is independent of the backend used to store the data with this design.

Thanks,
Tim

On Thu, May 19, 2016 at 9:29 PM, Chandni Singh <singh.chan...@gmail.com> wrote:

Hey Tim,

As I understand it, the Join operator needs a common abstraction for a SpillableMap and a SpillableArrayListMultimap.

I suggested using SpillableComplexComponent. Is this correct?

Thanks,
Chandni

On Wed, May 18, 2016 at 1:34 AM, Chandni Singh <singh.chan...@gmail.com> wrote:

Hi Chaitanya,

I am NOT suggesting that you use the interface TimeSlicedBucketedState.

I don't see the need for a JoinStore abstraction.

There will be a SpillableArrayListMultimap implementation on which you can set "ManagedTimeUnifiedStateImpl" as the persistent store. The API of SpillableArrayListMultimap is sufficient for this use case.

You can use this implementation of SpillableArrayListMultimap directly in the Join operator. Here is a simple example:

class InnerJoinOperator
{
  SpillableArrayListMultimap stream1Data = new SpillableArrayListMultimap(new ManagedTimeUnifiedStateImpl());

  port1.process(tuple) {
    stream1Data.put(tuple.getKey(), tuple.getVal());
  }
}

Chandni

On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:

Hi Chandni,

I think you are suggesting the interface "TimeSlicedBucketedState". I feel it is tightly coupled with Managed State: in the "TimeSlicedBucketedState" abstraction, the bucketId parameter relates to Managed State and is not needed for the join operator.
Regards,
Chaitanya

On Wed, May 18, 2016 at 12:51 PM, Chandni Singh <singh.chan...@gmail.com> wrote:

Chaitanya,

SpillableArrayListMultimap will give you a similar abstraction.

Why do we need another abstraction, "Join Store"?

Chandni

On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:

Chandni,

JoinStore is an interface and consists of the following methods:
1) boolean put(Object key, long time, Object value) => Insert the (key, value) pair into the store.
2) List<?> getTuples(Object key) => Return the list of values in the store to which the specified key is mapped.

JoinStore is pluggable into the join operator. The join operator exposes the following properties:
JoinStore leftStore, rightStore.

By default, leftStore and rightStore would be JoinStores built on spillable data structures over ManagedState. Users who want to integrate a different store have to implement JoinStore and set it on the Join operator.

Tim,

I am not planning any different implementation. Here, I proposed to plug the SpillableArrayListMultiMap/SpillableMap over managed state. I think this is what you are developing. Please correct me if I am wrong.
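A heap-only sketch of the two-method store contract described above, assuming values are kept per key together with their insertion time. The interface is named ProposedJoinStore here so it is not mistaken for the existing com.datatorrent.lib.join.JoinStore; InMemoryJoinStore and its expireBefore helper are hypothetical and not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// The two methods from the proposal, under a name that avoids clashing with the existing JoinStore.
interface ProposedJoinStore {
  boolean put(Object key, long time, Object value);

  List<?> getTuples(Object key);
}

// Hypothetical heap-only implementation; a spillable or Managed State version would keep
// the same contract but hold the per-key lists outside the JVM heap.
class InMemoryJoinStore implements ProposedJoinStore {
  private static final class Timed {
    final long time;
    final Object value;

    Timed(long time, Object value) {
      this.time = time;
      this.value = value;
    }
  }

  private final Map<Object, List<Timed>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value) {
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timed(time, value));
  }

  @Override
  public List<?> getTuples(Object key) {
    List<Timed> timed = data.get(key);
    if (timed == null) {
      return Collections.emptyList();
    }
    List<Object> values = new ArrayList<>(timed.size());
    for (Timed t : timed) {
      values.add(t.value);
    }
    return values;
  }

  // Hypothetical helper (not in the proposed interface): drop tuples older than the cutoff,
  // which is how a join window of length t1 would bound the store's size.
  void expireBefore(long cutoff) {
    Iterator<Map.Entry<Object, List<Timed>>> it = data.entrySet().iterator();
    while (it.hasNext()) {
      List<Timed> list = it.next().getValue();
      list.removeIf(t -> t.time < cutoff);
      if (list.isEmpty()) {
        it.remove();
      }
    }
  }
}
```

Because the operator would hold leftStore and rightStore only through this interface, a spillable implementation could replace InMemoryJoinStore without touching the join logic.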
Regards,
Chaitanya

On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:

Chaitanya,

Are you planning to use the SpillableMap and SpillableArrayListMultiMap that are in development, or separate implementations? If you are planning on using separate implementations, can you please explain why they are needed?

Thanks,
Tim

On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:

Hi Chandni,

I am suggesting two stores, categorized by key type:
(1) If the key is a primary key, the store would be ManagedJoinStore or a Join store using SpillableMap.
(2) If the key is not a primary key, the store would be a Join store using SpillableArrayListMultiMap.

Regards,
Chaitanya

On Tue, May 17, 2016 at 10:46 PM, Chandni Singh <singh.chan...@gmail.com> wrote:

Hi Chaitanya,

In the last discussion, we decided that the Join operation needs the Spillable data structures, specifically SpillableArrayListMultiMap for the Join Operator.

Tim created a ticket, APEXMALHAR-2070, to address this.
The pull request for an in-memory implementation already exists: https://github.com/apache/incubator-apex-malhar/pull/262

As commented on the ticket, Tim is working on an implementation of SpillableArrayListMultiMap that uses ManagedState.

The ManagedJoinStore seems to me like a duplicate of these spillable data structures. I think we should avoid creating multiple things that provide the same basic functionality.

Can you please help review the spillable data structures pull request and point out anything Join needs that is missing there?

Thanks,
Chandni

On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:

Hi All,

Please go through this design and share your suggestions.

Regards,
Chaitanya

On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani <mo...@datatorrent.com> wrote:

+1.

This is one of the common use cases in batch scenarios, and it would be great to have it for streams as well, using managed state and the spillableMultiMap structure.
Regards,
Mohit

On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra <devendra.vyavah...@gmail.com> wrote:

+1 for the proposal.

This will be useful for joins that need to hold large amounts of data intermediately.

~ Yogi

On 9 May 2016 at 14:05, Chaitanya Chebolu <chaita...@datatorrent.com> wrote:

Hi,

The Malhar library has an in-memory join operator, which cannot process large amounts of data because of checkpointing and memory bottlenecks. To avoid these, I am proposing an inner join operator using Managed State, which was recently added to Malhar.

Details of the Inner Join operator, Managed State, and the design of an Inner Join operator using Managed State are given below:

Inner Join Operator
--------------------------
The Join Operator pairs tuples from two relational streams that match the join condition and emits the paired tuples.
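The join condition and the insert-then-probe steps that follow can be sketched in memory as below. This assumes two plain HashMaps standing in for the stores JS1 and JS2, a condition of equal keys plus |time1 - time2| < t1, and pairs emitted as "valueFromS1|valueFromS2" strings; JoinTuple, WindowedInnerJoin, processS1 and processS2 are hypothetical names, and there is no expiry, checkpointing or spilling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tuple type: a join key, an event time and a payload.
final class JoinTuple {
  final String key;
  final long time;
  final String value;

  JoinTuple(String key, long time, String value) {
    this.key = key;
    this.time = time;
    this.value = value;
  }
}

// Heap-only sketch of a symmetric windowed inner join over streams S1 and S2.
class WindowedInnerJoin {
  private final long window; // t1 in the join condition
  private final Map<String, List<JoinTuple>> js1 = new HashMap<>();
  private final Map<String, List<JoinTuple>> js2 = new HashMap<>();

  WindowedInnerJoin(long window) {
    this.window = window;
  }

  // Tuple arriving on S1: store it in JS1, then probe JS2.
  List<String> processS1(JoinTuple t) {
    return insertAndProbe(t, js1, js2, true);
  }

  // Tuple arriving on S2: the same steps with JS1 and JS2 swapped.
  List<String> processS2(JoinTuple t) {
    return insertAndProbe(t, js2, js1, false);
  }

  private List<String> insertAndProbe(JoinTuple t, Map<String, List<JoinTuple>> own,
      Map<String, List<JoinTuple>> other, boolean fromS1) {
    // Step 1: insert the tuple into its own stream's store.
    own.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
    // Step 2: fetch tuples with the same key from the other stream's store.
    List<String> out = new ArrayList<>();
    List<JoinTuple> candidates = other.get(t.key);
    if (candidates == null) {
      return out;
    }
    // Step 3: apply the time condition and emit (S1 value, S2 value) pairs.
    for (JoinTuple c : candidates) {
      if (Math.abs(t.time - c.time) < window) {
        out.add(fromS1 ? t.value + "|" + c.value : c.value + "|" + t.value);
      }
    }
    return out;
  }
}
```

Without an expiry step both maps grow without bound on the heap, which is the memory and checkpoint bottleneck this proposal is trying to remove by moving JS1 and JS2 into Managed State or spillable structures.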
For example, let S1 and S2 be two input streams with the join condition

(S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1

where a and b are tuples arriving on S1 and S2 respectively, and t1 is the time period.

Based on the above join condition, the join operator needs to store tuples for a period of t1. Let JS1 and JS2 be the respective stores where these tuples are kept.

Let (k1, v1) be a tuple arriving on stream S1. The join operation consists of the following steps:

1. Insert (k1, v1) into the store JS1.
2. For key k1, get the values from the store JS2.
3. If step 2 returns a non-empty set, apply the join conditions.

The same procedure applies if the tuple arrives on stream S2, with JS1 and JS2 swapped in the steps above.

Managed State
--------------------
Managed State is an incrementally checkpointed, fault-tolerant key-value data structure.
For more information about Managed State, please have a look at the link below:

https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed

Abstract Join Operator
--------------------------------
An abstract implementation of the Join operator is available in the com.datatorrent.lib.join package. JoinStore is a store interface used by the join operator and is available in the same package.
For more details about the join operator, please have a look at the link below:

https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join

Design of Join Operator Using Managed State
-------------------------------------------------------------
We need to provide a concrete store using Managed State, which would look like this:

public class ManagedJoinStore extends ManagedTimeStateImpl implements
    JoinStore
{
}

Data in the Managed store would be in the form of a map from byte[] to byte[].

If the value is a list, inserting (k1, v1) into the store would take the following steps:
(1) For the key k1, get the value from store JS1. (Here we need to search all the time buckets if the key is not present in memory.)
(2) Convert the above value to a List.
(3) Add the value v1 to the above list.
(4) Convert the new list to a Slice.
(5) Insert (k1, new Slice) into the Managed State.

Values as lists have been discussed here: https://issues.apache.org/jira/browse/APEXMALHAR-2026

Based on the above JIRA, I am suggesting the SpillableMultiMap data structure for lists of values. Let us call this store SpillableMultiMapJoinStore.

Based on the above details, the store is categorized by key type:
Key            Store
---------------------------------------------
Primary        ManagedJoinStore
Not Primary    SpillableMultiMapJoinStore

The following additional properties would be exposed by the Join operator:

- isPrimaryKey - whether the key is primary or not.
- noOfBuckets - number of key buckets. This parameter is required for Managed State.

Please provide your suggestions.

Regards,
Chaitanya
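The trade-off between the two store kinds in the design can be illustrated with a heap-only sketch that mimics a byte[] to byte[] store. For a non-primary key, each insert reads the serialized list, deserializes it, appends, re-serializes and writes it back (steps (1) to (5) of the design, ignoring time buckets), while a primary-key store simply overwrites its single value; a factory picks one from an isPrimaryKey flag. All class names are hypothetical, the length-prefixed modified-UTF-8 encoding is an arbitrary choice, and none of this is the Managed State API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store contract used only in this sketch.
interface KeyedValueStore {
  void insert(String key, String value);

  List<String> values(String key);
}

// byte[] -> byte[] backing map; ByteBuffer keys give content-based equals/hashCode.
abstract class BytesBackedStore implements KeyedValueStore {
  protected final Map<ByteBuffer, byte[]> bytes = new HashMap<>();

  protected static ByteBuffer keyOf(String key) {
    return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
  }

  // Serialize a list as an element count followed by modified-UTF-8 strings.
  protected static byte[] encode(List<String> list) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bos);
      out.writeInt(list.size());
      for (String s : list) {
        out.writeUTF(s);
      }
      out.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Deserialize; a missing key yields a new, mutable empty list.
  protected static List<String> decode(byte[] data) {
    if (data == null) {
      return new ArrayList<>();
    }
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int n = in.readInt();
      List<String> list = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        list.add(in.readUTF());
      }
      return list;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public List<String> values(String key) {
    return decode(bytes.get(keyOf(key)));
  }
}

// Primary key: at most one value per key, so an insert simply overwrites.
class PrimaryKeyStore extends BytesBackedStore {
  @Override
  public void insert(String key, String value) {
    bytes.put(keyOf(key), encode(Collections.singletonList(value)));
  }
}

// Non-primary key: read-modify-write of the whole serialized list on every insert.
class ListValueStore extends BytesBackedStore {
  @Override
  public void insert(String key, String value) {
    ByteBuffer k = keyOf(key);
    List<String> list = decode(bytes.get(k)); // (1) read, (2) convert to a list
    list.add(value);                          // (3) append v1
    bytes.put(k, encode(list));               // (4) re-serialize, (5) write back
  }
}

final class JoinStores {
  private JoinStores() {}

  // Mirrors the proposed isPrimaryKey property choosing between the two store kinds.
  static KeyedValueStore create(boolean isPrimaryKey) {
    return isPrimaryKey ? new PrimaryKeyStore() : new ListValueStore();
  }
}
```

In this sketch the cost of ListValueStore.insert grows with the length of the list already stored for the key, which is one plausible reason to prefer a multimap-style structure that can append without rewriting the whole list.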