Even this will not work, because the output port of CsvParser is of type Object. Even though Customer implements Tuple<Object>, the stream still cannot be connected, because CsvParser emits the Customer tuples through a port declared as Object rather than Tuple<Object>:
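For illustration only, here is a minimal self-contained Java sketch of the typing problem and the two workarounds discussed in this thread: a runtime cast with InputT = Object, and a POJO that implements Tuple and returns itself from getValue(). The Tuple interface here is a simplified stand-in that only mimics the getValue() accessor of Malhar's window Tuple. Customer, PlainTuple and TupleAdapters are hypothetical names, and no Apex ports are involved.

```java
import java.util.Objects;

// Simplified stand-in for the window Tuple interface (assumption: only getValue() matters here).
interface Tuple<T> {
  T getValue();
}

// Hypothetical POJO that implements Tuple and returns itself, as suggested in the thread.
class Customer implements Tuple<Object> {
  final String name;

  Customer(String name) {
    this.name = name;
  }

  @Override
  public Object getValue() {
    return this;
  }
}

// Hypothetical plain wrapper, similar in spirit to what a converter operator would emit.
class PlainTuple<T> implements Tuple<T> {
  private final T value;

  PlainTuple(T value) {
    this.value = Objects.requireNonNull(value);
  }

  @Override
  public T getValue() {
    return value;
  }
}

class TupleAdapters {
  // What a parser port typed as Object hands downstream: the static type is lost.
  static Object emitFromParser(Customer c) {
    return c;
  }

  @SuppressWarnings("unchecked")
  static Tuple<Object> asTuple(Object emitted) {
    // Option 1: the POJO already implements Tuple, so a runtime cast recovers it.
    if (emitted instanceof Tuple) {
      return (Tuple<Object>) emitted;
    }
    // Option 2: wrap anything else, which is what a generic converter would do.
    return new PlainTuple<>(emitted);
  }

  // With InputT = Object, the accumulation logic casts the value back to the real type.
  static String customerName(Tuple<Object> tuple) {
    return ((Customer) tuple.getValue()).name;
  }
}
```

The wrap branch is roughly what a dedicated converter operator would do; the cast branch corresponds to the runtime cast suggested in the quoted replies.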
*DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();*

The input port type of the windowed operator with InputT = Object is:

*DefaultInputPort<Tuple<Object>>*

Ajay

On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to the actual type of InputT at runtime. Introducing an operator just to do a cast is not a good design decision, IMO.
>
> Thank you,
> Vlad
>
> Sent from my iPhone
>
> On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:
>> I am using WindowedOperatorImpl and it is declared as follows.
>>
>> WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator = new WindowedOperatorImpl<>();
>>
>> In my application scenario, InputT is the Customer POJO, which CsvParser emits as an Object.
>>
>> Ajay
>>
>> On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>>> How do you declare WindowedOperator?
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>> On 4/28/17 10:35, AJAY GUPTA wrote:
>>>> Vlad,
>>>>
>>>> The approach you suggested doesn't work because CsvParser outputs the Object data type irrespective of the POJO class being emitted.
>>>>
>>>> Ajay
>>>>
>>>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>>>>> Make your POJO class implement the WindowedOperator Tuple interface (it may return itself in getValue()).
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Vlad
>>>>>
>>>>> On 4/28/17 02:44, AJAY GUPTA wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> I am creating an application that uses the Windowed Operator. In this application, a CsvParser operator emits a POJO that is to be passed as input to the WindowedOperator.
>>>>>> The WindowedOperator requires an instance of the Tuple class as input:
>>>>>>
>>>>>> *public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()*
>>>>>>
>>>>>> Because of this, addStream cannot work: the type of CsvParser's output port is not compatible with the input port type of the WindowedOperator. One way to solve this problem is to place a converter operator between the two. I would like to know if there is a more generic approach that avoids writing a new operator for every new application that uses Windowed Operators.
>>>>>>
>>>>>> Thanks,
>>>>>> Ajay
>>>>>>
>>>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I think we have some agreement on the way we should use control tuples for File I/O operators to support batch.
>>>>>>>
>>>>>>> In order to have more operators in Malhar support this paradigm, I think we should also look at store operators: JDBC, Cassandra, HBase, etc. The case with these operators is simpler, as most of them do not poll the sources (except the JDBC poller operator) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources. The only change we should add to these operators is to shut down the DAG once the reading of data is done. For a windowed operator, this would mean a Global window with a final watermark before the DAG is shut down.
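A rough simulation of the bounded-source case, assuming the final watermark is represented as a timestamp of Long.MAX_VALUE ("+infinity", as proposed later in this thread). GlobalWindowSum and BoundedSourceDemo are hypothetical stand-ins, not the Malhar windowed operator. The operator accumulates everything into a single global window and fires once, when the final watermark arrives, which is the point where the DAG could be shut down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a windowed operator configured with a single global window.
class GlobalWindowSum {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // "+infinity" timestamp

  private long sum = 0;
  private boolean fired = false;
  private final List<Long> emitted = new ArrayList<>();

  void process(long value) {
    if (fired) {
      throw new IllegalStateException("tuple after final watermark");
    }
    sum += value;
  }

  // The global window is only complete once the watermark reaches +infinity;
  // any earlier watermark leaves the accumulated state untouched.
  void watermark(long timestamp) {
    if (timestamp == FINAL_WATERMARK && !fired) {
      emitted.add(sum);
      fired = true;
    }
  }

  boolean isDone() {
    return fired; // a real application would shut down the DAG at this point
  }

  List<Long> emitted() {
    return emitted;
  }
}

class BoundedSourceDemo {
  // Simulates a store read of a fixed amount of data followed by the final watermark.
  static GlobalWindowSum run(long[] rows) {
    GlobalWindowSum op = new GlobalWindowSum();
    for (long row : rows) {
      op.process(row);
    }
    op.watermark(GlobalWindowSum.FINAL_WATERMARK);
    return op;
  }
}
```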
>>>>>>>
>>>>>>> ~ Bhupesh
>>>>>>>
>>>>>>> _______________________________________________________
>>>>>>>
>>>>>>> Bhupesh Chawda
>>>>>>>
>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>>>
>>>>>>> www.datatorrent.com | apex.apache.org
>>>>>>>
>>>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>> Hi Thomas,
>>>>>>>>
>>>>>>>> Even though the windowing operator is not just "event time", it seems to be too dependent on the "time" attribute of the incoming tuple. This is the reason we had to model the file index as a timestamp to solve the batch case for files. Perhaps we should work on increasing the scope of the windowed operator to consider other types of windows as well. The Sequence option suggested by David seems to be a step in that direction.
>>>>>>>>
>>>>>>>> ~ Bhupesh
>>>>>>>>
>>>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>> That's correct, we are looking at a generalized approach for state management vs. a series of special cases.
>>>>>>>>>
>>>>>>>>> And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> I went through the discussion, but it seems to be more about event-time watermark handling than about batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from the others, we would like to have stateful processing within a batch, but not across batches.
>>>>>>>>>>
>>>>>>>>>> At the same time, we would like to do this in a manner that is consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event-time windowing concepts within that time span.
>>>>>>>>>>
>>>>>>>>>> For example, let's say I need to process data for a day (24 hours) as a single batch. The application is still streaming in nature: it would end the batch after a day and start a new batch the next day. At the same time, I would be able to have early trigger firings every minute as well as drop any data that is, say, 5 mins late. All this within a single day.
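A sketch of the day-long batch policy described above, under stated assumptions: windows are fixed 24-hour event-time windows, "5 mins late" is read as an allowed lateness of five minutes past the end of the day window as measured by the watermark, and early firings are driven by one minute of processing time. DailyBatchPolicy and its methods are hypothetical helpers, not actual windowed operator options.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers expressing a "one batch per day" windowing policy.
class DailyBatchPolicy {
  static final long DAY_MS = TimeUnit.DAYS.toMillis(1);
  static final long EARLY_FIRING_MS = TimeUnit.MINUTES.toMillis(1);
  static final long ALLOWED_LATENESS_MS = TimeUnit.MINUTES.toMillis(5);

  // Start of the 24-hour window (batch) that an event-time timestamp falls into.
  static long windowStart(long eventTimeMs) {
    return Math.floorDiv(eventTimeMs, DAY_MS) * DAY_MS;
  }

  // Assumed lateness rule: a tuple is dropped once the watermark has passed
  // the end of its window by at least the allowed lateness.
  static boolean isDropped(long eventTimeMs, long watermarkMs) {
    long windowEnd = windowStart(eventTimeMs) + DAY_MS;
    return watermarkMs >= windowEnd + ALLOWED_LATENESS_MS;
  }

  // Early (speculative) firing: due when at least one minute of processing
  // time has passed since the last firing.
  static boolean shouldFireEarly(long lastFiringMs, long nowMs) {
    return nowMs - lastFiringMs >= EARLY_FIRING_MS;
  }
}
```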
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>> There is a discussion in the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.
>>>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> David
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>
>>>>>>>>>>>> If using a time window does not seem appropriate, we can have another class that is better suited for such sequential and distinct windows. Perhaps a CustomWindow option can be introduced that takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.
>>>>>>>>>>>>
>>>>>>>>>>>> Another option would be to go with a custom timestampExtractor for such tuples, which translates each unique file name to a distinct timestamp while using time windows in the windowed operator.
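One way the custom timestampExtractor idea could look, as a hypothetical sketch: TimestampExtractor, FileWord, FileSequenceExtractor and OneMsWindows are illustrative names, not existing Malhar types. The extractor assigns 1, 2, 3, ... to file names in first-seen order, so every tuple from the same file gets the same "timestamp" (really a sequence number). With 1 ms time windows, each file then lands in its own window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extractor interface; not an existing Malhar API.
interface TimestampExtractor<T> {
  long extract(T tuple);
}

// Hypothetical tuple type: a word read from a named file.
class FileWord {
  final String fileName;
  final String word;

  FileWord(String fileName, String word) {
    this.fileName = fileName;
    this.word = word;
  }
}

// Assigns 1, 2, 3, ... to file names in the order they are first seen.
class FileSequenceExtractor implements TimestampExtractor<FileWord> {
  private final Map<String, Long> sequenceByFile = new HashMap<>();

  @Override
  public long extract(FileWord tuple) {
    // size() is read before the new entry is inserted, so the first file gets 1.
    return sequenceByFile.computeIfAbsent(tuple.fileName, f -> (long) sequenceByFile.size() + 1);
  }
}

class OneMsWindows {
  static final long WINDOW_MS = 1;

  // With 1 ms time windows, each distinct sequence number lands in its own window.
  static long windowStart(long timestamp) {
    return Math.floorDiv(timestamp, WINDOW_MS) * WINDOW_MS;
  }
}
```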
>>>>>>>>>>>>
>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>>> I now see your rationale for putting the filename in the window. As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) The files are processed in sequence, not in parallel.
>>>>>>>>>>>>> 2) The windowed operator should not keep the state associated with the file when the processing of the file is done.
>>>>>>>>>>>>> 3) The trigger should be fired for the file when the file is done processing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, if the file is just a sequence and has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just create confusion for users, especially when it's used as an example for new users.
>>>>>>>>>>>>>
>>>>>>>>>>>>> How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?
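A hypothetical sketch of the proposed hierarchy, in which a window is identified by a sequence number and a time window is the special case whose sequence is its begin timestamp. The class names follow the proposal, but the fields, the methods, and the completeness rule (a watermark w means no more tuples at positions below w will arrive) are assumptions made for illustration, not existing Malhar code.

```java
// Hypothetical: a window identified only by its position in a sequence
// (for example, the index of a file).
class SequenceWindow implements Comparable<SequenceWindow> {
  final long sequence;

  SequenceWindow(long sequence) {
    this.sequence = sequence;
  }

  // Assumed watermark semantics: watermark w promises no more tuples at
  // positions below w, so this window is complete once w > sequence.
  boolean isCompleteAt(long watermark) {
    return watermark > sequence;
  }

  @Override
  public int compareTo(SequenceWindow other) {
    return Long.compare(sequence, other.sequence);
  }
}

// Hypothetical: a time window as a sequence window positioned at its begin timestamp.
class TimeWindow extends SequenceWindow {
  final long durationMillis;

  TimeWindow(long beginTimestamp, long durationMillis) {
    super(beginTimestamp);
    this.durationMillis = durationMillis;
  }

  long beginTimestamp() {
    return sequence;
  }

  long endTimestamp() {
    return sequence + durationMillis;
  }

  // A time window covers [begin, end), so it is complete once the watermark reaches end.
  @Override
  boolean isCompleteAt(long watermark) {
    return watermark >= endTimestamp();
  }
}
```

Under this rule a sequence window behaves like a time window of width 1, which is one way to read "TimeWindow can inherit from it".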
>>>>>>>>>>>>>
>>>>>>>>>>>>> David
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>>> I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number) and that helps keep tuples from each file in a separate window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a timestamp also, depending on the file naming convention. And if it was event-time processing, the watermark would be derived from records within the file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Agreed, the source should have a mechanism to control the timestamp extraction along with everything else pertaining to the watermark generation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>> I don't think this is a use case for a count based window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence, and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no concept of "end of something" for an individual key, and the global window isn't applicable.
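The per-file requirement can be simulated in a few lines. In this hypothetical sketch, PerFileWordCount is not an Apex operator: counts are kept only for files still being read, and an end-of-file marker, standing in for a file-based watermark, emits that file's aggregate and discards its state.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-file word count with state scoped to the file being read.
class PerFileWordCount {
  private final Map<String, Map<String, Integer>> countsByFile = new HashMap<>();
  private final Map<String, Map<String, Integer>> emitted = new LinkedHashMap<>();

  void onWord(String file, String word) {
    countsByFile.computeIfAbsent(file, f -> new HashMap<>()).merge(word, 1, Integer::sum);
  }

  // End-of-file marker: the file's window is complete, so emit the aggregate and clear state.
  void onEndOfFile(String file) {
    Map<String, Integer> counts = countsByFile.remove(file);
    if (counts == null) {
      counts = new HashMap<>(); // an empty file still produces an (empty) result
    }
    emitted.put(file, counts);
  }

  int openFiles() {
    return countsByFile.size();
  }

  Map<String, Map<String, Integer>> emitted() {
    return emitted;
  }
}
```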
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this can be solved using a watermark and a window (in this case the window timestamp isn't a timestamp but a file sequence, but that doesn't matter).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name part of the key is the way to go.
>>>>>>>>>>>>>>>>> I think it's very confusing if we somehow make the file part of the window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Count-based windows are not implemented yet, and you're welcome to add that feature. In the case of count-based windows, there would be no notion of time and you would probably only trigger at the end of each window. The watermark then only matters for batch, since you need a way to know when the batch has ended (if the count is 10 and the number of tuples in the batch is, let's say, 105, you need a way to end the last window with 5 tuples).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed operator processes word counts per file (each file as a separate batch), i.e. it computes counts for each file and dumps them into separate files. As I understand it, the Global window is for one large batch, i.e. all incoming data falls into the same batch. This could not be processed using the GlobalWindow option, as we need more than one window. In this case, I configured the windowed operator to have time windows of 1 ms each and passed data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a trigger option to process count based windows.
>>>>>>>>>>>>>>>>>> In case I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using the Global window.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too complicated.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Watermarks should simply tell you what windows can be considered complete.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window.
>>>>>>>>>>>>>>>>>>> Watermarks do not play a role here because the window is always complete at the n-th tuple.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me like this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.
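The count-based example from earlier in the thread (window size 10, 105 tuples) can be checked with a small simulation. CountWindows is a hypothetical stand-in, not the Malhar CountTrigger. A window completes at the n-th tuple without any watermark, and only the final +infinity watermark (modeled here as Long.MAX_VALUE) closes the partially filled last window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based windowing: each window closes after `size` tuples.
class CountWindows {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // "+infinity"

  private final int size;
  private final List<Integer> windowSizes = new ArrayList<>();
  private int current = 0;

  CountWindows(int size) {
    this.size = size;
  }

  void process(Object tuple) {
    current++;
    if (current == size) { // complete at the n-th tuple; no watermark needed
      windowSizes.add(current);
      current = 0;
    }
  }

  // Only the end-of-batch watermark matters: it flushes the partial last window.
  void watermark(long timestamp) {
    if (timestamp == FINAL_WATERMARK && current > 0) {
      windowSizes.add(current);
      current = 0;
    }
  }

  List<Integer> windowSizes() {
    return windowSizes;
  }
}
```

With 105 tuples this yields ten full windows during processing and an eleventh window of 5 tuples only after the final watermark.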
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (every n tuples)
>>>>>>>>>>>>>>>>>>>> 3. File based watermarks (start file, end file)
>>>>>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch (file based) processing as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that depend on windowing and watermarks.
>>>>>>>>>>>>>>>>>>>>> I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can work with generically.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Note that even file input can in many cases produce time based watermarks, for example when you read part files that are bound by event time.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com