Use Object in place of InputT in the WindowedOperatorImpl declaration, and cast the Object to the actual input type at runtime. Introducing an operator just to do a cast is not a good design decision, IMO.
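
A minimal sketch of the Object-and-cast idea, using stand-in types rather than the real Malhar classes. SimpleAccumulation, the Customer fields and sumAmounts are hypothetical names made up for illustration; how this maps onto the accumulation you pass to WindowedOperatorImpl is an assumption on my part.

```java
import java.util.Arrays;
import java.util.List;

public class ObjectInputSketch {

    // Stand-in for the application's Customer POJO (fields are hypothetical).
    public static class Customer {
        final String name;
        final double amount;

        public Customer(String name, double amount) {
            this.name = name;
            this.amount = amount;
        }
    }

    // Simplified stand-in for an accumulation contract; NOT the Malhar interface.
    public interface SimpleAccumulation<InputT, AccumT> {
        AccumT defaultValue();

        AccumT accumulate(AccumT accumulated, InputT input);
    }

    // InputT is declared as Object, matching what the parser emits.
    // The cast to the real type happens inside accumulate(), at runtime.
    public static class CustomerAmountSum implements SimpleAccumulation<Object, Double> {
        @Override
        public Double defaultValue() {
            return 0.0;
        }

        @Override
        public Double accumulate(Double accumulated, Object input) {
            // Throws ClassCastException if the upstream emits something else.
            Customer customer = (Customer) input;
            return accumulated + customer.amount;
        }
    }

    // Feeds untyped objects through the accumulation, as an operator would.
    public static double sumAmounts(List<Object> parsed) {
        CustomerAmountSum accumulation = new CustomerAmountSum();
        Double total = accumulation.defaultValue();
        for (Object tuple : parsed) {
            total = accumulation.accumulate(total, tuple);
        }
        return total;
    }

    public static void main(String[] args) {
        List<Object> parsed = Arrays.<Object>asList(
            new Customer("a", 10.0), new Customer("b", 2.5));
        System.out.println(sumAmounts(parsed));
    }
}
```

The trade-off is that the compiler no longer checks the input type, so a wrong upstream type only shows up as a ClassCastException at runtime.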

Thank you,
Vlad

Sent from iPhone

> On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:
>
> I am using WindowedOperatorImpl and it is declared as follows.
>
> WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator = new WindowedOperatorImpl<>();
>
> In my application scenario, the InputT is the Customer POJO, which is emitted as an Object by CsvParser.
>
> Ajay
>
> On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>
>> How do you declare WindowedOperator?
>>
>> Thank you,
>>
>> Vlad
>>
>> On 4/28/17 10:35, AJAY GUPTA wrote:
>>
>>> Vlad,
>>>
>>> The approach you suggested doesn't work because the CSVParser outputs the Object data type irrespective of the POJO class being emitted.
>>>
>>> Ajay
>>>
>>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>>>
>>>> Make your POJO class implement the WindowedOperator Tuple interface (it may return itself in getValue()).
>>>>
>>>> Thank you,
>>>>
>>>> Vlad
>>>>
>>>> On 4/28/17 02:44, AJAY GUPTA wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am creating an application which uses the Windowed Operator. The application involves a CsvParser operator emitting a POJO object which is to be passed as input to WindowedOperator. The WindowedOperator requires an instance of the Tuple class as input:
>>>>> *public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()*
>>>>>
>>>>> Due to this, addStream cannot work, as the type of CsvParser's output port is not compatible with the input port type of WindowedOperator.
>>>>> One way to solve this problem is to have an operator between the above two operators as a converter.
>>>>> I would like to know if there is any other, more generic approach to solve this problem without writing a new Operator for every new application using Windowed Operators.
>>>>>
>>>>> Thanks,
>>>>> Ajay
>>>>>
>>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I think we have some agreement on the way we should use control tuples for File I/O operators to support batch.
>>>>>>
>>>>>> In order to have more operators in Malhar support this paradigm, I think we should also look at store operators - JDBC, Cassandra, HBase etc.
>>>>>> The case with these operators is simpler as most of these do not poll the sources (except JDBC poller operator) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources.
>>>>>> The only change that we should add to these operators is to shut down the DAG once the reading of data is done. For a windowed operator this would mean a Global window with a final watermark before the DAG is shut down.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>>
>>>>>> _______________________________________________________
>>>>>>
>>>>>> Bhupesh Chawda
>>>>>>
>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>>
>>>>>> www.datatorrent.com | apex.apache.org
>>>>>>
>>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> Even though the windowing operator is not just "event time", it seems it is too much dependent on the "time" attribute of the incoming tuple. This is the reason we had to model the file index as a timestamp to solve the batch case for files.
>>>>>>> Perhaps we should work on increasing the scope of the windowed operator to consider other types of windows as well. The Sequence option suggested by David seems to be something in that direction.
>>>>>>>
>>>>>>> ~ Bhupesh
>>>>>>>
>>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> That's correct, we are looking at a generalized approach for state management vs. a series of special cases.
>>>>>>>>
>>>>>>>> And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> I went through the discussion, but it seems like it is more on the event time watermark handling as opposed to batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from others, we would like to have stateful processing within a batch, but not across batches.
>>>>>>>>> At the same time, we would like to do this in a manner which is consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event time windowing concepts in that time span.
>>>>>>>>>
>>>>>>>>> For example, let's say I need to process data for a day (24 hours) as a single batch.
>>>>>>>>> The application is still streaming in nature: it would end the batch after a day and start a new batch the next day. At the same time, I would be able to have early trigger firings every minute as well as drop any data which is, say, 5 mins late. All this within a single day.
>>>>>>>>>
>>>>>>>>> ~ Bhupesh
>>>>>>>>>
>>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> There is a discussion in the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.
>>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>>>>
>>>>>>>>>> David
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi David,
>>>>>>>>>>>
>>>>>>>>>>> If using time window does not seem appropriate, we can have another class which is more suited for such sequential and distinct windows. Perhaps, a CustomWindow option can be introduced which takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.
>>>>>>>>>>>
>>>>>>>>>>> Another option would be to go with a custom timestampExtractor for such tuples which translates each unique file name to a distinct timestamp while using time windows in the windowed operator.
>>>>>>>>>>>
>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I now see your rationale on putting the filename in the window.
>>>>>>>>>>>>
>>>>>>>>>>>> As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) The files are processed in sequence, not in parallel
>>>>>>>>>>>> 2) The windowed operator should not keep the state associated with the file when the processing of the file is done
>>>>>>>>>>>> 3) The trigger should be fired for the file when a file is done processing.
>>>>>>>>>>>>
>>>>>>>>>>>> However, if the file is just a sequence and has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just confuse users, especially when it's used as an example for new users.
>>>>>>>>>>>>
>>>>>>>>>>>> How about having a separate class called SequenceWindow?
>>>>>>>>>>>> And perhaps TimeWindow can inherit from it?
>>>>>>>>>>>>
>>>>>>>>>>>> David
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number) and that helps keep tuples from each file in a separate window.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a time stamp also, depending on the file naming convention. And if it was event time processing, the watermark would be derived from records within the file.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agreed, the source should have a mechanism to control the time stamp extraction along with everything else pertaining to the watermark generation.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think this is a use case for count based window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no concept of "end of something" for an individual key and global window isn't applicable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this can be solved using watermark and window (in this case the window timestamp isn't a timestamp, but a file sequence, but that doesn't matter).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name be part of the key is the way to go.
>>>>>>>>>>>>>>>> I think it's very confusing if we somehow make the file to be part of the window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For count-based window, it's not implemented yet and you're welcome to add that feature. In case of count-based windows, there would be no notion of time and you probably only trigger at the end of each window. In the case of count-based windows, the watermark only matters for batch since you need a way to know when the batch has ended (if the count is 10, the number of tuples in the batch is let's say 105, you need a way to end the last window with 5 tuples).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed operator does processing of word counts per file (each file as a separate batch), i.e. process counts for each file and dump into separate files.
>>>>>>>>>>>>>>>>> As I understand, Global window is for one large batch; i.e. all incoming data falls into the same batch. This could not be processed using the GlobalWindow option as we need more than one window. In this case, I configured the windowed operator to have time windows of 1ms each and passed data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a trigger option to process count based windows.
>>>>>>>>>>>>>>>>> In case I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using Global window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too complicated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Watermarks should simply just tell you what windows can be considered complete.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me that this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate watermarks for downstream operators, I can think about the following watermarks that the operator can emit:
>>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (Every n tuples)
>>>>>>>>>>>>>>>>>>> 3. File based watermarks (Start file, end file)
>>>>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch (file based) as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or other monotonically increasing sequence) that other operators can generically work with.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce time based watermarks, for example when you read part files that are bound by event time.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>