Fwd: APEXMALHAR-1701 Deduper in Malhar

2016-07-18 Thread Bhupesh Chawda
Yes, in that case, it can be modelled as a session window. A session for every key can start with the first occurrence of that key and lasts for some specific time duration. This leads to another question: How exactly do we want to model Dedup expiry? - Are the windows fixed on the event time
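
A minimal sketch of the session-style expiry described above, assuming tuples arrive in event-time order and that a key's session opens at its first occurrence and lasts a fixed duration. The class and method names (`SessionDedup`, `isDuplicate`) are hypothetical and are not part of Malhar.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Malhar code): per-key expiry anchored at the first occurrence. */
class SessionDedup {
    private final long sessionMillis;
    // key -> event time of the occurrence that opened the key's current session
    private final Map<String, Long> sessionStart = new HashMap<>();

    SessionDedup(long sessionMillis) {
        this.sessionMillis = sessionMillis;
    }

    /** Returns true if the key already has an open session covering this event time. */
    boolean isDuplicate(String key, long eventTimeMillis) {
        Long start = sessionStart.get(key);
        // Assumes in-order event times, so eventTimeMillis >= start whenever start exists.
        if (start != null && eventTimeMillis - start < sessionMillis) {
            return true;
        }
        // No session yet, or the previous one expired: this tuple opens a new session.
        sessionStart.put(key, eventTimeMillis);
        return false;
    }
}
```

With a 1000 ms session, a key seen at t=0 makes an occurrence at t=500 a duplicate, while an occurrence at t=1000 falls outside the session and opens a new one.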

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-18 Thread Thomas Weise
+1 on the suggested way forward. Not clear why you say the windows are fixed, though. What if I want the dedup to happen based on the most recent event with a given key + n time units? On Mon, Jul 18, 2016 at 9:05 AM, Bhupesh Chawda wrote: > I can see that Dedup seems like a case where state is c
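
One way to read the "most recent event with a given key + n time units" question is an expiry that is pushed forward by every occurrence of the key, rather than being fixed when the key is first seen. The sketch below illustrates that reading only; `SlidingExpiryDedup` and its method are hypothetical names, not Malhar classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch (not Malhar code): expiry measured from the most recent occurrence. */
class SlidingExpiryDedup {
    private final long expiryMillis; // the "n time units" from the question
    // key -> latest event time observed for that key
    private final Map<String, Long> lastSeen = new HashMap<>();

    SlidingExpiryDedup(long expiryMillis) {
        this.expiryMillis = expiryMillis;
    }

    /** Returns true if the key was seen less than expiryMillis before this event. */
    boolean isDuplicate(String key, long eventTimeMillis) {
        Long last = lastSeen.get(key);
        boolean duplicate = last != null && eventTimeMillis - last < expiryMillis;
        // Every occurrence, duplicate or not, moves the expiry forward.
        lastSeen.merge(key, eventTimeMillis, Math::max);
        return duplicate;
    }
}
```

Under this reading a steady stream of occurrences of the same key keeps it "live" indefinitely, which is the main behavioural difference from windows that do not depend on the incoming tuples.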

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-18 Thread Mohit Jotwani
Dear Community, +1 to Bhupesh's suggestion. I would suggest going ahead with the Managed State and once we have proper analysis on the windowed operator + large storage backed windowed operator - we should implement operators such as dedup with it. Regards, Mohit On Mon, Jul 18, 2016 at 12:35 P

Fwd: APEXMALHAR-1701 Deduper in Malhar

2016-07-18 Thread Bhupesh Chawda
I can see that Dedup seems like a case where state is continuously merged with older state. State in this case is the set of unique tuples. However, for the Dedup use case, the event windows are, in a way, fixed, and do not depend on the incoming tuples. Incoming tuples are just *assigned* to these wi

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-17 Thread Thomas Weise
Hi Bhupesh, Dedup is different with regard to state accumulation. For other windowed operations, we collect state and then emit a result after a period of time (trigger or watermark). Here, we only need the state to detect the duplicate. Hence, it is inefficient to collect a list of tuples to dete
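
The point about state can be illustrated with a small sketch: the operator keeps only the keys it has seen and routes each tuple the moment it arrives, instead of accumulating tuples and emitting on a trigger or watermark. All names here (`StreamingDedup`, `keyOf`, `uniqueOut`, `duplicateOut`) are hypothetical, and expiry of old keys is deliberately left out.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch (not Malhar code): state holds keys only; tuples are emitted immediately. */
class StreamingDedup<T, K> {
    private final Set<K> seenKeys = new HashSet<>();
    private final Function<T, K> keyOf;
    private final Consumer<T> uniqueOut;
    private final Consumer<T> duplicateOut;

    StreamingDedup(Function<T, K> keyOf, Consumer<T> uniqueOut, Consumer<T> duplicateOut) {
        this.keyOf = keyOf;
        this.uniqueOut = uniqueOut;
        this.duplicateOut = duplicateOut;
    }

    /** Routes the tuple right away; nothing is buffered for a later trigger. */
    void process(T tuple) {
        // Set.add returns false when the key is already present.
        if (seenKeys.add(keyOf.apply(tuple))) {
            uniqueOut.accept(tuple);
        } else {
            duplicateOut.accept(tuple);
        }
    }
}
```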

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-15 Thread Yogi Devendra
+1 for Windowed Operator if we manage to take care of the problem you mentioned for this approach in the earlier email. IMO, "WindowedStorage to suit the dedup use case" would open up ways for any other similar use-cases for Windowed Operators which need some custom handling. -1 for keeping both

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-15 Thread Bhupesh Chawda
Hi, Yes Pramod, the state is not shared among the sliding windows. Probably, as David suggested, we can create a more apt implementation for WindowedStorage to suit the dedup use case. And if we are to create a custom storage implementation, we might also have to provide functionality for asynch

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-14 Thread David Yan
In general, windows cannot share state. But you can have a custom WindowedStorage implementation that does the dedup more efficiently than the default behavior. David On Thu, Jul 14, 2016 at 2:40 PM, Pramod Immaneni wrote: > Bhupesh, > > When using "Windows Operator", if you were using sliding

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-14 Thread Pramod Immaneni
Bhupesh, When using "Windows Operator", if you were using sliding time windows like you were originally thinking, then you would have the correct dedup behavior for the example case you mentioned with the tuples, wouldn't you? Can the sliding windows share state with each other? Thanks On Thu, Jul 14

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-14 Thread Bhupesh Chawda
Hi All, I also implemented a De-duplication operator using Windowed Operator. Now we have two implementations, one with Managed state and another using Windowed operator. Here are their details: 1. *With Managed State - * - The operator is implemented using managed state as the storage for

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-08 Thread David Yan
No problem. By the way, I changed the method name to setFixedWatermark. And also, if you want to drop any tuples that are considered late, you need to set the allowed lateness to be 0. David On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda wrote: > Thanks David. > I'll try to create an implement
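
To make the lateness rule concrete, here is a rough sketch of how a fixed watermark and an allowed lateness could combine to decide whether a tuple is dropped. It assumes the watermark trails the timestamp derived from the window ID by a fixed offset; that semantics, and every name in the block, is an assumption for illustration and not the actual `AbstractWindowedOperator` implementation.

```java
/** Hypothetical sketch (not the Malhar API): lateness check against a fixed watermark. */
class FixedWatermarkLateness {
    private final long fixedWatermarkMillis;   // assumed offset behind the window timestamp
    private final long allowedLatenessMillis;  // extra slack before a late tuple is dropped

    FixedWatermarkLateness(long fixedWatermarkMillis, long allowedLatenessMillis) {
        this.fixedWatermarkMillis = fixedWatermarkMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Returns true if the tuple is older than the watermark minus the allowed lateness. */
    boolean isDropped(long eventTimeMillis, long windowTimestampMillis) {
        long watermark = windowTimestampMillis - fixedWatermarkMillis;
        return eventTimeMillis < watermark - allowedLatenessMillis;
    }
}
```

With an allowed lateness of 0, any tuple behind the watermark is dropped, which matches the advice in the message above; a positive allowed lateness keeps such tuples for a while longer.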

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-08 Thread Bhupesh Chawda
Thanks David. I'll try to create an implementation for Deduper which uses WindowedOperator. Will open a PR soon for review. ~ Bhupesh On Fri, Jul 8, 2016 at 2:23 AM, David Yan wrote: > Hi Bhupesh, > > I just added the method setFixedLateness(long millis) to > AbstractWindowedOperator in my PR.

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-07 Thread David Yan
Hi Bhupesh, I just added the method setFixedLateness(long millis) to AbstractWindowedOperator in my PR. This will allow you to specify the lateness with respect to the timestamp from the window ID without watermark tuples from upstream. David On Thu, Jul 7, 2016 at 11:49 AM, David Yan wrote: >

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-07 Thread David Yan
Hi Bhupesh, Yes, the windowed operator currently depends on the watermark tuples upstream for any "lateness" related operation. If there is no watermark, nothing will be considered late. We can add support for lateness handling without incoming watermark tuples. Let me add that to the pull request

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-07 Thread Bhupesh Chawda
Hi All, I have created an initial Deduper implementation based on Managed State and opened a PR to get feedback from the community. This is based on the initial PR by @chandnisingh. Please help review this PR: https://github.com/apache/apex-malhar/pull/335 Note that this is based entirely on the

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-06 Thread Bhupesh Chawda
Hi David, Thanks for your reply. If I am to use a windowed operator for the Dedup operator, there should be some operator (upstream to Deduper) which sends the watermark tuples. These tuples (along with allowed lateness), will be the ones deciding which incoming tuples are too late and will be dr

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-06 Thread David Yan
Hi Bhupesh, FYI, there is a JIRA open for a scalable implementation of WindowedStorage and WindowedKeyedStorage: https://issues.apache.org/jira/browse/APEXMALHAR-2130 We expect either to use ManagedState directly, or Spillable structures, which in turn uses ManagedState. I'm not very familiar w

Re: APEXMALHAR-1701 Deduper in Malhar

2016-07-04 Thread Bhupesh Chawda
Hi All, I have looked into Windowing concepts from Apache Beam and the PR #319 by David. Looks like there are a lot of advanced concepts which could be used by operators using event time windowing. Additionally I also looked at the Managed State implementation. One of the things I noticed is that

Re: APEXMALHAR-1701 Deduper in Malhar

2016-06-29 Thread Bhupesh Chawda
Hi Thomas, I agree that the case of processing bounded data is a special case of unbounded data. The difference I was pointing out was in terms of expiry. This is not applicable in the case of bounded data sets, while unbounded data sets will inherently use expiry for limiting the amount of data to be

Re: APEXMALHAR-1701 Deduper in Malhar

2016-06-29 Thread Thomas Weise
Bhupesh, Why is there a distinction between bounded and unbounded data? I see the former as a special case of the latter? When rewinding the stream or reprocessing the stream in another run the operator should produce the same result. This operator should be idempotent also. That implies that co

Re: APEXMALHAR-1701 Deduper in Malhar

2016-06-29 Thread Bhupesh Chawda
Hi All, I want to validate the use cases for de-duplication that will be going as part of this implementation. - *Bounded data set* - This is de-duplication for bounded data. For example, data sets which are old or fixed or which may not have a time field at all. Example: Las

APEXMALHAR-1701 Deduper in Malhar

2016-06-26 Thread Bhupesh Chawda
Hi All, I am working on adding a De-duplication operator in Malhar library based on managed state APIs. I will be working off the already created JIRA - https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial pull request for an AbstractDeduper here: https://github.com/apache/apex-ma