+1 on the suggested way forward. Not clear why you say the windows are fixed, though. What if I want the dedup to happen based on the most recent event with a given key + n time units?
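One way to read the question above is as a per-key window that closes n time units after the most recent event with that key. Such a window grows with incoming data, like a session window with gap n, instead of being fixed in advance. The following is a minimal sketch under stated assumptions: the class name is hypothetical, an in-memory map stands in for managed state, and events are assumed to arrive in event-time order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not a Malhar API): a key's dedup window ends `gap` time units
// after the most recent event seen for that key, so it grows with incoming data,
// much like a per-key session window. Assumes events arrive in event-time order.
class SessionStyleDeduper {
  private final long gap;                                     // the "n time units"
  private final Map<String, Long> lastSeen = new HashMap<>(); // key -> latest event time

  SessionStyleDeduper(long gap) {
    this.gap = gap;
  }

  /** Returns true if the tuple is unique, false if it duplicates a recent one. */
  boolean process(String key, long eventTime) {
    Long last = lastSeen.get(key);
    boolean duplicate = last != null && eventTime <= last + gap;
    // Every event with this key, duplicate or not, pushes the window's end forward.
    lastSeen.put(key, last == null ? eventTime : Math.max(last, eventTime));
    return !duplicate;
  }

  /** Drops keys whose window ended before the given watermark. */
  void purge(long watermark) {
    lastSeen.values().removeIf(last -> last + gap < watermark);
  }
}
```

With a gap of 10, events for K1 at times 0, 5 and 14 yield one unique tuple followed by two duplicates, because each event extends the window. Fixed 10-unit buckets would instead put 5 and 14 in different buckets and, if lookups stay within one bucket, treat 14 as unique. That is the sense in which these windows depend on the incoming tuples.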
On Mon, Jul 18, 2016 at 9:05 AM, Bhupesh Chawda <[email protected]> wrote:

> I can see that Dedup seems like a case where state is continuously merged with older state. State in this case is the set of unique tuples. However, for the Dedup use case, the event windows are, in a way, fixed, and do not depend on the incoming tuples. Incoming tuples are just *assigned* to these windows. The point I am trying to make is that the older event windows will be purged (depending on the lateness configuration and watermarks) irrespective of the incoming tuples. Session windows, on the other hand, depend on the incoming tuples, are not fixed, and change with incoming data. Perhaps we should not model this use case as a session window.
>
> I agree that we cannot decide the approach to be followed with the current memory-backed storage implementation. Actually, even when we have seen a managed-state-backed implementation for windowed storage, I am worried that the interfaces still won't be as flexible as using managed state directly and will need custom changes to fit the Dedup use case. I am looking at it from the perspective of asynchronous processing, which will be necessary once disk IO is involved in processing incoming tuples.
>
> I suggest we move ahead with the managed state implementation for Deduper. We can pick up the Windowed operator based implementation once we have all the necessary features, like windowed storage backed by managed state, input operators with watermark tuple support, etc.
>
> Suggestions?
>
> ~ Bhupesh
>
> On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <[email protected]> wrote:
>
>> Hi Bhupesh,
>>
>> Dedup is different with regard to state accumulation. For other windowed operations, we collect state and then emit a result after a period of time (trigger or watermark). Here, we only need the state to detect the duplicate.
>> Hence, it is inefficient to collect a list of tuples to determine whether a subsequently arriving tuple is a duplicate. But isn't this scenario similar to the session window, where state is continuously merged?
>>
>> I would prefer to see more analysis on performance and scalability to large key cardinality. The window operator only has the memory-backed window store at this time. Until there is a managed-state-backed implementation that has been benchmarked, we cannot really use it as a baseline for further implementations on top of it.
>>
>> Thomas
>>
>> On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I also implemented a de-duplication operator using the Windowed Operator. Now we have two implementations, one with Managed State and another using the Windowed operator. Here are their details:
>>>
>>> 1. *With Managed State*
>>>    - The operator is implemented using managed state as the storage for the buckets into which the tuples will be stored.
>>>    - *TimeBucketAssigner* is used to assign each incoming tuple to a bucket based on its event time. It is also used to identify whether a particular tuple is expired and should be sent to the expired port or dropped.
>>>    - For managed state, the *ManagedTimeUnifiedStateImpl* implementation is used, which just requires the user to specify the event time; a bucket is automatically assigned based on that. The structure of the bucket data on storage is: /operator_id/time_bucket
>>>    - An advantage of the Managed State approach is that we don't have to assume a correlation between the event time and the de-duplication key of the tuple.
>>>      For example, if we get two tuples like (K1, T1) and (K1, T2), we can still use ManagedStateImpl and conclude that these tuples are duplicates based on the key K1.
>>> 2. *With Windowed Operator*
>>>    - The operator uses WindowedOperatorImpl as the base operator.
>>>    - Accumulation, for the deduper, basically amounts to storing a list of tuples in the data storage. Every time we get a unique tuple, we *accumulate* it in the list.
>>>    - Event windows are modeled using the *TimeWindow* option. Although SlidingTimeWindows seems intuitive for data buckets, it seems to be a costly option, since the accumulation in this case is not just an aggregate value but a list of values in that bucket.
>>>    - Watermarks are not assumed to be sent from an input operator (although it is okay if an upstream operator sends them). The *fixedWatermark* feature is used to assume watermarks that are relative to the window time.
>>>    - One of the issues I found with using WindowedOperator for Dedup is that event time is tightly coupled with the de-duplication key. In the above example, (K1, T1) and (K1, T2) *might* be concluded to be two unique tuples, since T1 and T2 may fall into two different time buckets.
>>>
>>> Here are the PRs for both of them:
>>>
>>> - Using Managed State: https://github.com/apache/apex-malhar/pull/335
>>> - Using Windowed Operator: https://github.com/apache/apex-malhar/pull/343
>>>
>>> Please review them and suggest the correct approach for the final implementation, which should then be used to add other features like fault tolerance, scalability, optimizations, etc.
>>> Thanks.
>>>
>>> ~ Bhupesh
>>>
>>> On Fri, Jul 8, 2016 at 11:30 PM, David Yan <[email protected]> wrote:
>>>
>>>> No problem.
>>>>
>>>> By the way, I changed the method name to setFixedWatermark.
>>>> Also, if you want to drop any tuples that are considered late, you need to set the allowed lateness to 0.
>>>>
>>>> David
>>>>
>>>> On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <[email protected]> wrote:
>>>>
>>>>> Thanks David.
>>>>> I'll try to create an implementation for Deduper which uses WindowedOperator. Will open a PR soon for review.
>>>>>
>>>>> ~ Bhupesh
>>>>>
>>>>> On Fri, Jul 8, 2016 at 2:23 AM, David Yan <[email protected]> wrote:
>>>>>
>>>>>> Hi Bhupesh,
>>>>>>
>>>>>> I just added the method setFixedLateness(long millis) to AbstractWindowedOperator in my PR. This will allow you to specify the lateness with respect to the timestamp from the window ID, without watermark tuples from upstream.
>>>>>>
>>>>>> David
>>>>>>
>>>>>> On Thu, Jul 7, 2016 at 11:49 AM, David Yan <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Bhupesh,
>>>>>>>
>>>>>>> Yes, the windowed operator currently depends on watermark tuples from upstream for any "lateness"-related operation. If there is no watermark, nothing will be considered late. We can add support for lateness handling without incoming watermark tuples. Let me add that to the pull request.
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>>> On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> Thanks for your reply.
>>>>>>>>
>>>>>>>> If I am to use a windowed operator for the Dedup operator, there should be some operator (upstream of the Deduper) that sends the watermark tuples.
>>>>>>>> These tuples (along with allowed lateness) will be the ones deciding which incoming tuples are too late and will be dropped. I have the following questions:
>>>>>>>>
>>>>>>>> Is a windowed operator (which needs watermarks) dependent upon some other operator for these tuples? What happens when no watermark tuples are sent from upstream?
>>>>>>>>
>>>>>>>> Can a windowed operator "*assume*" the watermark tuples based on some notion of time? For example, can the Deduper use the streaming window time as the reference to advance the watermark?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> ~ Bhupesh
>>>>>>>>
>>>>>>>> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Bhupesh,
>>>>>>>>>
>>>>>>>>> FYI, there is a JIRA open for a scalable implementation of WindowedStorage and WindowedKeyedStorage:
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>>>>>>>>>
>>>>>>>>> We expect to use either ManagedState directly or Spillable structures, which in turn use ManagedState.
>>>>>>>>>
>>>>>>>>> I'm not very familiar with the dedup operator, but in order to use the WindowedOperator, it sounds to me like we can use SlidingWindows with an implementation of WindowedKeyedStorage that uses a Bloom filter to cover most of the false cases.
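The Bloom filter idea above can be illustrated by placing a filter in front of exact key storage. A negative answer from the filter is definitive, so only "maybe seen" answers need the slower exact lookup, which covers the common case of a key that is not a duplicate. This is a minimal sketch; the class and method names are hypothetical and not the `WindowedKeyedStorage` API, an in-memory `HashSet` stands in for managed-state-backed storage, and both hash functions are derived from `String.hashCode()` for brevity rather than chosen for quality.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not the Malhar WindowedKeyedStorage API): a Bloom filter
// answers "definitely new" cheaply; only "maybe seen" falls through to exact storage.
class BloomFrontedKeySet {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;
  private final Set<String> exact = new HashSet<>(); // stands in for slower, disk-backed state
  int exactLookups = 0;                               // how often the slow path was taken

  BloomFrontedKeySet(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  // Double hashing: position_i = h1 + i * h2 (mod numBits).
  private int position(String key, int i) {
    int h1 = key.hashCode();
    int h2 = ((h1 * 0x9E3779B9) >>> 7) | 1; // cheap second hash, forced odd
    return Math.floorMod(h1 + i * h2, numBits);
  }

  /** Returns true if the key is new (and records it), false if it is a duplicate. */
  boolean addIfAbsent(String key) {
    boolean maybeSeen = true;
    for (int i = 0; i < numHashes && maybeSeen; i++) {
      maybeSeen = bits.get(position(key, i));
    }
    if (maybeSeen) {
      exactLookups++;
      if (exact.contains(key)) {
        return false; // confirmed duplicate
      }
      // Otherwise this was a Bloom filter false positive: record the key below.
    }
    for (int i = 0; i < numHashes; i++) {
      bits.set(position(key, i));
    }
    exact.add(key);
    return true;
  }
}
```

The filter only saves lookups for new keys; every true duplicate still costs one exact lookup. Purging old windows would need one filter per window, or a filter that supports deletion, which this sketch does not attempt.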
>>>>>>>>>
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have looked into the windowing concepts from Apache Beam and PR #319 by David. It looks like there are a lot of advanced concepts that could be used by operators using event-time windowing.
>>>>>>>>>> Additionally, I also looked at the Managed State implementation.
>>>>>>>>>>
>>>>>>>>>> One of the things I noticed is that there is an overlap of functionality between Managed State and Windowing Support in terms of the following:
>>>>>>>>>>
>>>>>>>>>> - *Discarding / dropping of tuples* from the system: Managed State uses the concept of expiry, while a Windowed operator uses the concepts of watermarks and allowed lateness. If I try to reconcile the two, it seems like Managed State (through TimeBucketAssigner) is trying to implement some sort of implicit heuristic watermark based on either the user-supplied time or the event time.
>>>>>>>>>> - *Global Window* support: once we have an option to disable purging in Managed State, it will have semantics similar to the Global Window option in Windowing support.
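The reconciliation in the first bullet above can be made concrete. If expiry is measured from the largest event time seen so far, the expiry boundary behaves like a heuristic watermark with zero allowed lateness. This is a minimal sketch; the class name and its behavior are assumptions for illustration and do not reproduce Malhar's `TimeBucketAssigner`, and event times are assumed to be non-negative.

```java
// Hypothetical sketch, not Malhar's TimeBucketAssigner: expiry expressed as an
// implicit watermark = (largest event time seen) - expiryDuration.
// Event times are assumed non-negative so that -1 can signal "expired".
class HeuristicBucketAssigner {
  private final long bucketSpan;     // width of one time bucket
  private final long expiryDuration; // how far behind the newest event a tuple may lag
  private long maxEventTime = Long.MIN_VALUE;

  HeuristicBucketAssigner(long bucketSpan, long expiryDuration) {
    this.bucketSpan = bucketSpan;
    this.expiryDuration = expiryDuration;
  }

  /** The implicit watermark: tuples with an earlier event time are expired. */
  long watermark() {
    return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - expiryDuration;
  }

  /** Returns the bucket index for the tuple, or -1 if the tuple has expired. */
  long assign(long eventTime) {
    if (eventTime < watermark()) {
      return -1; // would go to the expired port, or be dropped
    }
    maxEventTime = Math.max(maxEventTime, eventTime);
    return eventTime / bucketSpan;
  }
}
```

With a bucket span of 10 and an expiry of 30, after an event at time 100 the implicit watermark is 70, so a tuple at 69 is expired while one at 70 lands in bucket 7. In the windowed operator, the same drop behavior would come from watermark tuples (or the fixed watermark discussed in this thread) combined with an allowed lateness of 0. Never expiring anything corresponds to the Global Window case in the second bullet.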
>>>>>>>>>>
>>>>>>>>>> If I understand correctly, is the suggestion to implement the Dedup operator as a Windowed operator and to use managed state only as a storage medium (through WindowedStorage)? What would be a better way of going about this?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I agree that the case of processing bounded data is a special case of unbounded data.
>>>>>>>>>>> The difference I was pointing out was in terms of expiry. Expiry does not apply to bounded data sets, while unbounded data sets will inherently use expiry to limit the amount of data to be stored.
>>>>>>>>>>>
>>>>>>>>>>> For idempotency when applying expiry to the streaming data, I need to explore using the window timestamp that you proposed, as opposed to the system time, which I was planning to use.
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Bhupesh,
>>>>>>>>>>>>
>>>>>>>>>>>> Why is there a distinction between bounded and unbounded data?
>>>>>>>>>>>> I see the former as a special case of the latter?
>>>>>>>>>>>>
>>>>>>>>>>>> When rewinding the stream or reprocessing the stream in another run, the operator should produce the same result.
>>>>>>>>>>>>
>>>>>>>>>>>> This operator should also be idempotent. That implies that the code does not rely on the current system time but on the window timestamp instead.
>>>>>>>>>>>>
>>>>>>>>>>>> All of this should be accomplished by using the windowing support:
>>>>>>>>>>>> https://github.com/apache/apex-malhar/pull/319
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I want to validate the use cases for de-duplication that will go into this implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - *Bounded data set*
>>>>>>>>>>>>>   - This is de-duplication for bounded data: for example, data sets that are old or fixed, or that may not have a time field at all, such as last year's transaction records or customer data.
>>>>>>>>>>>>>   - The concept of expiry is not needed, as this is a bounded data set.
>>>>>>>>>>>>> - *Unbounded data set*
>>>>>>>>>>>>>   - This is de-duplication of online streaming data.
>>>>>>>>>>>>>   - Expiry is needed because incoming tuples may arrive later than expected. Expiry is always computed by taking the difference between the system time and the event time.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any feedback is appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am working on adding a de-duplication operator to the Malhar library based on the managed state APIs. I will be working off the already created JIRA, https://issues.apache.org/jira/browse/APEXMALHAR-1701, and the initial pull request for an AbstractDeduper here: https://github.com/apache/apex-malhar/pull/260/files
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am planning to include the following features in the first version:
>>>>>>>>>>>>>> 1. Time-based de-duplication. Assumption: the Tuple_Key -> Tuple_Time correlation holds.
>>>>>>>>>>>>>> 2. Option to maintain the order of incoming tuples.
>>>>>>>>>>>>>> 3. Duplicate and Expired ports to emit duplicate and expired tuples, respectively.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
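The first-version feature list, combined with the point made in this thread that expiry should use the window timestamp rather than system time, can be sketched as below. This is a minimal illustration under stated assumptions: the class and method names are hypothetical (not the AbstractDeduper API from the pull requests), plain lists stand in for the unique, duplicate and expired output ports, an in-memory map stands in for managed state, and event and window times are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: time-based dedup assuming the Tuple_Key -> Tuple_Time
// correlation. Lists stand in for the unique / duplicate / expired output ports,
// and a HashMap stands in for managed-state buckets.
class TimeBasedDeduper {
  final List<String> unique = new ArrayList<>();
  final List<String> duplicates = new ArrayList<>();
  final List<String> expired = new ArrayList<>();

  private final long bucketSpan;
  private final long expiryDuration;
  private final Map<Long, Set<String>> buckets = new HashMap<>(); // bucket index -> keys seen
  private long windowTime; // reference time from the streaming window, not the system clock

  TimeBasedDeduper(long bucketSpan, long expiryDuration) {
    this.bucketSpan = bucketSpan;
    this.expiryDuration = expiryDuration;
  }

  /** Called at the start of each streaming window with that window's timestamp. */
  void beginWindow(long windowTimestamp) {
    windowTime = windowTimestamp;
    // Every event time in a bucket below this index is already expired, so purge it.
    long oldestLiveBucket = (windowTime - expiryDuration) / bucketSpan;
    buckets.keySet().removeIf(b -> b < oldestLiveBucket);
  }

  void process(String key, long eventTime) {
    if (windowTime - eventTime > expiryDuration) {
      expired.add(key);
      return;
    }
    // Only the bucket for this event time is consulted: this is the key -> time
    // correlation assumption, so (K1, T1) and (K1, T2) falling in different
    // buckets are both treated as unique.
    Set<String> keys = buckets.computeIfAbsent(eventTime / bucketSpan, b -> new HashSet<>());
    if (keys.add(key)) {
      unique.add(key);
    } else {
      duplicates.add(key);
    }
  }
}
```

Because expiry depends only on the window timestamps and the tuples, replaying the same windows produces the same three outputs, which is the idempotency property asked for. Using the wall clock instead would make the expired output depend on when the replay runs.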
