Yes Amol, watermarks in general solve a different issue, and we should not mix them with data association. If needed, data association can be solved by the user at the operator/application layer. The engine should not worry about it.
Regarding your point on event-time-based watermarks, I think they too can be solved at the operator level, where the input operators can play a role. We should keep the event-time awareness logic out of the engine.

~ Bhupesh

_______________________________________________________

Bhupesh Chawda
Software Engineer
E: [email protected] | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Sat, Feb 18, 2017 at 10:44 PM, Amol Kekre <[email protected]> wrote:

> Bhupesh,
> That is true, but in reality watermarks do not solve a design problem in the DAG where data is getting mixed up. All the watermarks do is convey "start" and "end" within the stream. The start and end control tuples should have the physical operator id plus a monotonically increasing number. Both are inserted by the engine and are not user supplied, i.e. the engine takes up the guarantees of identifying these watermarks. This concept is the same as our current start-window and end-window (which has worked well).
>
> Today Apex does not have watermarks, and let's say I am sending "start something", "end something" through another port. I will still need to not mix data in a transform operator downstream. That problem exists today and will continue. Putting the filename on every tuple is too much of a performance hit. Secondly, a lot of batch operations are not file related (i.e. file to file); they are collections of "data" split into part files (for performance reasons), and grouping/dimensions/event time/... are done based on the internals of the file. In the case of file-to-file copy, the user should be expected to route the data properly (parallel partition?).
>
> Event-time-based watermarks need a separate thread. I am certain that the engine will need to be event-time aware, and will need to take this into account for proper layout.
>
> Thks
> Amol
>
> *Follow @amolhkekre*
> *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>
> On Sat, Feb 18, 2017 at 8:17 AM, Bhupesh Chawda <[email protected]> wrote:
>
> > Amol, agreed. We can address event-time-based watermarks once file batch is done.
> > Regarding file batch support: by allowing an input (file) operator to be partitioned, we are implicitly mixing multiple batches. Even if the user does not do any transformations, we should be able to write the correct data to the right files at the destination.
> >
> > ~ Bhupesh
> >
> > On Sat, Feb 18, 2017 at 12:26 PM, Amol Kekre <[email protected]> wrote:
> >
> > > Thomas,
> > > The watermarks we have in Apex (start-window and end-window) are working well. It is fine to take a look at event time, but basic file I/O does not need anything more than start and end. Let's say they are start-something and end-something. The main difference here is that the tuples are user generated; other than that, they should follow similar principles to start-window and end-window. The commonality includes:
> > > - dedup of start-st and end-st
> > > - First start-st passes through
> > > - Last end-st passes through
> > > - Engine identifies them with a chronologically increasing number and source
> > >
> > > The only main difference is that an emit of these is user controlled and cannot be guaranteed to happen as such. BTW, part files are rarely done based on event time; they are almost always split by size.
> > > A vast majority of batch cases have hourly files bound by arrival time and not event time.
> > >
> > > Bhupesh,
> > > Attaching file names to tuples does not scale. If the user mixes two batches, then the user would need to handle the transformations. Post file batch support, we should look at event time support. Unlike file-based batches, event times will overlap each other, i.e. at a given time at least two (if not more) event times will be active. I think the engine will need to be event-time aware.
> > >
> > > Thks
> > > Amol
> > >
> > > On Wed, Feb 15, 2017 at 9:07 PM, Thomas Weise <[email protected]> wrote:
> > >
> > > > I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
> > > >
> > > > More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with.
> > > >
> > > > Note that even file input in many cases can produce time-based watermarks, for example when you read part files that are bound by event time.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >
> > > > > To better understand the use case for control tuples in batch, I am creating a prototype for a batch application using File Input and File Output operators.
> > > > >
> > > > > To enable basic batch processing for File IO operators, I am proposing the following changes to the File Input and File Output operators:
> > > > > 1. The File Input operator emits a watermark each time it opens and closes a file. These can be "start file" and "end file" watermarks which include the corresponding file names. The "start file" tuple should be sent before any of the data from that file flows.
> > > > > 2. The File Input operator can be configured to end the application after a single or n scans of the directory (a batch). This is where the operator emits the final watermark (the end-of-application control tuple). This will also shut down the application.
> > > > > 3. The File Output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. The "end file" watermark forces a finalize on that file.
> > > > >
> > > > > The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
> > > > >
> > > > > There are a few challenges in the implementation where the input operator is partitioned. In this case, the correlation between the start/end for a file and the data tuples for that file is lost. Hence we need to maintain the filename as part of each tuple in the pipeline.
> > > > >
> > > > > The "start file" and "end file" control tuples in this example are temporary names for watermarks. We can have generic "start batch" / "end batch" tuples which could be used for other use cases as well. The final watermark is common and serves the same purpose in each case.
> > > > >
> > > > > Please let me know your thoughts on this.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > >
> > > > > > Yes, this can be part of operator configuration. Given this, for a user to define a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, there can be other use cases that can be achieved other than batch.
> > > > > >
> > > > > > We may also need to take care of the following:
> > > > > > 1. Make sure that the watermarks or control tuples are consistent across sources. Meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
> > > > > > 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks. For example, we may want to reset the operator behavior on arrival of some watermark tuple.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <[email protected]> wrote:
> > > > > >
> > > > > > > The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.). The mechanism to generate the marks may depend on the type of source, and the user needs to be able to influence/configure it.
> > > > > > >
> > > > > > > Thomas
> > > > > > >
> > > > > > > On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi Thomas,
> > > > > > > >
> > > > > > > > I am not sure that I completely understand your suggestion. Are you suggesting broadening the scope of the proposal to treat all sources as bounded as well as unbounded?
> > > > > > > >
> > > > > > > > In the case of Apex, we treat all sources as unbounded sources. Even a bounded source like the HDFS file source is treated as unbounded by means of scanning the input directory repeatedly.
> > > > > > > >
> > > > > > > > Let's consider the HDFS file source as an example:
> > > > > > > > In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider the HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
> > > > > > > >
> > > > > > > > Please correct me if I misunderstand.
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Bhupesh,
> > > > > > > > >
> > > > > > > > > Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam, for example, you can use the "global window" and the final watermark to accomplish what you are looking for.
> > > > > > > > > Batch is just a special case of streaming where the source emits the final watermark.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Thomas
> > > > > > > > >
> > > > > > > > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > Yes, if the user needs to develop a batch application, then batch-aware operators need to be used in the application.
> > > > > > > > > > The nature of the application is mostly controlled by the input and the output operators used in the application.
> > > > > > > > > >
> > > > > > > > > > For example, consider an application which needs to filter records in an input file and store the filtered records in another file. The nature of this app is to end once the entire file is processed. The following things are expected of the application:
> > > > > > > > > >
> > > > > > > > > > 1. Once the input data is over, finalize the output file from .tmp files. - Responsibility of the output operator
> > > > > > > > > > 2. End the application once the data is read and processed. - Responsibility of the input operator
> > > > > > > > > >
> > > > > > > > > > These functions are essential to allow the user to do higher-level operations like scheduling or running a workflow of batch applications.
> > > > > > > > > >
> > > > > > > > > > I am not sure about intermediate (processing) operators, as there is no change in their functionality for batch use cases.
> > > > > > > > > > Perhaps allowing multiple batches to run in a single application may require similar changes in processing operators as well.
> > > > > > > > > >
> > > > > > > > > > ~ Bhupesh
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > Will it give the user the impression that, if they have a batch use case, they have to use batch-aware operators only? If so, is that what we expect? I am not aware of how we implement the batch scenario, so this might be a basic question.
> > > > > > > > > > >
> > > > > > > > > > > -Priyanka
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <[email protected]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi All,
> > > > > > > > > > > >
> > > > > > > > > > > > While design / implementation for custom control tuples is ongoing, I thought it would be a good idea to consider its usefulness in one of the use cases - batch applications.
> > > > > > > > > > > >
> > > > > > > > > > > > This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that it is easy to use them in batch use cases. Naturally, this would be applicable for only a subset of operators like File, JDBC and NoSQL databases.
> > > > > > > > > > > > For example, for a file-based store (say an HDFS store), we could have FileBatchInput and FileBatchOutput operators which allow easy integration into a batch application. These operators would be extended from their existing implementations and would be "Batch Aware", in that they may understand the meaning of some specific control tuples that flow through the DAG. Start batch and end batch seem to be the obvious candidates that come to mind. On receipt of such control tuples, they may try to modify the behavior of the operator - to reinitialize some metrics or finalize an output file, for example.
> > > > > > > > > > > >
> > > > > > > > > > > > We can discuss the potential control tuples and actions in detail, but first I would like to understand the views of the community for this proposal.
> > > > > > > > > > > >
> > > > > > > > > > > > ~ Bhupesh
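
For readers following the thread, the "batch aware" output behavior proposed above can be sketched roughly as follows. This is only an illustration in Python, not Apex or Malhar code: the class name, the method names `on_control` and `on_data`, and the control tuple kinds `start_file`, `end_file` and `final` are all hypothetical. It also assumes a single unpartitioned upstream, since the thread notes that the start/end correlation is lost once the input operator is partitioned.

```python
import os
import tempfile


class BatchAwareFileOutput:
    """Hypothetical sketch of a batch-aware file output operator (not the Apex API)."""

    def __init__(self, out_dir):
        self.out_dir = out_dir
        self.current_name = None  # file named by the last "start file" control tuple
        self.handle = None        # open handle on the corresponding .tmp file

    def on_control(self, kind, file_name=None):
        if kind == "start_file":
            # "Start file" initializes the file name for the incoming tuples.
            self.current_name = file_name
            self.handle = open(self._tmp_path(), "w")
        elif kind in ("end_file", "final"):
            # "End file" forces a finalize; the final watermark finalizes
            # whatever is still open at the end of the batch.
            self._finalize()

    def on_data(self, record):
        if self.handle is None:
            raise RuntimeError("data tuple arrived before a start_file control tuple")
        self.handle.write(record + "\n")

    def _tmp_path(self):
        return os.path.join(self.out_dir, self.current_name + ".tmp")

    def _finalize(self):
        if self.handle is None:
            return  # nothing open, e.g. "final" arriving right after "end_file"
        self.handle.close()
        # Promote the .tmp file to its final name.
        os.replace(self._tmp_path(), os.path.join(self.out_dir, self.current_name))
        self.handle = None
        self.current_name = None


def demo():
    out_dir = tempfile.mkdtemp()
    op = BatchAwareFileOutput(out_dir)
    op.on_control("start_file", "a.txt")
    op.on_data("r1")
    op.on_data("r2")
    op.on_control("end_file")
    op.on_control("start_file", "b.txt")
    op.on_data("r3")
    op.on_control("final")  # end of batch: b.txt is finalized here
    return out_dir
```

Calling `demo()` leaves only finalized files in the output directory and no `.tmp` leftovers. With a partitioned input, the operator would additionally need some way to tell which file each data tuple belongs to, which is exactly the point debated in the thread.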
