Vlad,

The approach you suggested doesn't work because the CSVParser output port emits Object as its data type, irrespective of the POJO class being emitted.
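The sketch below shows why implementing the Tuple interface on the POJO does not help when the upstream port is typed as Object. It also shows how a single reusable converter could avoid writing a new operator for every application. It is plain Java and rests on several assumptions. `Tuple` and `PlainTuple` are local stand-ins shaped like the windowed operator's tuple type (a single `getValue()`), not the real Malhar classes. `ObjectToTupleConverter` and `WordCount` are hypothetical names. The idea is to make the POJO class configuration rather than code, then cast and wrap each tuple at runtime.

```java
import java.util.Objects;

// Local stand-in for the windowed operator's input tuple type (assumption: one getValue()).
interface Tuple<T> {
  T getValue();
}

// Hypothetical wrapper that carries only a value, no timestamp.
final class PlainTuple<T> implements Tuple<T> {
  private final T value;

  PlainTuple(T value) {
    this.value = Objects.requireNonNull(value, "value");
  }

  @Override
  public T getValue() {
    return value;
  }
}

// Hypothetical POJO produced by the parser.
class WordCount {
  String word;
  long count;

  WordCount(String word, long count) {
    this.word = word;
    this.count = count;
  }
}

// One converter reused across applications: the POJO class is a parameter,
// so no new converter class is written per application.
final class ObjectToTupleConverter<T> {
  private final Class<T> pojoClass;

  ObjectToTupleConverter(Class<T> pojoClass) {
    this.pojoClass = Objects.requireNonNull(pojoClass, "pojoClass");
  }

  // Called for every Object arriving from the parser's Object-typed port.
  Tuple<T> convert(Object fromParser) {
    // Fails fast with ClassCastException if the parser emits a different type.
    T pojo = pojoClass.cast(fromParser);
    return new PlainTuple<>(pojo);
  }
}
```

In an Apex operator, this logic would presumably sit in the input port's `process` method, with the POJO class supplied as a property. That wiring is not shown here.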
Ajay

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Make your POJO class implement the WindowedOperator's Tuple interface (it may return itself in getValue()).

Thank you,

Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:

Hi All,

I am creating an application that uses the Windowed Operator. In this application, a CsvParser operator emits a POJO that is to be passed as input to the WindowedOperator. The WindowedOperator requires an instance of the Tuple class as input:

    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Because of this, addStream cannot work: the type of CsvParser's output port is not compatible with the type of the WindowedOperator's input port.
One way to solve this problem is to put a converter operator between the two operators.
I would like to know whether there is a more generic approach that avoids writing a new operator for every new application that uses Windowed Operators.

Thanks,
Ajay

On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

I think we have some agreement on the way we should use control tuples for File I/O operators to support batch.

In order to have more operators in Malhar support this paradigm, I think we should also look at store operators: JDBC, Cassandra, HBase, etc. The case with these operators is simpler, as most of them do not poll the sources (except the JDBC poller operator) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources. The only change we should add to these operators is to shut down the DAG once the reading of data is done. For a windowed operator, this would mean a Global window with a final watermark before the DAG is shut down.
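The batch behaviour proposed for store operators can be sketched as a small state machine: read a fixed amount of data, emit one final watermark, and then report completion so the DAG can be shut down. This is a minimal sketch under assumptions. The `Emitter` interface, the `BoundedStoreReader` class and the use of `Long.MAX_VALUE` as a stand-in for a +infinity final watermark are all hypothetical. A real operator would use its output ports and the platform's shutdown mechanism instead of the `isDone()` flag.

```java
import java.util.Iterator;

// Hypothetical sink standing in for an operator's data and control output ports.
interface Emitter<T> {
  void emitData(T record);

  void emitWatermark(long timestamp);
}

// Sketch of an inherently batch source: emit a fixed set of rows, then a
// final watermark exactly once, then report that reading is done.
final class BoundedStoreReader<T> {
  // Stand-in for a +infinity watermark that closes a Global window.
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private final Iterator<T> rows;
  private final int batchSize;
  private boolean finalWatermarkSent = false;

  BoundedStoreReader(Iterable<T> rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.rows = rows.iterator();
    this.batchSize = batchSize;
  }

  // Analogous to one emitTuples() call: emits at most batchSize rows.
  void emitTuples(Emitter<T> out) {
    int emitted = 0;
    while (emitted < batchSize && rows.hasNext()) {
      out.emitData(rows.next());
      emitted++;
    }
    if (!rows.hasNext() && !finalWatermarkSent) {
      out.emitWatermark(FINAL_WATERMARK);
      finalWatermarkSent = true;
    }
  }

  // Once true, the application could request that the DAG be shut down.
  boolean isDone() {
    return finalWatermarkSent;
  }
}
```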
~ Bhupesh

_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com | apex.apache.org

On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

Even though the windowed operator is not just about "event time", it seems too dependent on the "time" attribute of the incoming tuple. This is why we had to model the file index as a timestamp to solve the batch case for files.
Perhaps we should work on increasing the scope of the windowed operator to consider other types of windows as well. The Sequence option suggested by David seems to be a step in that direction.

~ Bhupesh

On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

That's correct, we are looking at a generalized approach for state management vs. a series of special cases.

And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

I went through the discussion, but it seems to be more about event-time watermark handling than about batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples.
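The following sketch shows a control tuple demarcating a batch, with state kept within a batch but cleared between batches, using a per-batch word count. It is an illustration only. `BatchWordCounter`, `onWord` and `onEndBatch` are hypothetical names, and `onEndBatch` plays the role of the watermark or control tuple that ends a batch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// State lives only for the current batch. The batch-ending control tuple
// flushes the result and clears the state, so nothing leaks across batches.
final class BatchWordCounter {
  private final Map<String, Long> counts = new HashMap<>();

  // Data tuple: update state for the current batch.
  void onWord(String word) {
    counts.merge(word, 1L, Long::sum);
  }

  // Control tuple marking the end of a batch: emit the result, then reset.
  Map<String, Long> onEndBatch() {
    Map<String, Long> result = new TreeMap<>(counts);
    counts.clear();
    return result;
  }
}
```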
Since each batch is separate from the others, we would like to have stateful processing within a batch, but not across batches.
At the same time, we would like to do this in a manner that is consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event-time windowing concepts within that time span.

For example, let's say I need to process data for a day (24 hours) as a single batch. The application is still streaming in nature: it would end the batch after a day and start a new batch the next day. At the same time, I would be able to have early trigger firings every minute as well as drop any data that is, say, 5 minutes late. All this within a single day.

~ Bhupesh

On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion on the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.
https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

If using a time window does not seem appropriate, we can have another class that is better suited for such sequential and distinct windows.
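A window type for sequential, distinct windows could carry a window id and translate it into timestamps only where time-based machinery needs them. This is a sketch under assumptions. `SequenceWindow`, `toTimeRange` and `isCompleteAt` are hypothetical names. The mapping of id n to the half-open range [n * width, (n + 1) * width) is an illustrative choice. With a width of 1 ms, this reproduces the "file i gets timestamp i" arrangement discussed in this thread.

```java
// Hypothetical window identified by a sequence id (for example a file index)
// rather than by a time range.
final class SequenceWindow {
  private final long id;

  SequenceWindow(long id) {
    if (id < 0) {
      throw new IllegalArgumentException("id must be non-negative");
    }
    this.id = id;
  }

  long id() {
    return id;
  }

  // Translates the id into a half-open time range [begin, end), so that
  // time-based components (watermarks, triggers) can still be reused.
  long[] toTimeRange(long widthMillis) {
    long begin = Math.multiplyExact(id, widthMillis);
    return new long[] {begin, Math.addExact(begin, widthMillis)};
  }

  // Assumption: a watermark at t means no more tuples with timestamp < t,
  // so the window is complete once the watermark reaches its end.
  boolean isCompleteAt(long watermark, long widthMillis) {
    return watermark >= toTimeRange(widthMillis)[1];
  }
}
```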
Perhaps a CustomWindow option can be introduced that takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.

Another option would be to go with a custom timestampExtractor for such tuples that translates each unique file name to a distinct timestamp while using time windows in the windowed operator.

~ Bhupesh

On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

I now see your rationale for putting the filename in the window.
As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:

1) The files are processed in sequence, not in parallel
2) The windowed operator should not keep the state associated with the file when the processing of the file is done
3) The trigger should be fired for the file when the file is done processing.

However, if the file is just a sequence and has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just create confusion for users, especially when it's used as an example for new users.
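The custom timestampExtractor idea, which translates each unique file name into a distinct timestamp, amounts to a first-seen sequence counter. The sketch makes concrete the kind of non-time "timestamp" David objects to. `FileSequenceTimestampExtractor` and `extract` are hypothetical names, and numbering starting at 1 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extractor: maps each distinct file name to a distinct,
// increasing "timestamp" (really a sequence number) in first-seen order.
final class FileSequenceTimestampExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long next = 1;

  long extract(String fileName) {
    Long seq = sequenceByFile.get(fileName);
    if (seq == null) {
      seq = next++;
      sequenceByFile.put(fileName, seq);
    }
    return seq;
  }
}
```

The map grows with every new file. A long-running version would need to forget finished files, which is the state-retention concern in point 2.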
How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?

David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:

On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

I think my comments related to count-based windows might be causing confusion. Let's not discuss count-based scenarios for now.

Just want to make sure we are on the same page with respect to the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number), and that helps keep tuples from each file in a separate window.

Yes, in this case it is a sequence number, but it could also be a timestamp, depending on the file naming convention. And if it were event-time processing, the watermark would be derived from records within the file.

Agreed, the source should have a mechanism to control the timestamp extraction along with everything else pertaining to watermark generation.

We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
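The per-file value could be a real timestamp, depending on the file naming convention. That case can be sketched with an extractor that parses a date out of the file name. Everything here is an assumption for illustration: the `events-yyyy-MM-dd.csv` naming convention, using midnight UTC as the file's timestamp, and the `FileNameDateTimestampExtractor` class.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical extractor for names such as "events-2017-02-27.csv": the date
// embedded in the name becomes the timestamp (midnight UTC, epoch millis).
final class FileNameDateTimestampExtractor {
  private static final Pattern DATE = Pattern.compile("(\\d{4}-\\d{2}-\\d{2})");

  long extract(String fileName) {
    Matcher m = DATE.matcher(fileName);
    if (!m.find()) {
      throw new IllegalArgumentException("no yyyy-MM-dd date in: " + fileName);
    }
    LocalDate day = LocalDate.parse(m.group(1)); // ISO-8601 local date
    return day.atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli();
  }
}
```

With one-day time windows, each such file would fall into its own window while the window timestamps keep their ordinary time meaning.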
~ Bhupesh

On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

I don't think this is a use case for a count-based window.

We have multiple files that are retrieved in a sequence, and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no concept of "end of something" for an individual key, and the global window isn't applicable.

However, as already explained and implemented by Bhupesh, this can be solved using a watermark and a window (in this case the window timestamp isn't a timestamp but a file sequence, but that doesn't matter).

Thomas

On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp).
It does >>>>>>>>>>>>> >>>>>>>>>>>> not >>>>> >>>>>> necessarily >>>>>>>>>>> >>>>>>>>>>>> mean it's a large batch. Unless there is some notion of >>>>>>>>>>>>> >>>>>>>>>>>> event >>>>> >>>>>> time >>>>>>>> >>>>>>>>> for >>>>>>>>>> >>>>>>>>>>> each >>>>>>>>>>>> >>>>>>>>>>>>> file, you don't want to embed the file into the window >>>>>>>>>>>>> >>>>>>>>>>>> itself. >>>>>> >>>>>>> If you want the result broken up by file name, and if >>>>>>>>>>>>> >>>>>>>>>>>> the >>> >>>> files >>>>>> >>>>>>> are >>>>>>>> >>>>>>>>> to >>>>>>>>>> >>>>>>>>>>> be >>>>>>>>>>> >>>>>>>>>>>> processed in parallel, I think making the file name be >>>>>>>>>>>>> >>>>>>>>>>>> part >>>>> >>>>>> of >>>>>> >>>>>>> the >>>>>>>> >>>>>>>>> key >>>>>>>>>> >>>>>>>>>>> is >>>>>>>>>>> >>>>>>>>>>>> the way to go. I think it's very confusing if we somehow >>>>>>>>>>>>> >>>>>>>>>>>> make >>>>> >>>>>> the >>>>>>> >>>>>>>> file >>>>>>>>>> >>>>>>>>>>> to >>>>>>>>>>> >>>>>>>>>>>> be part of the window. >>>>>>>>>>>>> >>>>>>>>>>>>> For count-based window, it's not implemented yet and >>>>>>>>>>>>> >>>>>>>>>>>> you're >>>>> >>>>>> welcome >>>>>>>> >>>>>>>>> to >>>>>>>>>> >>>>>>>>>>> add >>>>>>>>>>>> >>>>>>>>>>>>> that feature. In case of count-based windows, there >>>>>>>>>>>>> >>>>>>>>>>>> would >>> >>>> be >>>>> >>>>>> no >>>>>> >>>>>>> notion >>>>>>>>>> >>>>>>>>>>> of >>>>>>>>>>> >>>>>>>>>>>> time and you probably only trigger at the end of each >>>>>>>>>>>>> >>>>>>>>>>>> window. 
In the case of count-based windows, the watermark only matters for batch, since you need a way to know when the batch has ended (if the count is 10 and the number of tuples in the batch is, let's say, 105, you need a way to end the last window with 5 tuples).

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

Thanks for your comments.

The wordcount example that I created based on the windowed operator computes word counts per file (each file as a separate batch), i.e. it processes counts for each file and dumps them into separate files.
As I understand it, the Global window is for one large batch, i.e. all incoming data falls into the same batch. This could not be processed using the GlobalWindow option, as we need more than one window.
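The alternative of making the file name part of the key keeps a single Global window, and it shows why one window does not fit this use case. Files can interleave, but nothing is emitted and no per-file state is released until the final watermark. This is an illustrative sketch. `KeyedGlobalWordCount` and its methods are hypothetical, and the (file, word) key is modeled as a nested map.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical Global-window word count keyed by (file name, word).
// Files may interleave (parallel processing), but results are emitted and
// state is released only when the final watermark arrives.
final class KeyedGlobalWordCount {
  private final Map<String, Map<String, Long>> countsByFile = new TreeMap<>();
  private boolean fired = false;

  void onWord(String fileName, String word) {
    if (fired) {
      throw new IllegalStateException("Global window already fired");
    }
    countsByFile.computeIfAbsent(fileName, f -> new TreeMap<>()).merge(word, 1L, Long::sum);
  }

  int filesInState() {
    return countsByFile.size();
  }

  // Final (+infinity) watermark: the only point at which results are emitted.
  Map<String, Map<String, Long>> onFinalWatermark() {
    fired = true;
    Map<String, Map<String, Long>> result = new TreeMap<>(countsByFile);
    countsByFile.clear();
    return result;
  }
}
```

This fits parallel files well but holds every file's state until the end. The per-file windows described in this thread instead release each file's state as soon as that file completes.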
In this case, I configured the windowed operator to have time windows of 1 ms each and passed data for each file with increasing timestamps: (file1, 1), (file2, 2), and so on. Is there a better way of handling this scenario?

Regarding (2 - count-based windows), I think there is a trigger option to process count-based windows. If I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?

I agree that (4. Final Watermark) can be done using the Global window.

~ Bhupesh

On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

I'm worried that we are making the watermark concept too complicated.
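The question above about a count trigger with discarding accumulation is easier to see with both accumulation modes side by side. In both modes the trigger fires after every n tuples. The mode decides whether each firing carries the running total (accumulating) or only what arrived since the previous firing (discarding). This is a conceptual sketch, not Malhar's trigger API. `CountTriggeredSum` and `Mode` are hypothetical names, and summing values stands in for any aggregation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count trigger on a single window: fires after every `every` tuples.
final class CountTriggeredSum {
  enum Mode { ACCUMULATING, DISCARDING }

  private final int every;
  private final Mode mode;
  private long paneSum = 0;
  private int sinceLastFire = 0;
  private final List<Long> firings = new ArrayList<>();

  CountTriggeredSum(int every, Mode mode) {
    if (every <= 0) {
      throw new IllegalArgumentException("every must be positive");
    }
    this.every = every;
    this.mode = mode;
  }

  void add(long value) {
    paneSum += value;
    if (++sinceLastFire == every) {
      firings.add(paneSum);
      sinceLastFire = 0;
      if (mode == Mode.DISCARDING) {
        paneSum = 0; // discarding: the next pane starts empty
      }
    }
  }

  List<Long> firings() {
    return firings;
  }
}
```

With discarding, each firing covers exactly the last n tuples, so it behaves much like a count-based window, apart from a trailing partial pane that still needs an end-of-batch signal.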
Watermarks should simply tell you which windows can be considered complete.

Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple.

If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me like this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.

Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.
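The definition that a watermark only tells you which windows can be considered complete reduces to a single comparison per window. A +infinity watermark then completes every window, including the Global window. This is a sketch under assumptions. `Window` and `completedBy` are hypothetical names, windows are half-open [start, end), `Long.MAX_VALUE` stands in for +infinity, and a watermark at t is taken to mean that no further tuples with timestamps below t will arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical half-open window [start, end); the Global window spans everything.
final class Window {
  static final long INFINITY = Long.MAX_VALUE; // stand-in for +infinity
  static final Window GLOBAL = new Window(Long.MIN_VALUE, INFINITY);

  final long start;
  final long end;

  Window(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // Complete once the watermark reaches the window's end; only the
  // +infinity watermark completes the Global window.
  boolean isCompleteAt(long watermark) {
    return watermark >= end;
  }

  static List<Window> completedBy(long watermark, List<Window> open) {
    List<Window> done = new ArrayList<>();
    for (Window w : open) {
      if (w.isCompleteAt(watermark)) {
        done.add(w);
      }
    }
    return done;
  }
}
```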
David

On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

For an input operator that is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:
1. Time-based watermarks (the high watermark / low watermark)
2. Number-of-tuples-based watermarks (every n tuples)
3. File-based watermarks (start file, end file)
4. Final watermark

File-based watermarks seem to be applicable to batch (file-based) processing as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
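The four kinds of watermark listed above could be modeled as control-tuple types that a downstream operator dispatches on. This is an illustrative sketch only. Every type name here (`ControlTuple`, `TimeWatermark`, `CountWatermark`, `FileWatermark`, `FinalWatermark`, `ControlTupleDescriber`) is hypothetical and not part of Malhar.

```java
// Hypothetical control tuples for the four watermark kinds listed above.
interface ControlTuple {}

final class TimeWatermark implements ControlTuple {
  final long timestamp; // progress in event time
  TimeWatermark(long timestamp) { this.timestamp = timestamp; }
}

final class CountWatermark implements ControlTuple {
  final long tuplesSoFar; // emitted every n tuples
  CountWatermark(long tuplesSoFar) { this.tuplesSoFar = tuplesSoFar; }
}

final class FileWatermark implements ControlTuple {
  enum Kind { START, END }
  final Kind kind;
  final String fileName;
  FileWatermark(Kind kind, String fileName) { this.kind = kind; this.fileName = fileName; }
}

final class FinalWatermark implements ControlTuple {}

final class ControlTupleDescriber {
  // A downstream operator would branch on the kind of control tuple it receives.
  static String describe(ControlTuple c) {
    if (c instanceof TimeWatermark) {
      return "time:" + ((TimeWatermark) c).timestamp;
    } else if (c instanceof CountWatermark) {
      return "count:" + ((CountWatermark) c).tuplesSoFar;
    } else if (c instanceof FileWatermark) {
      FileWatermark f = (FileWatermark) c;
      return f.kind + ":" + f.fileName;
    } else if (c instanceof FinalWatermark) {
      return "final";
    }
    throw new IllegalArgumentException("unknown control tuple: " + c);
  }
}
```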
~ Bhupesh

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.

More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with.
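One common way to derive a watermark from progress in event time is to track the largest event time seen and subtract a fixed out-of-orderness bound. This technique is swapped in for illustration, not proposed in this thread; it is the same idea as Flink's bounded-out-of-orderness watermark strategy. Because the maximum only grows, the watermark never moves backwards. The same code works for any monotonically increasing sequence, such as a file index. `EventTimeWatermarkTracker` and the bound are assumptions.

```java
// Hypothetical watermark generator driven by event time (or any other
// monotonically increasing sequence used as "time").
final class EventTimeWatermarkTracker {
  private final long maxOutOfOrderness;
  private long maxEventTime = Long.MIN_VALUE;
  private long watermark = Long.MIN_VALUE;

  EventTimeWatermarkTracker(long maxOutOfOrderness) {
    if (maxOutOfOrderness < 0) {
      throw new IllegalArgumentException("bound must be non-negative");
    }
    this.maxOutOfOrderness = maxOutOfOrderness;
  }

  // Observes one tuple and returns the current watermark.
  long onEvent(long eventTime) {
    maxEventTime = Math.max(maxEventTime, eventTime);
    // maxEventTime never decreases, so neither does the watermark.
    watermark = maxEventTime - maxOutOfOrderness;
    return watermark;
  }
}
```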
Note that even file input can in many cases produce time-based watermarks, for example when you read part files that are bound by event time.

Thanks,
Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>