No problem. By the way, I changed the method name to setFixedWatermark. Also, if you want to drop any tuples that are considered late, you need to set the allowed lateness to 0.
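A minimal sketch of the drop rule described here. It assumes a tuple counts as too late when its event time is more than the allowed lateness behind the current watermark, so with the allowed lateness set to 0, anything behind the watermark is dropped. LatenessPolicy and isTooLate are hypothetical names used only for illustration. They are not methods of AbstractWindowedOperator.

```java
/**
 * Hypothetical lateness rule (not the AbstractWindowedOperator code):
 * a tuple is too late if its event time is more than allowedLatenessMillis
 * behind the current watermark.
 */
final class LatenessPolicy {
  private final long allowedLatenessMillis;

  LatenessPolicy(long allowedLatenessMillis) {
    if (allowedLatenessMillis < 0) {
      throw new IllegalArgumentException("allowed lateness must be >= 0");
    }
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** With allowedLatenessMillis == 0, every tuple behind the watermark is too late. */
  boolean isTooLate(long tupleTimestampMillis, long watermarkMillis) {
    return tupleTimestampMillis < watermarkMillis - allowedLatenessMillis;
  }
}
```

For example, new LatenessPolicy(0).isTooLate(999, 1000) is true. The same call with an allowed lateness of 500 is false.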
David

On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda wrote:

Thanks David.
I'll try to create an implementation for Deduper which uses WindowedOperator. Will open a PR soon for review.

~ Bhupesh

On Fri, Jul 8, 2016 at 2:23 AM, David Yan wrote:

Hi Bhupesh,

I just added the method setFixedLateness(long millis) to AbstractWindowedOperator in my PR. This will allow you to specify the lateness with respect to the timestamp from the window ID without watermark tuples from upstream.

David

On Thu, Jul 7, 2016 at 11:49 AM, David Yan wrote:

Hi Bhupesh,

Yes, the windowed operator currently depends on the watermark tuples upstream for any "lateness" related operation. If there is no watermark, nothing will be considered late. We can add support for lateness handling without incoming watermark tuples. Let me add that to the pull request.

David

On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda wrote:

Hi David,

Thanks for your reply.

If I am to use a windowed operator for the Dedup operator, there should be some operator (upstream to Deduper) which sends the watermark tuples. These tuples (along with allowed lateness) will be the ones deciding which incoming tuples are too late and will be dropped. I have the following questions:

Is a windowed operator (which needs watermarks) dependent upon some other operator for these tuples? What happens when there are no watermark tuples sent from upstream?

Can a windowed operator "*assume*" the watermark tuples based on some notion of time? For example, can the Deduper use the streaming window time as the reference to advance the watermark?

Thanks.
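A rough Java sketch of the two behaviours David describes. With no watermark source, nothing is treated as late. With a fixed offset, the watermark is derived from the timestamp of the current streaming window instead of from upstream watermark tuples. FixedOffsetWatermark, onWindowStart, and the rule that the watermark never moves backwards are assumptions made for this example. The actual setFixedLateness/setFixedWatermark behaviour is whatever David's PR defines.

```java
import java.util.OptionalLong;

/**
 * Hypothetical watermark source for an operator that receives no watermark
 * tuples from upstream. If no fixed offset is configured, there is no
 * watermark and nothing is considered late.
 */
final class FixedOffsetWatermark {
  private final OptionalLong fixedOffsetMillis;
  private OptionalLong currentWatermark = OptionalLong.empty();

  FixedOffsetWatermark(OptionalLong fixedOffsetMillis) {
    this.fixedOffsetMillis = fixedOffsetMillis;
  }

  /** Called at the start of each streaming window with that window's timestamp (assumed hook). */
  void onWindowStart(long windowTimestampMillis) {
    if (!fixedOffsetMillis.isPresent()) {
      return; // no watermark source: leave the watermark unset
    }
    long candidate = windowTimestampMillis - fixedOffsetMillis.getAsLong();
    // Only move the watermark forward, even if a window timestamp regresses.
    if (!currentWatermark.isPresent() || candidate > currentWatermark.getAsLong()) {
      currentWatermark = OptionalLong.of(candidate);
    }
  }

  /** Without a watermark nothing is late; otherwise compare against it. */
  boolean isLate(long tupleTimestampMillis) {
    return currentWatermark.isPresent() && tupleTimestampMillis < currentWatermark.getAsLong();
  }

  OptionalLong watermark() {
    return currentWatermark;
  }
}
```

Keeping the watermark monotonic means a replayed or out-of-order window timestamp cannot make previously late tuples acceptable again.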
~ Bhupesh

On Thu, Jul 7, 2016 at 4:07 AM, David Yan wrote:

Hi Bhupesh,

FYI, there is a JIRA open for a scalable implementation of WindowedStorage and WindowedKeyedStorage:

https://issues.apache.org/jira/browse/APEXMALHAR-2130

We expect either to use ManagedState directly, or Spillable structures, which in turn use ManagedState.

I'm not very familiar with the dedup operator, but in order to use the WindowedOperator, it sounds to me that we can use SlidingWindows with an implementation of WindowedKeyedStorage that uses a Bloom filter to cover most of the false cases.

David

On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda wrote:

Hi All,

I have looked into Windowing concepts from Apache Beam and the PR #319 by David. Looks like there are a lot of advanced concepts which could be used by operators using event time windowing.
Additionally I also looked at the Managed State implementation.

One of the things I noticed is that there is an overlap of functionality between Managed State and Windowing Support in terms of the following:

- *Discarding / Dropping of tuples* from the system - Managed State uses the concept of expiry while a Windowed operator uses the concepts of Watermarks and allowed lateness. If I try to reconcile the above two, it seems like Managed State (through TimeBucketAssigner) is trying to implement some sort of implicit heuristic Watermarks based on either the user supplied time or the event time.
- *Global Window* support - Once we have an option to disable purging in Managed State, it will have similar semantics to the Global Window option in Windowing support.

If I understand correctly, is the suggestion to implement the Dedup operator as a Windowed operator and to use managed state only as a storage medium (through WindowedStorage)? What could be a better way of going about this?

Thanks.

~ Bhupesh

On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda wrote:

Hi Thomas,

I agree that the case of processing bounded data is a special case of unbounded data.
The difference I was pointing out was in terms of expiry. This is not applicable in case of bounded data sets, while unbounded data sets will inherently use expiry for limiting the amount of data to be stored.

For idempotency when applying expiry on the streaming data, I need to explore using the window timestamp that you proposed as opposed to the system time which I was planning to use.

Thanks.
~ Bhupesh

On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise wrote:

Bhupesh,

Why is there a distinction between bounded and unbounded data? I see the former as a special case of the latter?

When rewinding the stream or reprocessing the stream in another run, the operator should produce the same result.
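David's July 7 suggestion of a WindowedKeyedStorage that uses a Bloom filter can be pictured as a two-level duplicate check. In this hypothetical sketch, the Bloom filter answers "definitely not seen" for most new keys, and only possible hits fall through to an exact lookup. The in-memory HashSet stands in for ManagedState or Spillable structures. The class name, the double-hashing scheme, and the sizing parameters are all assumptions, not part of any Malhar API.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical two-level duplicate check: a Bloom filter screens out keys that
 * were definitely never seen, and an exact set (standing in for a persistent
 * store such as ManagedState) resolves the remaining "maybe seen" cases.
 */
final class BloomBackedSeenKeys<K> {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;
  private final Set<K> exact = new HashSet<>();
  private int exactLookups; // how often the slow (exact) path was consulted

  BloomBackedSeenKeys(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  /** Double hashing: index_i = h1 + i * h2 (mod numBits). */
  private int index(K key, int i) {
    int h1 = key.hashCode();
    int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 16) | 1; // odd, so never 0
    return Math.floorMod(h1 + i * h2, numBits);
  }

  /** Returns true if the key is a duplicate; otherwise records it and returns false. */
  boolean checkAndAdd(K key) {
    boolean maybeSeen = true;
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(key, i))) {
        maybeSeen = false; // a Bloom filter has no false negatives
        break;
      }
    }
    if (maybeSeen) {
      exactLookups++;
      if (exact.contains(key)) {
        return true; // confirmed duplicate
      }
    }
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(key, i));
    }
    exact.add(key);
    return false;
  }

  int exactLookups() {
    return exactLookups;
  }
}
```

In a real operator, the filter would presumably be sized from the expected number of keys and the target false-positive rate. It could also be kept per window so that it can be discarded when the window is purged.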
This operator should be idempotent also. That implies that code does not rely on current system time but the window timestamp instead.

All of this should be accomplished by using the windowing support:
https://github.com/apache/apex-malhar/pull/319

Thanks,
Thomas

On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda wrote:

Hi All,

I want to validate the use cases for de-duplication that will be part of this implementation.

- *Bounded data set*
  - This is de-duplication for bounded data: data sets which are old or fixed, or which may not have a time field at all, such as last year's transaction records or customer data.
  - Concept of expiry is not needed as this is a bounded data set.
- *Unbounded data set*
  - This is de-duplication of online streaming data.
  - Expiry is needed because here incoming tuples may arrive later than expected. Expiry is always computed by taking the difference between the system time and the event time.

Any feedback is appreciated.

Thanks.
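A small sketch of Thomas's idempotency point, combined with the bounded/unbounded distinction. If expiry is decided from the timestamp of the streaming window being processed rather than from System.currentTimeMillis(), replaying the same windows yields the same decisions. A bounded data set simply has no expiry period. WindowTimeExpiry and ExpiryDecision are hypothetical names. The window timestamp is passed in directly here rather than decoded from an Apex window ID.

```java
import java.util.OptionalLong;

/** Outcome of the (hypothetical) expiry check for one tuple. */
enum ExpiryDecision { PROCESS, EXPIRED }

/**
 * Hypothetical expiry rule that is deterministic under replay: it depends only
 * on the tuple's event time and the timestamp of the window being processed,
 * never on the wall clock. An empty expiry period models a bounded data set.
 */
final class WindowTimeExpiry {
  private final OptionalLong expiryPeriodMillis;

  private WindowTimeExpiry(OptionalLong expiryPeriodMillis) {
    this.expiryPeriodMillis = expiryPeriodMillis;
  }

  static WindowTimeExpiry bounded() {
    return new WindowTimeExpiry(OptionalLong.empty());
  }

  static WindowTimeExpiry unbounded(long expiryPeriodMillis) {
    return new WindowTimeExpiry(OptionalLong.of(expiryPeriodMillis));
  }

  ExpiryDecision decide(long eventTimeMillis, long windowTimestampMillis) {
    if (!expiryPeriodMillis.isPresent()) {
      return ExpiryDecision.PROCESS; // bounded data: nothing ever expires
    }
    long age = windowTimestampMillis - eventTimeMillis;
    return age > expiryPeriodMillis.getAsLong() ? ExpiryDecision.EXPIRED : ExpiryDecision.PROCESS;
  }
}
```

Because decide() has no hidden inputs, rewinding the stream and reprocessing the same windows reproduces exactly the same expired tuples.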
~ Bhupesh

On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda wrote:

Hi All,

I am working on adding a De-duplication operator in Malhar library based on managed state APIs. I will be working off the already created JIRA - https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial pull request for an AbstractDeduper here:
https://github.com/apache/apex-malhar/pull/260/files

I am planning to include the following features in the first version:
1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time correlation holds.
2. Option to maintain order of incoming tuples.
3. Duplicate and Expired ports to emit duplicate and expired tuples respectively.

Thanks.

~ Bhupesh
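An in-memory sketch of the three planned features. It assumes the Tuple_Key -> Tuple_Time correlation, so a repeated key always falls into the same time bucket and only that bucket needs to be checked. Lists stand in for the main output and for the duplicate and expired ports. Tuples are handled in arrival order, and the window timestamp (rather than system time) is the reference for expiry. The bucket span, the expiry period, and the class name are illustrative assumptions. This is not the AbstractDeduper from PR #260.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical time-bucketed de-duplicator. Because a key always carries the
 * same event time (the Tuple_Key -> Tuple_Time assumption), a repeated key
 * always lands in the same bucket.
 */
final class TimeBucketedDeduper<K> {
  // Lists standing in for the unique (main), duplicate and expired output ports.
  final List<K> unique = new ArrayList<>();
  final List<K> duplicates = new ArrayList<>();
  final List<K> expired = new ArrayList<>();

  private final long bucketSpanMillis;
  private final long expiryPeriodMillis;
  private final TreeMap<Long, Set<K>> buckets = new TreeMap<>();

  TimeBucketedDeduper(long bucketSpanMillis, long expiryPeriodMillis) {
    this.bucketSpanMillis = bucketSpanMillis;
    this.expiryPeriodMillis = expiryPeriodMillis;
  }

  /** Routes one tuple to exactly one output; outputs keep arrival order. */
  void process(K key, long eventTimeMillis, long windowTimestampMillis) {
    long expiryBoundary = windowTimestampMillis - expiryPeriodMillis;
    if (eventTimeMillis < expiryBoundary) {
      expired.add(key);
      return;
    }
    long bucket = Math.floorDiv(eventTimeMillis, bucketSpanMillis);
    Set<K> seen = buckets.computeIfAbsent(bucket, b -> new HashSet<>());
    if (seen.add(key)) {
      unique.add(key);
    } else {
      duplicates.add(key);
    }
  }

  /** Removes buckets whose whole time range lies before the expiry boundary. */
  void purge(long windowTimestampMillis) {
    long expiryBoundary = windowTimestampMillis - expiryPeriodMillis;
    buckets.headMap(Math.floorDiv(expiryBoundary, bucketSpanMillis)).clear();
  }

  int bucketCount() {
    return buckets.size();
  }
}
```

This assumes window timestamps are non-decreasing. If process() were called with an earlier window timestamp after a purge, a key from a discarded bucket could be emitted as unique again.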
