I think we should add support for running multiple batches through the same DAG.
Resetting the watermark to the initial watermark seems like a good idea. The windowed operator needs to understand the start/end batch control tuples and reset the watermark accordingly.

~ Bhupesh
_______________________________________________________
Bhupesh Chawda
E: bhup...@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Thu, May 11, 2017 at 7:37 PM, Thomas Weise <t...@apache.org> wrote:

Usually batches are processed by different instances of a topology. First of all, we should agree that running multiple batches through the same DAG *in sequence* is something that we want to address.

If yes, then there is the reset problem you are referring to. It only occurs when you want to do event time processing per batch, because in that case you cannot repurpose the time component to segregate batches.

Two possible options come to mind:

- Reset the watermark to the initial watermark. This basically means that instead of a "shutdown" tuple there is a "reset" tuple.
- Schedule separate operators/pipelines for each batch run.

Thomas

On Tue, May 9, 2017 at 5:25 AM, AJAY GUPTA <ajaygit...@gmail.com> wrote:

After some discussion and trying out the approach discussed above, it seems we need to separate the concepts of watermarks and batch control tuples. The windowed operator needs to be modified to understand batch control tuples.

Even if we have watermark tuples that also include batch information, the windowed operator will fail when the source data is event-time based. This is because in this scenario there are two notions of time in the watermark:
1. The time used to denote the file/batch boundary
2. The event time in the data

For this reason, it makes sense to separate the batch tuples (start something / end something) from the watermark tuples (which essentially deal with event times).
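Below is a minimal plain-Java sketch of the separation proposed here. It assumes batch boundaries travel as their own control tuples instead of being encoded in watermarks. All names (`ControlTuple`, `Watermark`, `StartBatch`, `EndBatch`, `BatchAwareWindowedSum`) are hypothetical and are not the Malhar windowed operator API. The operator is reduced to fixed event-time windows that sum values. On an end-batch tuple, it emits every remaining window and resets the watermark to its initial value. This corresponds to the "reset" option Thomas describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical control tuples: batch boundaries are separate types,
// so a Watermark only ever carries event time.
interface ControlTuple {}

final class Watermark implements ControlTuple {
  final long timestamp;
  Watermark(long timestamp) { this.timestamp = timestamp; }
}

final class StartBatch implements ControlTuple {
  final String batchId;
  StartBatch(String batchId) { this.batchId = batchId; }
}

final class EndBatch implements ControlTuple {
  final String batchId;
  EndBatch(String batchId) { this.batchId = batchId; }
}

// Hypothetical stand-in for a windowed operator: fixed event-time windows, summing values.
final class BatchAwareWindowedSum {
  static final long INITIAL_WATERMARK = Long.MIN_VALUE;

  private final long windowSize;
  private long watermark = INITIAL_WATERMARK;
  private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> running sum
  final List<String> emitted = new ArrayList<>();                  // "start=sum" per fired window

  BatchAwareWindowedSum(long windowSize) { this.windowSize = windowSize; }

  void processData(long eventTime, long value) {
    long start = eventTime - Math.floorMod(eventTime, windowSize);
    if (start + windowSize <= watermark) {
      return; // this window already fired: drop late data
    }
    openWindows.merge(start, value, Long::sum);
  }

  void processControl(ControlTuple control) {
    if (control instanceof Watermark) {
      watermark = Math.max(watermark, ((Watermark) control).timestamp);
      // Fire every window whose end is at or before the watermark.
      while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize <= watermark) {
        emit(openWindows.pollFirstEntry());
      }
    } else if (control instanceof EndBatch) {
      // End of batch: every remaining window is final...
      while (!openWindows.isEmpty()) {
        emit(openWindows.pollFirstEntry());
      }
      // ...and the event-time clock is reset so the next batch is not treated as late.
      watermark = INITIAL_WATERMARK;
    }
    // StartBatch needs no action in this sketch.
  }

  long watermark() { return watermark; }

  private void emit(Map.Entry<Long, Long> window) {
    emitted.add(window.getKey() + "=" + window.getValue());
  }
}
```

Consider the region A/B example from this thread. Once region A's batch ends, the watermark returns to its initial value. Region B's records, which carry the same event times, are then accepted instead of being dropped as late.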
We could argue for a watermark tuple that indicates the end of the batch: a final watermark (with time = Long.MAX_VALUE) that would finalize all windows in the windowed operator. However, suppose the next batch then needs to be processed by the same windowed operator. We would need to reset the operator's state, because it has already moved ahead in the event time domain. The batch control tuples can do this resetting of state (in other words, prepare the operator for processing a new batch of data).

As an example, consider telecom data logs covering the same 24 hours for two regions (A and B), which are to be processed as batches. After the data records from region A are processed, a "final" watermark is emitted, indicating the end of all data from region A. Unless we then clear the windowed operator's state (current watermark, data storage), the data records from region B will not be processed. In such a scenario, an end batch control tuple can tell the operator to reset its state.

Ajay

On Sun, Apr 30, 2017 at 4:32 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

    public static class Pojo implements Tuple
    {
      @Override
      public Object getValue()
      {
        return this;
      }
    }

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
      WindowedOperatorImpl<Pojo, Pojo, Pojo> windowedOperator =
          dag.addOperator("windowOperator", WindowedOperatorImpl.class);
      dag.addStream("csvToWindowed", csvParser.out, new InputPort[]{windowedOperator.input});
    }

Thank you,

Vlad

On 4/29/17 15:20, AJAY GUPTA wrote:

Even this will not work because the output port of CsvParser is of type Object.
Even though Customer implements Tuple<Object>, it will still fail to work, since Tuple<Object> gets output as Object:

    DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();

The input port type at the windowed operator with InputT = Object is:

    DefaultInputPort<Tuple<Object>>

Ajay

On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Use Object in place of InputT in the WindowedOperatorImpl, and cast Object to the actual type of InputT at runtime. Introducing an operator just to do a cast is not a good design decision, IMO.

Thank you,
Vlad

On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:

I am using WindowedOperatorImpl, and it is declared as follows:

    WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator = new WindowedOperatorImpl<>();

In my application scenario, InputT is the Customer POJO, which CsvParser outputs as an Object.

Ajay

On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

How do you declare WindowedOperator?

Thank you,

Vlad

On 4/28/17 10:35, AJAY GUPTA wrote:

Vlad,

The approach you suggested doesn't work because CsvParser outputs the Object data type irrespective of the POJO class being emitted.
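Vlad's suggestion was to type the operator on Object and cast at runtime. A rough sketch of that approach follows. `SimpleAccumulation` is a hypothetical stand-in loosely modelled on a windowed-operator accumulation. `Customer`, `TotalSpend` and `Accumulations` are illustrative names. None of this is the Malhar API. The trade-off is that a type mismatch surfaces as a `ClassCastException` at runtime instead of as a compile error in `addStream`.

```java
import java.util.List;

// Hypothetical stand-in, loosely modelled on a windowed-operator accumulation.
interface SimpleAccumulation<InputT, AccT, OutT> {
  AccT defaultAccumulatedValue();
  AccT accumulate(AccT accumulated, InputT input);
  OutT getOutput(AccT accumulated);
}

// Hypothetical POJO that the parser emits as a plain Object.
final class Customer {
  final String name;
  final double spend;
  Customer(String name, double spend) { this.name = name; this.spend = spend; }
}

// InputT is Object, so the port types line up; the real type is checked per tuple.
final class TotalSpend implements SimpleAccumulation<Object, Double, Double> {
  @Override public Double defaultAccumulatedValue() { return 0.0; }

  @Override public Double accumulate(Double accumulated, Object input) {
    if (!(input instanceof Customer)) {
      throw new ClassCastException("expected Customer, got "
          + (input == null ? "null" : input.getClass().getName()));
    }
    Customer customer = (Customer) input; // runtime cast replaces the compile-time check
    return accumulated + customer.spend;
  }

  @Override public Double getOutput(Double accumulated) { return accumulated; }
}

final class Accumulations {
  private Accumulations() {}

  // Folds all inputs through an accumulation, as a windowed operator would for one window.
  static <I, A, O> O run(SimpleAccumulation<I, A, O> accumulation, List<? extends I> inputs) {
    A acc = accumulation.defaultAccumulatedValue();
    for (I input : inputs) {
      acc = accumulation.accumulate(acc, input);
    }
    return accumulation.getOutput(acc);
  }
}
```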
Ajay

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Make your POJO class implement the WindowedOperator Tuple interface (it may return itself in getValue()).

Thank you,

Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:

Hi All,

I am creating an application that uses the Windowed Operator. In this application, a CsvParser operator emits a POJO object, which is to be passed as input to the WindowedOperator. The WindowedOperator requires an instance of the Tuple class as input:

    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Because of this, addStream cannot work: the type of CsvParser's output port is not compatible with the input port type of WindowedOperator.

One way to solve this problem is to put a converter operator between the two operators above. I would like to know if there is a more generic approach that does not require writing a new operator for every new application that uses Windowed Operators.
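There are two ways to bridge the port types without a per-application operator. Both are sketched below in plain Java, using a hypothetical `Tuple` stand-in (the real windowed operator's `Tuple` interface may differ). In the first, the POJO implements `Tuple` and returns itself, as Vlad suggests. In the second, a single converter parameterised by the POJO class wraps whatever the parser emits. `PlainTuple`, `CustomerPojo` and `ObjectToTuple` are illustrative names.

```java
import java.util.function.Function;

// Hypothetical stand-in for the windowed operator's Tuple interface.
interface Tuple<T> {
  T getValue();
}

final class PlainTuple<T> implements Tuple<T> {
  private final T value;
  PlainTuple(T value) { this.value = value; }
  @Override public T getValue() { return value; }
}

// Option 1 (Vlad): the POJO is its own Tuple and returns itself from getValue().
final class CustomerPojo implements Tuple<CustomerPojo> {
  final String name;
  CustomerPojo(String name) { this.name = name; }
  @Override public CustomerPojo getValue() { return this; }
}

// Option 2: one converter, parameterised by the POJO class, reusable across applications.
final class ObjectToTuple<T> implements Function<Object, Tuple<T>> {
  private final Class<T> type;

  ObjectToTuple(Class<T> type) { this.type = type; }

  @Override public Tuple<T> apply(Object parsed) {
    // Class.cast throws ClassCastException if the parser emitted something else.
    return new PlainTuple<>(type.cast(parsed));
  }
}
```

The converter still has to run somewhere. Hosting it in an operator is exactly the extra hop Vlad considers poor design. The self-implementing POJO avoids that hop, at the cost of coupling the POJO to the windowing interface.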
Thanks,
Ajay

On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

I think we have some agreement on how we should use control tuples in the File I/O operators to support batch.

To get more operators in Malhar to support this paradigm, I think we should also look at the store operators: JDBC, Cassandra, HBase, etc. The case with these operators is simpler. Most of them do not poll the sources (the JDBC poller operator is the exception) and just stop once they have read a fixed amount of data. In other words, these are inherently batch sources. The only change we need to add to these operators is to shut down the DAG once the data has been read. For a windowed operator, this would mean a Global window with a final watermark before the DAG is shut down.
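Here is a sketch of the bounded store-reader behaviour described above. It assumes the reader drains a fixed result set, then emits a final watermark at `Long.MAX_VALUE` so that a Global window fires, and only then reports that it can shut down. `SourceEvent`, `BoundedStoreReader` and `readyToShutdown` are hypothetical names. How the DAG is actually shut down depends on the Apex version, so that part is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical events a bounded input operator sends downstream.
abstract class SourceEvent {
  static final class Row extends SourceEvent {
    final String value;
    Row(String value) { this.value = value; }
  }

  // A watermark at +infinity: every window, including a Global window, is complete.
  static final class FinalWatermark extends SourceEvent {
    final long timestamp = Long.MAX_VALUE;
  }
}

// Hypothetical bounded reader (e.g. over a fixed JDBC result set): no polling, it just runs out.
final class BoundedStoreReader {
  private final Iterator<String> rows;
  private final int rowsPerCall;
  private boolean finished;

  BoundedStoreReader(List<String> resultSet, int rowsPerCall) {
    this.rows = resultSet.iterator();
    this.rowsPerCall = rowsPerCall;
  }

  // Called repeatedly by the engine; returns the events emitted in this call.
  List<SourceEvent> emitTuples() {
    List<SourceEvent> out = new ArrayList<>();
    for (int i = 0; i < rowsPerCall && rows.hasNext(); i++) {
      out.add(new SourceEvent.Row(rows.next()));
    }
    if (!rows.hasNext() && !finished) {
      out.add(new SourceEvent.FinalWatermark()); // finalize downstream windows first
      finished = true;
    }
    return out;
  }

  // Once true, the operator can ask the platform to shut the DAG down.
  boolean readyToShutdown() { return finished; }
}
```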
~ Bhupesh

On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

Even though the windowed operator is not just about "event time", it seems to depend too heavily on the "time" attribute of the incoming tuple. This is why we had to model the file index as a timestamp to solve the batch case for files. Perhaps we should work on extending the windowed operator to support other types of windows as well. The Sequence option David suggested seems to be a step in that direction.

~ Bhupesh

On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

That's correct, we are looking at a generalized approach for state management vs. a series of special cases.
And to be clear, windowing does not imply event time, otherwise it would be "EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

I went through the discussion, but it seems to be more about event time watermark handling than about batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from the others, we would like stateful processing within a batch, but not across batches.

At the same time, we would like to do this in a manner consistent with the windowing mechanism provided by the windowed operator. This will allow us to treat a single batch as a (bounded) stream and apply all the event time windowing concepts within that time span.

For example, let's say I need to process data for a day (24 hours) as a single batch.
The application is still streaming in nature: it would end the batch after a day and start a new batch the next day. At the same time, I would be able to have early trigger firings every minute, as well as drop any data that is, say, 5 minutes late. All of this happens within a single day.

~ Bhupesh

On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion on the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.
https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

If a time window does not seem appropriate, we can have another class that is better suited to such sequential and distinct windows. Perhaps we could introduce a CustomWindow option that takes a window id. The purpose of this window option would be to translate the window id into appropriate timestamps.

Another option would be to use a custom timestampExtractor for such tuples. It would translate each unique file name into a distinct timestamp, and the windowed operator would keep using time windows.
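Below, the CustomWindow idea is reduced to a hypothetical `IdWindow` class. It translates a window id into a synthetic `[begin, end)` time span and back. The class name and the fixed `widthMillis` parameter are assumptions for illustration. With a width of 1 ms, id n maps to timestamp n. This matches the 1 ms time windows Bhupesh describes elsewhere in the thread.

```java
// Hypothetical window identified only by a sequence id (for example, the index of a file).
final class IdWindow {
  final long id;

  IdWindow(long id) {
    if (id < 0) {
      throw new IllegalArgumentException("id must be non-negative: " + id);
    }
    this.id = id;
  }

  // Translate the id into a synthetic [begin, end) span so time-window logic can be reused.
  long beginTimestamp(long widthMillis) {
    return Math.multiplyExact(id, widthMillis);
  }

  long endTimestamp(long widthMillis) {
    return Math.addExact(beginTimestamp(widthMillis), widthMillis);
  }

  // Inverse mapping: which window a synthetic timestamp falls into.
  static IdWindow forTimestamp(long timestamp, long widthMillis) {
    return new IdWindow(Math.floorDiv(timestamp, widthMillis));
  }
}
```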
~ Bhupesh

On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

I now see your rationale for putting the filename in the window. As far as I understand, the filename is not part of the key, and the Global Window is not used, for these reasons:

1) The files are processed in sequence, not in parallel
2) The windowed operator should not keep the state associated with the file once the processing of the file is done
3) The trigger should be fired for the file when the file is done processing.
However, if the file is just a sequence and has nothing to do with a timestamp, assigning a timestamp to a file is not intuitive. It would just confuse users, especially when used as an example for new users. How about having a separate class called SequenceWindow? And perhaps TimeWindow could inherit from it?

David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:

On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now.

I just want to make sure we are on the same page wrt. the "each file is a batch" use case.
As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number). That keeps the tuples from each file in a separate window.

Yes, in this case it is a sequence number, but it could also be a time stamp, depending on the file naming convention. And with event time processing, the watermark would be derived from the records within the file.

Agreed, the source should have a mechanism to control the time stamp extraction, along with everything else pertaining to watermark generation.

We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
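Here is a minimal sketch of such a `timestampExtractor`. It assumes sequence numbers are assigned in the order in which files are first seen. `TimestampExtractor`, `FileRecord` and `FileSequenceExtractor` are hypothetical names. If file names encode a date, as noted above, a different extractor could parse a real timestamp instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical extractor interface: derives the "timestamp" used for window assignment.
interface TimestampExtractor<T> extends ToLongFunction<T> {}

// Hypothetical record type carrying the name of the file it was read from.
final class FileRecord {
  final String fileName;
  final String line;
  FileRecord(String fileName, String line) { this.fileName = fileName; this.line = line; }
}

// Gives each distinct file the next sequence number, in the order files are first seen.
final class FileSequenceExtractor implements TimestampExtractor<FileRecord> {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long nextSequence = 1;

  @Override public long applyAsLong(FileRecord record) {
    return sequenceByFile.computeIfAbsent(record.fileName, name -> nextSequence++);
  }
}
```

In an actual operator, the map would presumably have to be part of the checkpointed state, so that sequence numbers stay stable across recovery.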
~ Bhupesh

On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

I don't think this is a use case for a count based window.

We have multiple files that are retrieved in sequence, and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file has been read fully. There is no concept of "end of something" for an individual key, and the global window isn't applicable.
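The requirement as stated above can be shown independently of how windowing implements it. Each file is aggregated separately, and its aggregate is emitted when the file has been read fully, without knowing the record count. `PerFileWordCount` and its `beginFile`/`line`/`endFile` methods are hypothetical. In the windowed-operator solution, the end of a file would correspond to a watermark passing that file's window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-file aggregator: files arrive one after another, record counts are unknown,
// and each file's aggregate is emitted only when that file has been read fully.
final class PerFileWordCount {
  private final Map<String, Long> counts = new TreeMap<>(); // sorted for stable output
  private String currentFile;
  final List<String> output = new ArrayList<>();

  void beginFile(String fileName) {
    currentFile = fileName;
    counts.clear();
  }

  void line(String text) {
    for (String word : text.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        counts.merge(word, 1L, Long::sum);
      }
    }
  }

  void endFile() {
    output.add(currentFile + " " + counts); // emit this file's aggregate only
    counts.clear();                        // no state is carried into the next file
    currentFile = null;
  }
}
```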
However, as already explained and implemented by Bhupesh, this can be solved using a watermark and a window. In this case the window timestamp isn't really a timestamp but a file sequence number, and that doesn't matter.

Thomas

On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

I don't think this is the way to go. A Global Window only means that the timestamp does not matter (or that there is no timestamp). It does not necessarily mean a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
If you want the result broken up by file name, and the files are to be processed in parallel, I think making the file name part of the key is the way to go. I think it would be very confusing to somehow make the file part of the window.

Count-based windows are not implemented yet, and you're welcome to add that feature. With count-based windows, there would be no notion of time, and you would probably only trigger at the end of each window.
With count-based windows, the watermark only matters for batch, since you need a way to know when the batch has ended. For example, if the count is 10 and the batch has, let's say, 105 tuples, you need a way to end the last window with 5 tuples.

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

Thanks for your comments.

The wordcount example that I created based on the windowed operator processes word counts per file (each file as a separate batch). That is, it computes counts for each file and dumps them into separate files.

As I understand it, the Global window is for one large batch, i.e. all incoming data falls into the same batch. This case could not be handled with the GlobalWindow option, because we need more than one window. Instead, I configured the windowed operator with time windows of 1 ms each and passed the data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way to handle this scenario?

Regarding (2 - count based windows), I think there is a trigger option to process count based windows.
If I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation mode set to Discarding. Is this correct?

I agree that (4. Final Watermark) can be done using the Global window.

~ Bhupesh

On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

I'm worried that we are making the watermark concept too complicated. Watermarks should simply tell you which windows can be considered complete.
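The CountTrigger-with-Discarding question can be made concrete with a hypothetical `CountTriggeredSum` (not the Malhar trigger API). The trigger fires every n tuples. The accumulation mode decides whether the running value is cleared after each firing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count trigger over a running sum, showing the two accumulation modes.
final class CountTriggeredSum {
  enum Mode { DISCARDING, ACCUMULATING }

  private final int everyN;
  private final Mode mode;
  private long sum;
  private int sinceLastFiring;
  final List<Long> firings = new ArrayList<>();

  CountTriggeredSum(int everyN, Mode mode) {
    this.everyN = everyN;
    this.mode = mode;
  }

  void add(long value) {
    sum += value;
    if (++sinceLastFiring == everyN) {
      firings.add(sum);
      sinceLastFiring = 0;
      if (mode == Mode.DISCARDING) {
        sum = 0; // each firing reflects only the last everyN tuples
      }
    }
  }
}
```

With n = 2 and inputs 1 to 5, discarding mode fires 3 and 7, while accumulating mode fires 3 and 10. In both modes the fifth tuple never fires. This is the partial-window problem that comes up for count-based windows in batch.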
Point 2 is basically a count-based window. Watermarks do not play a role here, because the window is always complete at the n-th tuple.

If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me like this can be achieved with just a Global Window. To signal EOF, a watermark with a +infinity timestamp can be used, so that triggers fire upon receipt of that watermark.
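Both points about count-based windows can be combined in one hypothetical `CountWindows` class (illustrative only). A count-based window is complete at the n-th tuple, so ordinary watermarks do nothing. A watermark at `Long.MAX_VALUE` (the +infinity timestamp) flushes the last partial window at the end of the batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based windows: a window is complete at the n-th tuple, so ordinary
// watermarks are irrelevant; only a +infinity watermark (end of batch) flushes the partial window.
final class CountWindows<T> {
  private final int size;
  private List<T> current = new ArrayList<>();
  final List<List<T>> closed = new ArrayList<>();

  CountWindows(int size) { this.size = size; }

  void onTuple(T tuple) {
    current.add(tuple);
    if (current.size() == size) {
      close();
    }
  }

  void onWatermark(long timestamp) {
    if (timestamp == Long.MAX_VALUE && !current.isEmpty()) {
      close(); // end of batch: emit the last, partial window
    }
  }

  private void close() {
    closed.add(current);
    current = new ArrayList<>();
  }
}
```

With a window size of 10 and 105 tuples, ten windows close on their own. The +infinity watermark then closes an eleventh window holding the remaining 5 tuples, matching the 105-tuple example in this thread.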
Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.

David

On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

For an input operator which is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:
1. Time based watermarks (the high watermark / low watermark)
2. Tuple count based watermarks (every n tuples)
3. File based watermarks (start file, end file)
4. Final watermark

File based watermarks seem to be applicable for batch (file based) processing as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?

~ Bhupesh

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time. More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with. Note that even file input in many cases can produce time based watermarks, for example when you read part files that are bound by event time.
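A watermark based on progress in event time could be derived by an input operator roughly as follows. This is a hypothetical sketch, not Apex code: the class name is invented, and it assumes events arrive at most `maxOutOfOrderMillis` behind the latest event time seen. The key property is that the emitted watermark is monotonic, so downstream operators can rely on it generically.

```java
// Hypothetical sketch: derive a monotonic event-time watermark from tuple
// timestamps, assuming bounded out-of-orderness of `maxOutOfOrderMillis`.
class EventTimeWatermarkGenerator {
    private final long maxOutOfOrderMillis;
    private long maxEventTime = Long.MIN_VALUE;     // latest event time seen so far
    private long currentWatermark = Long.MIN_VALUE; // last watermark handed out

    EventTimeWatermarkGenerator(long maxOutOfOrderMillis) {
        this.maxOutOfOrderMillis = maxOutOfOrderMillis;
    }

    // Called for every tuple with its event time.
    void onEvent(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
    }

    // Called periodically (for example once per streaming window) to obtain
    // the watermark to emit downstream.
    long currentWatermark() {
        if (maxEventTime != Long.MIN_VALUE) {
            long candidate = maxEventTime - maxOutOfOrderMillis;
            // Guard: the emitted watermark never moves backwards.
            currentWatermark = Math.max(currentWatermark, candidate);
        }
        return currentWatermark;
    }
}
```

For part files bound by event time, the same idea could be applied at file granularity: once the part file covering an interval is fully read, the operator could emit a watermark at the end of that interval.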
Thanks,
Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>