+1 for Windowed Operator if we manage to take care of the problem you
mentioned for this approach in the earlier email.

IMO, a "WindowedStorage to suit the dedup use case" would also open the way
for other, similar use cases of the Windowed Operator that need some custom
handling.
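
To make the problem from the earlier email concrete, here is a rough sketch
in plain Java. It does not use the Malhar WindowedStorage or Managed State
APIs; the class names (DedupSketch, PerWindowDedup, SharedKeyDedup) and the
10 ms window size are made up for illustration. It only contrasts dedup
state kept per time window with dedup state that is looked up by key alone,
and it ignores expiry and persistence.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these types exist in Malhar.
final class DedupSketch {

    /** Keeps one key set per time window, so a key is only compared within its own window. */
    static final class PerWindowDedup {
        private final long windowMillis;
        private final Map<Long, Set<String>> keysByWindow = new HashMap<>();

        PerWindowDedup(long windowMillis) {
            this.windowMillis = windowMillis;
        }

        /** Returns true if the key has not been seen in the window that eventTime falls into. */
        boolean isUnique(String key, long eventTime) {
            long window = Math.floorDiv(eventTime, windowMillis);
            return keysByWindow.computeIfAbsent(window, w -> new HashSet<>()).add(key);
        }
    }

    /** Keeps a single key set shared by all windows, so event time does not affect the result. */
    static final class SharedKeyDedup {
        private final Set<String> seenKeys = new HashSet<>();

        /** Returns true if the key has not been seen before, regardless of eventTime. */
        boolean isUnique(String key, long eventTime) {
            return seenKeys.add(key);
        }
    }

    public static void main(String[] args) {
        PerWindowDedup perWindow = new PerWindowDedup(10);
        SharedKeyDedup shared = new SharedKeyDedup();

        // (K1, T1 = 3) and (K1, T2 = 15) fall into different 10 ms windows.
        System.out.println(perWindow.isUnique("K1", 3));  // true
        System.out.println(perWindow.isUnique("K1", 15)); // true: duplicate is missed
        System.out.println(shared.isUnique("K1", 3));     // true
        System.out.println(shared.isUnique("K1", 15));    // false: duplicate is caught
    }
}
```

A custom storage for the Windowed Operator would presumably need to behave
like the second class while still honouring window expiry, which is the
part this sketch leaves out.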

-1 for keeping both implementations. End users may not have sufficient
depth to understand which implementation to use.

+0 for using Managed state.


~ Yogi

On 15 July 2016 at 15:18, Bhupesh Chawda <[email protected]> wrote:

> Hi,
>
> Yes Pramod, the state is not shared among the sliding windows. Probably,
> as David suggested, we can create a more apt implementation of
> WindowedStorage to suit the dedup use case. And if we are to create a
> custom storage implementation, we might also have to provide a way to
> fetch the data for different event windows asynchronously, since reads
> from persistent storage take time. Managed state already provides this.
>
> One of the first decisions we need to make is which implementation to use.
> Once we decide that, we can concentrate completely on it and figure out
> ways to address its shortcomings. Here are the options:
>
>    1. Deduper using Managed State - Deduper using Bucketing mechanism and
>    storage provided by Managed State
>    2. Deduper using Windowed Operator - Deduper supporting Beam concepts
>    3. Keep both implementations around for users to try out.
>
> I have already described the design, advantages and problems of both
> approaches in the previous emails. Please vote and mention your reasons.
>
> Thanks.
>
> ~ Bhupesh
>
>
>
> On Fri, Jul 15, 2016 at 4:43 AM, David Yan <[email protected]> wrote:
>
> > In general, windows cannot share state.
> > But you can have a custom WindowedStorage implementation that does the
> > dedup more efficiently than the default behavior.
> >
> > David
> >
> > On Thu, Jul 14, 2016 at 2:40 PM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > Bhupesh,
> > >
> > > When using the "Windowed Operator", if you were using sliding time
> > > windows as you were originally thinking, wouldn't you get the correct
> > > dedup behavior for the example tuples you mentioned? Can the sliding
> > > windows share state with each other?
> > >
> > > Thanks
> > >
> > > On Thu, Jul 14, 2016 at 10:55 AM, Bhupesh Chawda <[email protected]>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I also implemented a De-duplication operator using Windowed Operator.
> > > > Now we have two implementations, one with Managed State and another
> > > > using Windowed Operator. Here are their details:
> > > >
> > > >    1. *With Managed State*
> > > >       - The operator is implemented using managed state as the
> > > >       storage for buckets into which the tuples will be stored.
> > > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > > >       different buckets based on the event time. It is also used to
> > > >       identify whether a particular tuple is expired and should be
> > > >       sent to the expired port / dropped.
> > > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > > >       implementation is used, which just requires the user to specify
> > > >       the event time; a bucket is automatically assigned based on
> > > >       that. The structure of the bucket data on storage is as
> > > >       follows: /operator_id /time_bucket
> > > >       - An advantage of the Managed State approach is that we don't
> > > >       have to assume a correlation between the event time and the
> > > >       de-duplication key of the tuple. For example, if we get two
> > > >       tuples like (K1, T1) and (K1, T2), we can still use
> > > >       ManagedStateImpl and conclude that these tuples are duplicates
> > > >       based on the key K1.
> > > >    2. *With Windowed Operator*
> > > >       - The operator uses WindowedOperatorImpl as the base operator.
> > > >       - Accumulation, for the deduper, basically amounts to storing a
> > > >       list of tuples in the data storage. Every time we get a unique
> > > >       tuple, we *accumulate* it in the list.
> > > >       - Event windows are modeled using the *TimeWindow* option.
> > > >       Although SlidingTimeWindows seems intuitive for data buckets,
> > > >       it seems to be the costly option, as the accumulation in this
> > > >       case is not just an aggregate value but a list of values in
> > > >       that bucket.
> > > >       - Watermarks are not assumed to be sent from an input operator
> > > >       (although it is okay if an upstream operator sends them). The
> > > >       *fixedWatermark* feature is used to assume watermarks which are
> > > >       relative to the window time.
> > > >       - One of the issues I found with using WindowedOperator for
> > > >       Dedup is that event time is tightly coupled with the
> > > >       de-duplication key. In the above example, (K1, T1) and (K1, T2)
> > > >       *might* be concluded to be two unique tuples, since T1 and T2
> > > >       may fall into two different time buckets.
> > > >
> > > > Here are the PRs for both of them.
> > > >
> > > >    - Using Managed State:
> > > >    https://github.com/apache/apex-malhar/pull/335
> > > >    - Using Windowed Operator:
> > > >    https://github.com/apache/apex-malhar/pull/343
> > > >
> > > > Please review them and suggest the correct approach for the final
> > > > implementation, which should be used to add other features like fault
> > > > tolerance, scalability, optimizations etc.
> > > > Thanks.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <[email protected]>
> > > wrote:
> > > >
> > > > > No problem.
> > > > >
> > > > > > By the way, I changed the method name to setFixedWatermark. Also,
> > > > > > if you want to drop any tuples that are considered late, you need
> > > > > > to set the allowed lateness to 0.
> > > > >
> > > > > David
> > > > >
> > > > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <[email protected]>
> > > > > > wrote:
> > > > >
> > > > > > Thanks David.
> > > > > > I'll try to create an implementation for Deduper which uses
> > > > > > WindowedOperator. Will open a PR soon for review.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <[email protected]>
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > > > AbstractWindowedOperator in my PR. This will allow you to
> > > > > > > > specify the lateness with respect to the timestamp from the
> > > > > > > > window ID without watermark tuples from upstream.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> > [email protected]>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Bhupesh,
> > > > > > > >
> > > > > > > > > Yes, the windowed operator currently depends on the watermark
> > > > > > > > > tuples upstream for any "lateness" related operation. If there
> > > > > > > > > is no watermark, nothing will be considered late. We can add
> > > > > > > > > support for lateness handling without incoming watermark
> > > > > > > > > tuples. Let me add that to the pull request.
> > > > > > > >
> > > > > > > > David
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > > [email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi David,
> > > > > > > >>
> > > > > > > >> Thanks for your reply.
> > > > > > > >>
> > > > > > > > >> If I am to use a windowed operator for the Dedup operator,
> > > > > > > > >> there should be some operator (upstream of the Deduper) which
> > > > > > > > >> sends the watermark tuples. These tuples (along with allowed
> > > > > > > > >> lateness) will be the ones deciding which incoming tuples are
> > > > > > > > >> too late and will be dropped. I have the following questions:
> > > > > > > > >>
> > > > > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > > > > > > > >> some other operator for these tuples? What happens when there
> > > > > > > > >> are no watermark tuples sent from upstream?
> > > > > > > > >>
> > > > > > > > >> Can a windowed operator "*assume*" the watermark tuples based
> > > > > > > > >> on some notion of time? For example, can the Deduper use the
> > > > > > > > >> streaming window time as the reference to advance the
> > > > > > > > >> watermark?
> > > > > > > >>
> > > > > > > >> Thanks.
> > > > > > > >>
> > > > > > > >> ~ Bhupesh
> > > > > > > >>
> > > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > > [email protected]>
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hi Bhupesh,
> > > > > > > >> >
> > > > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > > > >> > WindowedStorage and WindowedKeyedStorage:
> > > > > > > > >> >
> > > > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > > > >> >
> > > > > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > > > > >> > structures, which in turn use ManagedState.
> > > > > > > > >> >
> > > > > > > > >> > I'm not very familiar with the dedup operator, but in order
> > > > > > > > >> > to use the WindowedOperator, it sounds to me that we can use
> > > > > > > > >> > SlidingWindows with an implementation of WindowedKeyedStorage
> > > > > > > > >> > that uses a Bloom filter to cover most of the false cases.
> > > > > > > >> >
> > > > > > > >> > David
> > > > > > > >> >
> > > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > > [email protected]>
> > > > > > > >> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi All,
> > > > > > > >> > >
> > > > > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> > > > > > > > >> > > the PR #319 by David. Looks like there are a lot of advanced
> > > > > > > > >> > > concepts which could be used by operators using event time
> > > > > > > > >> > > windowing.
> > > > > > > > >> > > Additionally I also looked at the Managed State
> > > > > > > > >> > > implementation.
> > > > > > > > >> > >
> > > > > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > > > > >> > > functionality between Managed State and Windowing Support in
> > > > > > > > >> > > terms of the following:
> > > > > > > > >> > >
> > > > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > > > > > > >> > >    Managed State uses the concept of expiry while a Windowed
> > > > > > > > >> > >    operator uses the concepts of Watermarks and allowed
> > > > > > > > >> > >    lateness. If I try to reconcile the above two, it seems
> > > > > > > > >> > >    like Managed State (through TimeBucketAssigner) is trying
> > > > > > > > >> > >    to implement some sort of implicit heuristic Watermarks
> > > > > > > > >> > >    based on either the user supplied time or the event time.
> > > > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > > > > > > >> > >    disable purging in Managed State, it will have similar
> > > > > > > > >> > >    semantics to the Global Window option in Windowing
> > > > > > > > >> > >    support.
> > > > > > > > >> > >
> > > > > > > > >> > > If I understand correctly, is the suggestion to implement the
> > > > > > > > >> > > Dedup operator as a Windowed operator and to use managed
> > > > > > > > >> > > state only as a storage medium (through WindowedStorage)?
> > > > > > > > >> > > What could be a better way of going about this?
> > > > > > > >> > >
> > > > > > > >> > > Thanks.
> > > > > > > >> > >
> > > > > > > >> > > ~ Bhupesh
> > > > > > > >> > >
> > > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > > [email protected]>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi Thomas,
> > > > > > > >> > > >
> > > > > > > > >> > > > I agree that the case of processing bounded data is a
> > > > > > > > >> > > > special case of unbounded data.
> > > > > > > > >> > > > The difference I was pointing out was in terms of expiry.
> > > > > > > > >> > > > This is not applicable in case of bounded data sets, while
> > > > > > > > >> > > > unbounded data sets will inherently use expiry for limiting
> > > > > > > > >> > > > the amount of data to be stored.
> > > > > > > > >> > > >
> > > > > > > > >> > > > For idempotency when applying expiry on the streaming data,
> > > > > > > > >> > > > I need to explore more on using the window timestamp that
> > > > > > > > >> > > > you proposed, as opposed to the system time which I was
> > > > > > > > >> > > > planning to use.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks.
> > > > > > > >> > > > ~ Bhupesh
> > > > > > > >> > > >
> > > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > > >> [email protected]>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > >> Bhupesh,
> > > > > > > >> > > >>
> > > > > > > > >> > > >> Why is there a distinction between bounded and unbounded
> > > > > > > > >> > > >> data? I see the former as a special case of the latter.
> > > > > > > > >> > > >>
> > > > > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > > > > > > >> > > >> another run, the operator should produce the same result.
> > > > > > > > >> > > >>
> > > > > > > > >> > > >> This operator should be idempotent also. That implies that
> > > > > > > > >> > > >> code does not rely on current system time but the window
> > > > > > > > >> > > >> timestamp instead.
> > > > > > > > >> > > >>
> > > > > > > > >> > > >> All of this should be accomplished by using the windowing
> > > > > > > > >> > > >> support:
> > > > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > > >> > > >>
> > > > > > > >> > > >> Thanks,
> > > > > > > >> > > >> Thomas
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > > >> > > [email protected]>
> > > > > > > >> > > >> wrote:
> > > > > > > >> > > >>
> > > > > > > >> > > >> > Hi All,
> > > > > > > >> > > >> >
> > > > > > > > >> > > >> > I want to validate the use cases for de-duplication that
> > > > > > > > >> > > >> > will be going as part of this implementation.
> > > > > > > > >> > > >> >
> > > > > > > > >> > > >> >    - *Bounded data set*
> > > > > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > > > > > > >> > > >> >       example, data sets which are old or fixed or which
> > > > > > > > >> > > >> >       may not have a time field at all. Example: Last
> > > > > > > > >> > > >> >       year's transaction records or Customer data etc.
> > > > > > > > >> > > >> >       - Concept of expiry is not needed as this is a
> > > > > > > > >> > > >> >       bounded data set.
> > > > > > > > >> > > >> >    - *Unbounded data set*
> > > > > > > > >> > > >> >       - This is de-duplication of online streaming data.
> > > > > > > > >> > > >> >       - Expiry is needed because here incoming tuples may
> > > > > > > > >> > > >> >       arrive later than they are expected. Expiry is
> > > > > > > > >> > > >> >       always computed by taking the difference between
> > > > > > > > >> > > >> >       System time and the Event time.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Thanks.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > ~ Bhupesh
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > > >> > > >> [email protected]>
> > > > > > > >> > > >> > wrote:
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > > Hi All,
> > > > > > > >> > > >> > >
> > > > > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > > > > > > >> > > >> > > Malhar library based on managed state APIs. I will be
> > > > > > > > >> > > >> > > working off the already created JIRA -
> > > > > > > > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > > > > > >> > > >> > > and the initial pull request for an AbstractDeduper here:
> > > > > > > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > > > > > > >> > > >> > >
> > > > > > > > >> > > >> > > I am planning to include the following features in the
> > > > > > > > >> > > >> > > first version:
> > > > > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > > > > > > > >> > > >> > > Tuple_Time correlation holds.
> > > > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > > > > > > > >> > > >> > > expired tuples respectively.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > Thanks.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > ~ Bhupesh
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> >
> > > > > > > >> > > >>
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
