I am using WindowedOperatorImpl, declared as follows:

    WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator = new WindowedOperatorImpl<>();
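Vlad's suggestion further down the thread (let the POJO implement the Tuple interface and return itself from getValue()) and the more generic alternative Ajay asks about can both be sketched in plain Java. This is only a sketch under stated assumptions: the `Tuple<T>` interface below is a local stand-in that models just `getValue()`, not Malhar's actual type, and `Customer`, `TupleAdapter` and `wrap()` are hypothetical names. The adapter recovers the static type with a runtime `Class.cast`, which is the step needed because the parser's output port is typed Object.

```java
import java.util.Objects;

// Local stand-in for the windowed operator's Tuple interface
// (assumption: only getValue() is modeled here).
interface Tuple<T> {
  T getValue();
}

// Vlad's suggestion: the POJO implements Tuple and returns itself.
// "Customer" is a hypothetical example POJO.
class Customer implements Tuple<Customer> {
  private final String name;
  private final int orders;

  Customer(String name, int orders) {
    this.name = name;
    this.orders = orders;
  }

  String getName() { return name; }

  int getOrders() { return orders; }

  @Override
  public Customer getValue() {
    return this; // the POJO is its own tuple value
  }
}

// Hypothetical generic adapter: one reusable conversion for any POJO class,
// parameterized by that class, instead of a new converter per application.
final class TupleAdapter<T> {
  private final Class<T> pojoClass;

  TupleAdapter(Class<T> pojoClass) {
    this.pojoClass = Objects.requireNonNull(pojoClass);
  }

  // The upstream port is typed Object, so the type is checked at runtime;
  // a mismatched object raises ClassCastException.
  Tuple<T> wrap(Object emitted) {
    final T value = pojoClass.cast(emitted);
    return () -> value;
  }
}
```

A single operator built around such an adapter and configured with the POJO class could, in principle, replace the per-application converter operators; whether that fits Apex's port typing for addStream is an assumption that would need checking.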
In my application scenario, InputT is a Customer POJO, which CsvParser emits as a plain Object.

Ajay

On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

How do you declare WindowedOperator?

Thank you,
Vlad

On 4/28/17 10:35, AJAY GUPTA wrote:

Vlad,

The approach you suggested doesn't work because CsvParser outputs the Object data type, irrespective of the POJO class being emitted.

Ajay

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Make your POJO class implement the WindowedOperator Tuple interface (it may return itself in getValue()).

Thank you,
Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:

Hi All,

I am creating an application that uses the Windowed Operator. In this application, a CsvParser operator emits a POJO that is to be passed as input to the WindowedOperator. The WindowedOperator requires an instance of the Tuple class as input:

    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Because of this, addStream cannot work: the type of CsvParser's output port is not compatible with the input port type of the WindowedOperator. One way to solve this problem is to place a converter operator between the two. I would like to know whether there is a more generic approach that avoids writing a new operator for every application that uses Windowed Operators.

Thanks,
Ajay

On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

I think we have some agreement on the way we should use control tuples for File I/O operators to support batch.
In order to have more operators in Malhar support this paradigm, I think we should also look at store operators such as JDBC, Cassandra and HBase. The case with these operators is simpler, as most of them do not poll their sources (except the JDBC poller operator) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources. The only change we need to make to these operators is to shut down the DAG once the reading of data is done. For a windowed operator, this would mean a Global window with a final watermark before the DAG is shut down.

~ Bhupesh

_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com | apex.apache.org

On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

Even though the windowed operator is not just about "event time", it seems to depend too much on the "time" attribute of the incoming tuple. This is the reason we had to model the file index as a timestamp to solve the batch case for files. Perhaps we should work on increasing the scope of the windowed operator to consider other types of windows as well. The Sequence option suggested by David seems to be a step in that direction.
~ Bhupesh

On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

That's correct, we are looking at a generalized approach for state management vs. a series of special cases.

And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

I went through the discussion, but it seems to be more about event-time watermark handling than about batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from the others, we would like to have stateful processing within a batch, but not across batches.

At the same time, we would like to do this in a manner consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event-time windowing concepts within that time span.

For example, let's say I need to process data for a day (24 hours) as a single batch. The application is still streaming in nature: it would end the batch after a day and start a new batch the next day.
At the same time, I would be able to have early trigger firings every minute, as well as drop any data that is, say, 5 minutes late. All this within a single day.

~ Bhupesh

On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion on the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.

https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

If using a time window does not seem appropriate, we can have another class that is better suited to such sequential and distinct windows. Perhaps a CustomWindow option can be introduced that takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.

Another option would be to go with a custom timestampExtractor for such tuples, which translates each unique file name into a distinct timestamp while using time windows in the windowed operator.
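A custom timestamp extractor of the kind described above can be sketched as a map from each distinct file name to the next sequence number, which then serves as the tuple's "timestamp". Combined with 1 ms time windows, as in the wordcount setup mentioned later in the thread, each file lands in its own window. `FileSequenceExtractor`, `timestampFor()` and `windowStartFor()` are hypothetical names, not part of the windowed operator's API, and the sketch assumes files are read one after another.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extractor: assigns 1, 2, 3, ... to file names in the order
// they are first seen, so every tuple from one file shares one "timestamp".
final class FileSequenceExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long nextSequence = 1;

  long timestampFor(String fileName) {
    Long seq = sequenceByFile.get(fileName);
    if (seq == null) {
      seq = nextSequence++;
      sequenceByFile.put(fileName, seq);
    }
    return seq;
  }

  // Start of the fixed time window containing the timestamp. With a
  // 1 ms duration, every sequence number gets a window of its own.
  static long windowStartFor(long timestamp, long windowDurationMillis) {
    return timestamp - Math.floorMod(timestamp, windowDurationMillis);
  }
}
```

With a 10 ms duration instead, sequence numbers 1 through 9 would all fall into the window starting at 0, which is why the window duration has to match the step between sequence numbers.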
~ Bhupesh

On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

I now see your rationale for putting the filename in the window. As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:

1) The files are processed in sequence, not in parallel.
2) The windowed operator should not keep the state associated with a file once processing of that file is done.
3) The trigger should be fired for a file when that file is done processing.

However, if the file is just a sequence that has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just confuse users, especially when it's used as an example for new users.

How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?
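The SequenceWindow proposal can be sketched as a small class hierarchy in which a window is identified by a sequence number alone, and a time window specializes it by reading the sequence as a begin timestamp plus a duration. This is a hypothetical design for discussion only; neither class below is the windowed operator's existing window type.

```java
import java.util.Objects;

// Hypothetical: a window identified only by its position in a sequence
// (for example, the index of a file), with no notion of time.
class SequenceWindow {
  private final long sequence;

  SequenceWindow(long sequence) {
    this.sequence = sequence;
  }

  long getSequence() { return sequence; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    // getClass() check: a TimeWindow never equals a plain SequenceWindow
    if (o == null || getClass() != o.getClass()) return false;
    return sequence == ((SequenceWindow) o).sequence;
  }

  @Override
  public int hashCode() { return Long.hashCode(sequence); }
}

// Hypothetical: a time window is a sequence window whose sequence value is
// interpreted as the begin timestamp, plus a duration.
class TimeWindow extends SequenceWindow {
  private final long durationMillis;

  TimeWindow(long beginTimestamp, long durationMillis) {
    super(beginTimestamp);
    this.durationMillis = durationMillis;
  }

  long getBeginTimestamp() { return getSequence(); }

  long getDurationMillis() { return durationMillis; }

  long getEndTimestamp() { return getSequence() + durationMillis; }

  @Override
  public boolean equals(Object o) {
    // super.equals has already verified that o is a TimeWindow
    return super.equals(o) && durationMillis == ((TimeWindow) o).durationMillis;
  }

  @Override
  public int hashCode() { return Objects.hash(getSequence(), durationMillis); }
}
```

Operators written against the base class would then work for file sequences and time windows alike, which is the generality the thread is after.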
David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:

On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> I think my comments related to count-based windows might be causing confusion. Let's not discuss count-based scenarios for now.
>
> Just want to make sure we are on the same page w.r.t. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number), and that helps keep the tuples from each file in a separate window.

Yes, in this case it is a sequence number, but it could also be a timestamp, depending on the file naming convention. And if it were event-time processing, the watermark would be derived from records within the file.

Agreed, the source should have a mechanism to control the timestamp extraction, along with everything else pertaining to watermark generation.

> We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
> ~ Bhupesh

On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

I don't think this is a use case for a count-based window.

We have multiple files that are retrieved in a sequence, and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file has been read fully. There is no concept of "end of something" for an individual key, and a global window isn't applicable.

However, as already explained and implemented by Bhupesh, this can be solved using a watermark and a window (in this case the window timestamp isn't a timestamp but a file sequence, but that doesn't matter).
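The approach described here, with one window per file sequence number and a watermark marking the end of each file, can be sketched as a word count that keeps state only for files still being read. A watermark with value w is taken to mean "all files with sequence <= w have been read fully"; on receipt, the sketch fires (returns) those windows and purges their state. That covers the three requirements listed earlier in the thread: sequential files, no state retained once a file is done, and a trigger firing at the end of each file. `PerFileWordCount` and its methods are hypothetical stand-ins for what the windowed operator does internally.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-file aggregation: windows are keyed by file sequence
// number rather than by wall-clock time.
final class PerFileWordCount {
  // Open windows ordered by sequence number: sequence -> (word -> count).
  private final TreeMap<Long, Map<String, Long>> openWindows = new TreeMap<>();

  void process(long fileSequence, String word) {
    openWindows
        .computeIfAbsent(fileSequence, k -> new HashMap<>())
        .merge(word, 1L, Long::sum);
  }

  // Fires every window whose sequence is <= watermark and discards its state.
  Map<Long, Map<String, Long>> onWatermark(long watermark) {
    Map<Long, Map<String, Long>> complete = openWindows.headMap(watermark, true);
    Map<Long, Map<String, Long>> fired = new LinkedHashMap<>(complete);
    complete.clear(); // clearing the headMap view removes entries from openWindows
    return fired;
  }

  int openWindowCount() { return openWindows.size(); }
}
```

Because windows are keyed by sequence number, the watermark is a sequence number too, which matches the point made above: the window "timestamp" does not have to be a time.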
Thomas

On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.

If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name part of the key is the way to go. I think it's very confusing if we somehow make the file part of the window.

Count-based windows are not implemented yet, and you're welcome to add that feature.
In the case of count-based windows, there would be no notion of time, and you would probably only trigger at the end of each window. For count-based windows, the watermark only matters for batch, since you need a way to know when the batch has ended (if the count is 10 and the batch contains, say, 105 tuples, you need a way to end the last window with 5 tuples).

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

Thanks for your comments.

The wordcount example that I created based on the windowed operator processes word counts per file (each file as a separate batch), i.e. it computes the counts for each file and dumps them into separate files.

As I understand it, the Global window is for one large batch, i.e. all incoming data falls into the same batch.
This could not be processed using the GlobalWindow option, as we need more than one window. In this case, I configured the windowed operator to have time windows of 1 ms each and passed the data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?

Regarding (2 - count-based windows), I think there is a trigger option to process count-based windows. If I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?

I agree that (4. Final Watermark) can be done using the Global window.
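The count-based behaviour discussed in this exchange can be sketched as a trigger that fires every n tuples with discarding accumulation (each firing emits only what arrived since the previous firing, then resets), plus a final watermark that flushes whatever remains. With n = 10 and 105 tuples this gives ten firings of 10 and a last firing of 5, the case raised earlier in the thread. `CountingSum` and its methods are hypothetical; this is not the windowed operator's CountTrigger implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count trigger with DISCARDING accumulation over a running sum:
// every triggerCount tuples the partial result is emitted and then reset.
final class CountingSum {
  private final int triggerCount;
  private final List<Long> firings = new ArrayList<>();
  private long sum;
  private int seen;

  CountingSum(int triggerCount) {
    if (triggerCount <= 0) {
      throw new IllegalArgumentException("triggerCount must be positive");
    }
    this.triggerCount = triggerCount;
  }

  void process(long value) {
    sum += value;
    seen++;
    if (seen == triggerCount) {
      fire();
    }
  }

  // Final watermark (end of batch): emit the last, possibly partial, group.
  void onFinalWatermark() {
    if (seen > 0) {
      fire();
    }
  }

  private void fire() {
    firings.add(sum);
    sum = 0; // discarding: the next firing starts from scratch
    seen = 0;
  }

  List<Long> getFirings() { return firings; }
}
```

In accumulating mode the firings would instead be running totals (10, 20, ..., 100, then 105 on the final watermark), which is why Discarding is the natural choice for "every 1000 tuples as a batch".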
~ Bhupesh

On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

I'm worried that we are making the watermark concept too complicated. Watermarks should simply tell you which windows can be considered complete.

Point 2 is basically a count-based window. Watermarks do not play a role here, because the window is always complete at the n-th tuple.

If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me like this can be achieved with just a Global Window.
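The Global Window alternative, a single window whose result is emitted when a watermark with a +infinity timestamp arrives, can be sketched as follows. `Long.MAX_VALUE` stands in for +infinity, and `GlobalWindowCount` is a hypothetical name. The sketch applies the rule stated above, that a watermark only says which windows are complete: the global window never ends, so no finite watermark completes it.

```java
import java.util.OptionalLong;

// Hypothetical global-window aggregation: all tuples go into one window
// that only the final (+infinity) watermark can complete.
final class GlobalWindowCount {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // stands in for +infinity

  private long count;
  private boolean fired;

  void process(Object tuple) {
    if (fired) {
      throw new IllegalStateException("global window already fired");
    }
    count++;
  }

  // Returns the result only when the watermark completes the global window;
  // any finite watermark leaves it open.
  OptionalLong onWatermark(long watermark) {
    if (fired || watermark < FINAL_WATERMARK) {
      return OptionalLong.empty();
    }
    fired = true;
    return OptionalLong.of(count);
  }
}
```

Unlike a per-file window, this keeps one accumulation for the whole input, so it matches the "one large batch" reading of the Global window rather than one result per file.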
For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.

Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.

David

On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

For an input operator that is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:

1. Time-based watermarks (the high watermark / low watermark)
2. Tuple-count-based watermarks (every n tuples)
3. File-based watermarks (start file, end file)
4. Final watermark

File-based watermarks seem to be applicable to batch (file-based) processing as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?

~ Bhupesh

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.

More complex pipelines contain stateful transformations that depend on windowing and watermarks.
I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with.

Note that even file input can in many cases produce time-based watermarks, for example when you read part files that are bound by event time.

Thanks,
Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>