Maybe we should take this to the Beam mailing list and ask how people think this problem can be solved using watermarks and windowing. I think we will get some good suggestions.
David

On Tue, Feb 28, 2017 at 8:17 AM, Thomas Weise <t...@apache.org> wrote:

> I think that discussion is related, but in our example we have many keys that belong to a file that all fall into a common window boundary.
>
> WRT the naming of the window ("sequence" etc.), it depends on the use case and the operators should be kept generic. It could be a sequence that is generated, derived from streaming window, from event data etc.
>
> Thanks,
> Thomas
>
> On Tue, Feb 28, 2017 at 7:57 AM, David Yan <david...@gmail.com> wrote:
>
> > There is a discussion in the Flink mailing list about key-based watermarks. I think it's relevant to our use case here.
> > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> >
> > David
> >
> > On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> >
> > > Hi David,
> > >
> > > If using time window does not seem appropriate, we can have another class which is more suited for such sequential and distinct windows. Perhaps, a CustomWindow option can be introduced which takes in a window id. The purpose of this window option could be to translate the window id into appropriate timestamps.
> > >
> > > Another option would be to go with a custom timestampExtractor for such tuples which translates each unique file name to a distinct timestamp while using time windows in the windowed operator.
> > >
> > > ~ Bhupesh
> > >
> > > _______________________________________________________
> > > Bhupesh Chawda
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > > www.datatorrent.com | apex.apache.org
> > >
> > > On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:
> > >
> > > > I now see your rationale on putting the filename in the window.
> > > > As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are:
> > > >
> > > > 1) The files are processed in sequence, not in parallel
> > > > 2) The windowed operator should not keep the state associated with the file when the processing of the file is done
> > > > 3) The trigger should be fired for the file when a file is done processing.
> > > >
> > > > However, if the file is just a sequence that has nothing to do with a timestamp, assigning a timestamp to a file is not an intuitive thing to do and would just create confusion for users, especially when it's used as an example for new users.
> > > >
> > > > How about having a separate class called SequenceWindow? And perhaps TimeWindow can inherit from it?
> > > >
> > > > David
> > > >
> > > > On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
> > > >
> > > > > On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > >
> > > > > > I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now.
> > > > > >
> > > > > > Just want to make sure we are on the same page wrt. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp (which is just a sequence number) and that helps keep tuples from each file in a separate window.
> > > > >
> > > > > Yes, in this case it is a sequence number, but it could be a time stamp also, depending on the file naming convention. And if it was event time processing, the watermark would be derived from records within the file.
> > > > >
> > > > > Agreed, the source should have a mechanism to control the time stamp extraction along with everything else pertaining to the watermark generation.
> > > > >
> > > > > > We could also implement a "timestampExtractor" interface to identify the timestamp (sequence number) for a file.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > >
> > > > > > > I don't think this is a use case for count based window.
> > > > > > >
> > > > > > > We have multiple files that are retrieved in a sequence and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no concept of "end of something" for an individual key and global window isn't applicable.
> > > > > > >
> > > > > > > However, as already explained and implemented by Bhupesh, this can be solved using watermark and window (in this case the window timestamp isn't a timestamp, but a file sequence, but that doesn't matter).
> > > > > > >
> > > > > > > Thomas
> > > > > > >
> > > > > > > On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:
> > > > > > >
> > > > > > > > I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp).
> > > > > > > > It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself.
> > > > > > > >
> > > > > > > > If you want the result broken up by file name, and if the files are to be processed in parallel, I think making the file name be part of the key is the way to go. I think it's very confusing if we somehow make the file to be part of the window.
> > > > > > > >
> > > > > > > > For count-based window, it's not implemented yet and you're welcome to add that feature. In case of count-based windows, there would be no notion of time and you probably only trigger at the end of each window. In the case of count-based windows, the watermark only matters for batch since you need a way to know when the batch has ended (if the count is 10, the number of tuples in the batch is let's say 105, you need a way to end the last window with 5 tuples).
> > > > > > > >
> > > > > > > > David
> > > > > > > >
> > > > > > > > On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > >
> > > > > > > > > Hi David,
> > > > > > > > >
> > > > > > > > > Thanks for your comments.
> > > > > > > > >
> > > > > > > > > The wordcount example that I created based on the windowed operator does processing of word counts per file (each file as a separate batch), i.e.
> > > > > > > > > process counts for each file and dump into separate files.
> > > > > > > > > As I understand Global window is for one large batch; i.e. all incoming data falls into the same batch. This could not be processed using GlobalWindow option as we need more than one window. In this case, I configured the windowed operator to have time windows of 1ms each and passed data for each file with increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way of handling this scenario?
> > > > > > > > >
> > > > > > > > > Regarding (2 - count based windows), I think there is a trigger option to process count based windows. In case I want to process every 1000 tuples as a batch, I could set the Trigger option to CountTrigger with the accumulation set to Discarding. Is this correct?
> > > > > > > > >
> > > > > > > > > I agree that (4. Final Watermark) can be done using Global window.
> > > > > > > > >
> > > > > > > > > ~ Bhupesh
> > > > > > > > >
> > > > > > > > > On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I'm worried that we are making the watermark concept too complicated.
> > > > > > > > > >
> > > > > > > > > > Watermarks should simply tell you what windows can be considered complete.
> > > > > > > > > >
> > > > > > > > > > Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple.
> > > > > > > > > >
> > > > > > > > > > If I understand correctly, point 3 is for batch processing of files. Unless the files contain timed events, it sounds to me that this can be achieved with just a Global Window. For signaling EOF, a watermark with a +infinity timestamp can be used so that triggers will be fired upon receipt of that watermark.
> > > > > > > > > >
> > > > > > > > > > Point 4, just like what I mentioned above, can be achieved with a watermark with a +infinity timestamp.
> > > > > > > > > >
> > > > > > > > > > David
> > > > > > > > > >
> > > > > > > > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas,
> > > > > > > > > > >
> > > > > > > > > > > For an input operator which is supposed to generate watermarks for downstream operators, I can think about the following watermarks that the operator can emit:
> > > > > > > > > > > 1. Time based watermarks (the high watermark / low watermark)
> > > > > > > > > > > 2. Number of tuple based watermarks (Every n tuples)
> > > > > > > > > > > 3. File based watermarks (Start file, end file)
> > > > > > > > > > > 4. Final watermark
> > > > > > > > > > >
> > > > > > > > > > > File based watermarks seem to be applicable for batch (file based) as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
> > > > > > > > > > >
> > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > >
> > > > > > > > > > > _______________________________________________________
> > > > > > > > > > > Bhupesh Chawda
> > > > > > > > > > > Software Engineer
> > > > > > > > > > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > > > > > > > > > > www.datatorrent.com | apex.apache.org
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
> > > > > > > > > > > >
> > > > > > > > > > > > More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or other monotonic increasing sequence) that other operators can generically work with.
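
To make the "watermark over a monotonic sequence" idea concrete, here is a minimal sketch in Python rather than Apex code. The class name `SequenceWindowCounter` and its methods are hypothetical and are not part of Malhar's windowed operator; the window id stands for either an event-time bucket or a file sequence number, and a watermark simply closes every window at or below it.

```python
from collections import defaultdict


class SequenceWindowCounter:
    """Hypothetical windowed word counter keyed by a monotonically increasing window id."""

    def __init__(self):
        # window id -> word -> count, for windows that are still open
        self.open_windows = defaultdict(lambda: defaultdict(int))
        # (window id, counts) pairs in the order the windows fired
        self.emitted = []

    def on_tuple(self, window_id, word):
        self.open_windows[window_id][word] += 1

    def on_watermark(self, watermark):
        # Every window whose id is <= the watermark is complete:
        # fire it and drop its state.
        for window_id in sorted(w for w in self.open_windows if w <= watermark):
            self.emitted.append((window_id, dict(self.open_windows.pop(window_id))))


counter = SequenceWindowCounter()
for word in ["a", "b", "a"]:
    counter.on_tuple(1, word)   # tuples from the first file (window 1)
counter.on_watermark(1)         # first file fully read
counter.on_tuple(2, "c")        # second file is still open
```

The operator never needs to know whether the id came from a timestamp or a file sequence, which is the point about keeping operators generic.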
> > > > > > > > > > > >
> > > > > > > > > > > > Note that even file input in many cases can produce time based watermarks, for example when you read part files that are bound by event time.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Thomas
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > For better understanding the use case for control tuples in batch, I am creating a prototype for a batch application using File Input and File Output operators.
> > > > > > > > > > > > >
> > > > > > > > > > > > > To enable basic batch processing for File IO operators, I am proposing the following changes to File input and output operators:
> > > > > > > > > > > > > 1. File Input operator emits a watermark each time it opens and closes a file. These can be "start file" and "end file" watermarks which include the corresponding file names. The "start file" tuple should be sent before any of the data from that file flows.
> > > > > > > > > > > > > 2. File Input operator can be configured to end the application after a single or n scans of the directory (a batch).
> > > > > > > > > > > > > This is where the operator emits the final watermark (the end of application control tuple). This will also shut down the application.
> > > > > > > > > > > > > 3. The File output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. "End file" watermark forces a finalize on that file.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are a few challenges in the implementation where the input operator is partitioned. In this case, the correlation between the start/end for a file and the data tuples for that file is lost. Hence we need to maintain the filename as part of each tuple in the pipeline.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The "start file" and "end file" control tuples in this example are temporary names for watermarks. We can have generic "start batch" / "end batch" tuples which could be used for other use cases as well.
> > > > > > > > > > > > > The Final watermark is common and serves the same purpose in each case.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please let me know your thoughts on this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes, this can be part of operator configuration. Given this, for a user to define a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, there can be other use cases that can be achieved other than batch.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We may also need to take care of the following:
> > > > > > > > > > > > > > 1. Make sure that the watermarks or control tuples are consistent across sources. Meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
> > > > > > > > > > > > > > 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks.
> > > > > > > > > > > > > > For example, we may want to reset the operator behavior on arrival of some watermark tuple.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.). The mechanism to generate the marks may depend on the type of source and the user needs to be able to influence/configure it.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thomas
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Thomas,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I am not sure that I completely understand your suggestion.
> > > > > > > > > > > > > > > > Are you suggesting to broaden the scope of the proposal to treat all sources as bounded as well as unbounded?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > In case of Apex, we treat all sources as unbounded sources. Even a bounded source like the HDFS file source is treated as unbounded by means of scanning the input directory repeatedly.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Let's consider HDFS file source for example:
> > > > > > > > > > > > > > > > In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Please correct me if I misunderstand.
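
As a rough illustration of the bounded vs. unbounded distinction discussed here, the following Python sketch models a file source as a generator of events. Everything in it is an assumption made for illustration: `scan_source`, the event tuples, and the use of +infinity as the final watermark (as suggested elsewhere in this thread) are not Apex or Beam APIs.

```python
import math

# Assumed convention: a watermark at +infinity means "no more data will ever arrive".
FINAL_WATERMARK = math.inf


def scan_source(files, bounded):
    """Yield ("data", seq, record) and ("watermark", value) events for one directory scan.

    `files` is a list of (file_name, records) pairs; the file's position in the
    scan is used as its sequence number.
    """
    for seq, (_name, records) in enumerate(files, start=1):
        for record in records:
            yield ("data", seq, record)
        # The file is fully read, so every window up to `seq` is complete.
        yield ("watermark", seq)
    if bounded:
        # Single-scan (batch) mode: tell downstream that the input is finished.
        yield ("watermark", FINAL_WATERMARK)


events = list(scan_source([("f1", ["x", "y"]), ("f2", ["z"])], bounded=True))
```

In unbounded mode a real source would keep rescanning the directory instead of returning; the sketch only shows that the sole difference between the two modes is whether the final watermark is ever emitted.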
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Bhupesh,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam for example, you can use the "global window" and the final watermark to accomplish what you are looking for. Batch is just a special case of streaming where the source emits the final watermark.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Thomas
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Yes, if the user needs to develop a batch application, then batch aware operators need to be used in the application.
> > > > > > > > > > > > > > > > > > The nature of the application is mostly controlled by the input and the output operators used in the application.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > For example, consider an application which needs to filter records in an input file and store the filtered records in another file. The nature of this app is to end once the entire file is processed. The following things are expected of the application:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. Once the input data is over, finalize the output file from .tmp files. - Responsibility of output operator
> > > > > > > > > > > > > > > > > > 2. End the application, once the data is read and processed - Responsibility of input operator
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > These functions are essential to allow the user to do higher level operations like scheduling or running a workflow of batch applications.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I am not sure about intermediate (processing) operators, as there is no change in their functionality for batch use cases. Perhaps, allowing running multiple batches in a single application may require similar changes in processing operators as well.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Will it give users the impression that, if they have a batch use case, they have to use batch aware operators only? If so, is that what we expect? I am not aware of how we implement the batch scenario, so this might be a basic question.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > While design / implementation for custom control tuples is ongoing, I thought it would be a good idea to consider its usefulness in one of the use cases - batch applications.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that it is easy to use them in batch use cases. Naturally, this would be applicable for only a subset of operators like File, JDBC and NoSQL databases.
> > > > > > > > > > > > > > > > > > > > For example, for a file based store, (say HDFS store), we could have FileBatchInput and FileBatchOutput operators which allow easy integration into a batch application. These operators would be extended from their existing implementations and would be "Batch Aware", in that they may understand the meaning of some specific control tuples that flow through the DAG. Start batch and end batch seem to be the obvious candidates that come to mind. On receipt of such control tuples, they may try to modify the behavior of the operator - to reinitialize some metrics or finalize an output file for example.
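
A minimal Python sketch of what "batch aware" could mean for an output operator, under stated assumptions: the class `BatchAwareOutput`, the control tuple kinds `"start_batch"` / `"end_batch"`, and the in-memory dictionaries standing in for .tmp and finalized files are all hypothetical and do not reflect the actual Malhar operators.

```python
class BatchAwareOutput:
    """Hypothetical output operator that reacts to start/end batch control tuples."""

    def __init__(self):
        self.current = None   # name of the batch currently being written
        self.pending = {}     # batch name -> lines written so far (stand-in for a .tmp file)
        self.finalized = {}   # batch name -> lines of the finalized output

    def on_control(self, kind, name):
        if kind == "start_batch":
            # Initialize the output target before any data for this batch arrives.
            self.current = name
            self.pending[name] = []
        elif kind == "end_batch":
            # Finalize: move the pending content to its final place.
            self.finalized[name] = self.pending.pop(name)
            if self.current == name:
                self.current = None

    def on_data(self, line):
        if self.current is None:
            raise RuntimeError("data tuple received outside of a batch")
        self.pending[self.current].append(line)


out = BatchAwareOutput()
out.on_control("start_batch", "file1")
out.on_data("hello")
out.on_data("world")
out.on_control("end_batch", "file1")
```

Data tuples themselves are handled exactly as in the streaming case; only the control tuples change when the output is initialized and finalized.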
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > We can discuss the potential control tuples and actions in detail, but first I would like to understand the views of the community for this proposal.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > ~ Bhupesh
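
For the count-based case raised earlier in this thread (a count of 10 over a batch of 105 tuples), a small Python sketch shows why an end-of-batch signal is needed. The function `count_windows` is hypothetical; the end of the input iterator plays the role of the final watermark.

```python
def count_windows(tuples, size):
    """Group tuples into windows of `size`; flush the partial window when input ends."""
    window = []
    for t in tuples:
        window.append(t)
        if len(window) == size:
            # A count-based window is complete at the n-th tuple; no watermark needed.
            yield window
            window = []
    if window:
        # End of input stands in for the final watermark: without it the
        # last, partially filled window would never be emitted.
        yield window


windows = list(count_windows(range(105), 10))
```

Here 105 tuples with a count of 10 give ten full windows plus one last window holding the remaining 5 tuples.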