Created a JIRA to track this: https://issues.apache.org/jira/browse/APEXMALHAR-2449
~ Bhupesh

_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com | apex.apache.org

On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi Thomas,
>
> Even though the windowing operator is not just "event time", it seems it
> is too dependent on the "time" attribute of the incoming tuple. This is
> the reason we had to model the file index as a timestamp to solve the
> batch case for files.
> Perhaps we should work on increasing the scope of the windowed operator
> to consider other types of windows as well. The Sequence option suggested
> by David seems to be something in that direction.
>
> ~ Bhupesh

On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

> That's correct, we are looking at a generalized approach for state
> management vs. a series of special cases.
>
> And to be clear, windowing does not imply event time, otherwise it would
> be "EventTimeOperator" :-)
>
> Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi David,
>
> I went through the discussion, but it seems like it is more on the event
> time watermark handling as opposed to batches. What we are trying to do
> is have watermarks serve the purpose of demarcating batches using control
> tuples. Since each batch is separate from others, we would like to have
> stateful processing within a batch, but not across batches.
> At the same time, we would like to do this in a manner which is
> consistent with the windowing mechanism provided by the windowed operator.
> This will allow us to treat a single batch as a (bounded) stream and
> apply all the event time windowing concepts in that time span.
>
> For example, let's say I need to process data for a day (24 hours) as a
> single batch. The application is still streaming in nature: it would end
> the batch after a day and start a new batch the next day. At the same
> time, I would be able to have early trigger firings every minute as well
> as drop any data which is, say, 5 minutes late. All this within a single
> day.
>
> ~ Bhupesh

On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

> There is a discussion in the Flink mailing list about key-based
> watermarks. I think it's relevant to our use case here.
> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>
> David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi David,
>
> If using a time window does not seem appropriate, we can have another
> class which is more suited for such sequential and distinct windows.
> Perhaps a CustomWindow option can be introduced which takes in a window
> id. The purpose of this window option could be to translate the window id
> into appropriate timestamps.
>
> Another option would be to go with a custom timestampExtractor for such
> tuples, which translates each unique file name to a distinct timestamp
> while using time windows in the windowed operator.
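
The timestampExtractor idea in the message above can be sketched roughly as follows. This is only an illustration under assumptions: the class name FileSequenceExtractor and the method timestampFor are hypothetical and are not part of Apex Malhar. The sketch gives every distinct file name the next value of a counter, so that tuples of one file share a "timestamp" and fall into the same 1 ms time window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not an Apex Malhar class: maps each distinct file
// name to a monotonically increasing sequence number that can stand in
// for a timestamp.
final class FileSequenceExtractor {
    private final Map<String, Long> sequenceByFile = new HashMap<>();
    private long next = 1L;

    // Returns the sequence number for the file, assigning a new one the
    // first time a file name is seen.
    long timestampFor(String fileName) {
        Long existing = sequenceByFile.get(fileName);
        if (existing == null) {
            existing = next++;
            sequenceByFile.put(fileName, existing);
        }
        return existing;
    }
}
```

The first file seen maps to 1, the second to 2, and a file that was already seen keeps its number.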
>
> ~ Bhupesh

On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

> I now see your rationale on putting the filename in the window.
> As far as I understand, the reasons why the filename is not part of the
> key and the Global Window is not used are:
>
> 1) The files are processed in sequence, not in parallel
> 2) The windowed operator should not keep the state associated with the
>    file when the processing of the file is done
> 3) The trigger should be fired for the file when a file is done
>    processing.
>
> However, if the file is just a sequence that has nothing to do with a
> timestamp, assigning a timestamp to a file is not an intuitive thing to
> do and would just create confusion for users, especially when it's used
> as an example for new users.
>
> How about having a separate class called SequenceWindow? And perhaps
> TimeWindow can inherit from it?
>
> David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:

> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> wrote:
>
> > I think my comments related to count based windows might be causing
> > confusion. Let's not discuss count based scenarios for now.
> >
> > Just want to make sure we are on the same page wrt. the "each file is a
> > batch" use case.
> > As mentioned by Thomas, each tuple from the same file has the same
> > timestamp (which is just a sequence number) and that helps keep tuples
> > from each file in a separate window.
>
> Yes, in this case it is a sequence number, but it could be a time stamp
> also, depending on the file naming convention. And if it was event time
> processing, the watermark would be derived from records within the file.
>
> Agreed, the source should have a mechanism to control the time stamp
> extraction along with everything else pertaining to the watermark
> generation.
>
> > We could also implement a "timestampExtractor" interface to identify
> > the timestamp (sequence number) for a file.
> >
> > ~ Bhupesh

On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

> I don't think this is a use case for count based window.
>
> We have multiple files that are retrieved in a sequence and there is no
> knowledge of the number of records per file. The requirement is to
> aggregate each file separately and emit the aggregate when the file is
> read fully.
> There is no concept of "end of something" for an individual key and
> global window isn't applicable.
>
> However, as already explained and implemented by Bhupesh, this can be
> solved using watermark and window (in this case the window timestamp
> isn't a timestamp, but a file sequence, but that doesn't matter).
>
> Thomas

On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

> I don't think this is the way to go. Global Window only means the
> timestamp does not matter (or that there is no timestamp). It does not
> necessarily mean it's a large batch. Unless there is some notion of event
> time for each file, you don't want to embed the file into the window
> itself.
>
> If you want the result broken up by file name, and if the files are to be
> processed in parallel, I think making the file name be part of the key is
> the way to go. I think it's very confusing if we somehow make the file to
> be part of the window.
>
> For count-based window, it's not implemented yet and you're welcome to
> add that feature. In case of count-based windows, there would be no
> notion of time and you probably only trigger at the end of each window.
> In the case of count-based windows, the watermark only matters for batch
> since you need a way to know when the batch has ended (if the count is
> 10 and the number of tuples in the batch is, let's say, 105, you need a
> way to end the last window with 5 tuples).
>
> David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi David,
>
> Thanks for your comments.
>
> The wordcount example that I created based on the windowed operator does
> processing of word counts per file (each file as a separate batch), i.e.
> process counts for each file and dump into separate files.
> As I understand, Global window is for one large batch; i.e. all incoming
> data falls into the same batch. This could not be processed using the
> GlobalWindow option as we need more than one window. In this case, I
> configured the windowed operator to have time windows of 1ms each and
> passed data for each file with increasing timestamps: (file1, 1),
> (file2, 2) and so on. Is there a better way of handling this scenario?
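
The per-file word count described above, where the file sequence number doubles as the window timestamp, can be sketched in plain Java as follows. This is not the Malhar windowed operator: PerFileWordCount, process and onWatermark are hypothetical names, and the assumption is that a watermark with value w closes every window whose id is at most w. State is kept only while a file's window is open and is dropped once the watermark passes it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the Malhar windowed operator.
final class PerFileWordCount {
    // window id (file sequence number) -> word -> count
    private final TreeMap<Long, Map<String, Integer>> windows = new TreeMap<>();

    // Adds one occurrence of the word to the window of the given file.
    void process(long fileSeq, String word) {
        Map<String, Integer> counts = windows.get(fileSeq);
        if (counts == null) {
            counts = new HashMap<>();
            windows.put(fileSeq, counts);
        }
        Integer current = counts.get(word);
        counts.put(word, current == null ? 1 : current + 1);
    }

    // Closes every window with id <= watermark: returns their counts in
    // ascending window order and discards their state.
    List<Map<String, Integer>> onWatermark(long watermark) {
        Map<Long, Map<String, Integer>> closed = windows.headMap(watermark, true);
        List<Map<String, Integer>> results = new ArrayList<>(closed.values());
        closed.clear(); // headMap is a view, so this removes from 'windows'
        return results;
    }
}
```

Feeding (file1, 1) tuples followed by a watermark of 1 emits the counts for file1 only; tuples already received for file2 stay buffered until a watermark of 2 or higher arrives.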
>
> Regarding (2 - count based windows), I think there is a trigger option to
> process count based windows. In case I want to process every 1000 tuples
> as a batch, I could set the Trigger option to CountTrigger with the
> accumulation set to Discarding. Is this correct?
>
> I agree that (4. Final Watermark) can be done using Global window.
>
> ~ Bhupesh

On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

> I'm worried that we are making the watermark concept too complicated.
>
> Watermarks should simply just tell you what windows can be considered
> complete.
>
> Point 2 is basically a count-based window. Watermarks do not play a role
> here because the window is always complete at the n-th tuple.
>
> If I understand correctly, point 3 is for batch processing of files.
> Unless the files contain timed events, it sounds to me that this can be
> achieved with just a Global Window. For signaling EOF, a watermark with a
> +infinity timestamp can be used so that triggers will be fired upon
> receipt of that watermark.
>
> Point 4, just like what I mentioned above, can be achieved with a
> watermark with a +infinity timestamp.
>
> David

On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi Thomas,
>
> For an input operator which is supposed to generate watermarks for
> downstream operators, I can think about the following watermarks that the
> operator can emit:
> 1. Time based watermarks (the high watermark / low watermark)
> 2. Number of tuple based watermarks (Every n tuples)
> 3. File based watermarks (Start file, end file)
> 4. Final watermark
>
> File based watermarks seem to be applicable for batch (file based) as
> well, and hence I thought of looking at these first.
> Does this seem to be in line with the thought process?
>
> ~ Bhupesh

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

> I don't think this should be designed based on a simplistic file
> input-output scenario. It would be good to include a stateful
> transformation based on event time.
>
> More complex pipelines contain stateful transformations that depend on
> windowing and watermarks. I think we need a watermark concept that is
> based on progress in event time (or other monotonic increasing sequence)
> that other operators can generically work with.
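
A watermark that tracks progress over any monotonically increasing sequence, as suggested above, could look roughly like the sketch below. The class SequenceWatermark and its methods are hypothetical, not an Apex API, and Long.MAX_VALUE is assumed as the stand-in for the "+infinity" final watermark mentioned elsewhere in this thread. The value may be an event time or a file sequence number; the tracker does not care which.

```java
// Hypothetical sketch, not an Apex Malhar class.
final class SequenceWatermark {
    // Assumed encoding of the final ("+infinity") watermark.
    static final long FINAL = Long.MAX_VALUE;

    private long current = Long.MIN_VALUE;

    // Advances the watermark. A value lower than the current one is
    // ignored and reported as false, since progress must be monotonic.
    boolean advance(long value) {
        if (value < current) {
            return false;
        }
        current = value;
        return true;
    }

    // A window ending at windowEnd is complete once the watermark has
    // reached that value.
    boolean isComplete(long windowEnd) {
        return current >= windowEnd;
    }

    // True once the final watermark has been received.
    boolean isFinal() {
        return current == FINAL;
    }
}
```

Once the final watermark arrives, every window is reported complete, which is how a bounded input would flush all remaining state.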
>
> Note that even file input in many cases can produce time based
> watermarks, for example when you read part files that are bound by event
> time.
>
> Thanks,
> Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> For better understanding the use case for control tuples in batch, I am
> creating a prototype for a batch application using File Input and File
> Output operators.
>
> To enable basic batch processing for File IO operators, I am proposing
> the following changes to File input and output operators:
> 1. File Input operator emits a watermark each time it opens and closes a
>    file. These can be "start file" and "end file" watermarks which
>    include the corresponding file names. The "start file" tuple should be
>    sent before any of the data from that file flows.
> 2. File Input operator can be configured to end the application after a
>    single or n scans of the directory (a batch). This is where the
>    operator emits the final watermark (the end of application control
>    tuple). This will also shut down the application.
> 3. The File output operator handles these control tuples. "Start file"
>    initializes the file name for the incoming tuples. "End file"
>    watermark forces a finalize on that file.
>
> The user would be able to enable the operators to send only those
> watermarks that are needed in the application. If none of the options are
> configured, the operators behave as in a streaming application.
>
> There are a few challenges in the implementation where the input operator
> is partitioned. In this case, the correlation between the start/end for a
> file and the data tuples for that file is lost. Hence we need to maintain
> the filename as part of each tuple in the pipeline.
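
The output side of the proposal above can be sketched as follows. BatchFileWriterSketch and its methods are hypothetical names, not the Malhar file output operator, and the sketch buffers in memory instead of writing .tmp files. It only shows why carrying the file name on every data tuple helps: records can be routed to the right file even when tuples of several files interleave. It does not address the ordering of control tuples across partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Malhar file output operator: keeps one
// in-memory buffer per file name and "finalizes" it on the end-file tuple.
final class BatchFileWriterSketch {
    private final Map<String, StringBuilder> open = new HashMap<>();
    private final Map<String, String> finalized = new HashMap<>();

    // "start file" control tuple: prepare a buffer for this file name.
    void startFile(String fileName) {
        if (!open.containsKey(fileName)) {
            open.put(fileName, new StringBuilder());
        }
    }

    // Data tuple: the file name travels with the record, so the record is
    // appended to the buffer of its own file.
    void data(String fileName, String record) {
        startFile(fileName);
        open.get(fileName).append(record).append('\n');
    }

    // "end file" control tuple: move the buffer to the finalized set.
    void endFile(String fileName) {
        StringBuilder buffer = open.remove(fileName);
        finalized.put(fileName, buffer == null ? "" : buffer.toString());
    }

    // Returns the finalized content, or null if not finalized yet.
    String finalizedContent(String fileName) {
        return finalized.get(fileName);
    }
}
```

With interleaved input for a.txt and b.txt, ending a.txt finalizes only its own records and leaves b.txt open.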
>
> The "start file" and "end file" control tuples in this example are
> temporary names for watermarks. We can have generic "start batch" / "end
> batch" tuples which could be used for other use cases as well. The Final
> watermark is common and serves the same purpose in each case.
>
> Please let me know your thoughts on this.
>
> ~ Bhupesh

On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Yes, this can be part of operator configuration. Given this, for a user
> to define a batch application would mean configuring the connectors
> (mostly the input operator) in the application for the desired behavior.
> Similarly, there can be other use cases that can be achieved other than
> batch.
>
> We may also need to take care of the following:
> 1.
>    Make sure that the watermarks or control tuples are consistent across
>    sources. This means an HDFS sink should be able to interpret the
>    watermark tuple sent out by, say, a JDBC source.
> 2. In addition to I/O connectors, we should also look at the need for
>    processing operators to understand some of the control tuples /
>    watermarks. For example, we may want to reset the operator behavior on
>    arrival of some watermark tuple.
>
> ~ Bhupesh

On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:

> The HDFS source can operate in two modes, bounded or unbounded. If you
> scan only once, then it should emit the final watermark after it is done.
> Otherwise it would emit watermarks based on a policy (file names etc.).
> The mechanism to generate the marks may depend on the type of source and
> the user needs to be able to influence/configure it.
>
> Thomas

On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi Thomas,
>
> I am not sure that I completely understand your suggestion. Are you
> suggesting to broaden the scope of the proposal to treat all sources as
> bounded as well as unbounded?
>
> In case of Apex, we treat all sources as unbounded sources. Even a
> bounded source like the HDFS file source is treated as unbounded by means
> of scanning the input directory repeatedly.
>
> Let's consider HDFS file source for example:
> In this case, if we treat it as a bounded source, we can define hooks
> which allow us to detect the end of the file and send the "final
> watermark". We could also consider HDFS file source as a streaming source
> and define hooks which send watermarks based on different kinds of
> windows.
>
> Please correct me if I misunderstand.
>
> ~ Bhupesh

On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:

> Bhupesh,
>
> Please see how that can be solved in a unified way using windows and
> watermarks. It is bounded data vs. unbounded data.
> In Beam for example, you can use the "global window" and the final
> watermark to accomplish what you are looking for. Batch is just a special
> case of streaming where the source emits the final watermark.
>
> Thanks,
> Thomas

On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Yes, if the user needs to develop a batch application, then batch aware
> operators need to be used in the application.
> The nature of the application is mostly controlled by the input and the
> output operators used in the application.
>
> For example, consider an application which needs to filter records in an
> input file and store the filtered records in another file. The nature of
> this app is to end once the entire file is processed. The following
> things are expected of the application:
>
>    1. Once the input data is over, finalize the output file from .tmp
>       files. - Responsibility of output operator
>    2. End the application, once the data is read and processed -
>       Responsibility of input operator
>
> These functions are essential to allow the user to do higher level
> operations like scheduling or running a workflow of batch applications.
>
> I am not sure about intermediate (processing) operators, as there is no
> change in their functionality for batch use cases. Perhaps allowing
> multiple batches to run in a single application may require similar
> changes in processing operators as well.
>
> ~ Bhupesh

On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org> wrote:

> Will it give users the impression that, if they have a batch use case,
> they have to use batch aware operators only? If so, is that what we
> expect? I am not aware of how we implement the batch scenario, so this
> might be a basic question.
> >
> > -Priyanka
> >
> > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda
> > <bhup...@datatorrent.com> wrote:
> >
> > > Hi All,
> > >
> > > While design / implementation for custom control tuples is ongoing, I
> > > thought it would be a good idea to consider its usefulness in one of
> > > the use cases - batch applications.
> > >
> > > This is a proposal to adapt / extend existing operators in the Apache
> > > Apex Malhar library so that it is easy to use them in batch use cases.
> > > Naturally, this would be applicable for only a subset of operators
> > > like File, JDBC and NoSQL databases.
> > > For example, for a file based store, (say HDFS store), we could have
> > > FileBatchInput and FileBatchOutput operators which allow easy
> > > integration into a batch application. These operators would be
> > > extended from their existing implementations and would be "Batch
> > > Aware", in that they may understand the meaning of some specific
> > > control tuples that flow through the DAG. Start batch and end batch
> > > seem to be the obvious candidates that come to mind.
> > > On receipt of such control tuples, they may try to modify the
> > > behavior of the operator - to reinitialize some metrics or finalize
> > > an output file for example.
> > >
> > > We can discuss the potential control tuples and actions in detail,
> > > but first I would like to understand the views of the community for
> > > this proposal.
> > >
> > > ~ Bhupesh
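
To make the proposal above a little more concrete, here is a rough, self-contained Java sketch of what a "Batch Aware" file output might do when it sees start batch and end batch control tuples. It is only an illustration under assumptions: BatchSketch, ControlTuple, BatchAwareFileOutput, onControl and onData are hypothetical names invented for this sketch and are not part of Apache Apex or Malhar, and the control tuples are modeled as a plain enum rather than anything delivered by the engine. The sketch writes records to a .tmp file, resets a per-batch counter on start batch, and renames the .tmp file to the final file on end batch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: none of these types come from Apache Apex or Malhar.
class BatchSketch {

    /** Hypothetical control signals that flow through the DAG alongside data. */
    enum ControlTuple { START_BATCH, END_BATCH }

    /** Hypothetical batch-aware output: writes to a .tmp file, finalizes it on END_BATCH. */
    static class BatchAwareFileOutput {
        private final Path finalFile;
        private final Path tmpFile;
        private long recordsInBatch; // example of a metric that is reset per batch

        BatchAwareFileOutput(Path finalFile) {
            this.finalFile = finalFile;
            this.tmpFile = finalFile.resolveSibling(finalFile.getFileName() + ".tmp");
        }

        void onControl(ControlTuple tuple) throws IOException {
            switch (tuple) {
                case START_BATCH:
                    // Reinitialize per-batch state and start from an empty .tmp file.
                    recordsInBatch = 0;
                    Files.deleteIfExists(tmpFile);
                    Files.createFile(tmpFile);
                    break;
                case END_BATCH:
                    // Finalize: the output only becomes visible under its final name now.
                    Files.move(tmpFile, finalFile, StandardCopyOption.REPLACE_EXISTING);
                    break;
            }
        }

        void onData(String record) throws IOException {
            byte[] line = (record + "\n").getBytes(StandardCharsets.UTF_8);
            Files.write(tmpFile, line, StandardOpenOption.APPEND);
            recordsInBatch++;
        }

        long recordsInBatch() {
            return recordsInBatch;
        }
    }

    /** Runs one batch: filter some records, write them, and return the finalized lines. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("batch-sketch");
            Path out = dir.resolve("filtered.txt");
            BatchAwareFileOutput output = new BatchAwareFileOutput(out);

            output.onControl(ControlTuple.START_BATCH);
            for (String record : Arrays.asList("keep-1", "drop-2", "keep-3")) {
                if (record.startsWith("keep")) { // the "filter" step of the example app
                    output.onData(record);
                }
            }
            output.onControl(ControlTuple.END_BATCH);

            return Files.readAllLines(out, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the final file does not exist until end batch arrives, which mirrors the "finalize the output file from .tmp files" responsibility of the output operator. How the input operator would end the application, and how the real control tuples would be delivered to operators, is left open here, as it is in the thread.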