Yes, Amol,
Watermarks, in general, solve a different issue, and we should not mix them
with data association. If needed, data association can be handled by the
user at the operator/application layer. The engine should not need to worry
about it.

Regarding your point on event-time based watermarks, I think they too can
be handled at the operator level, where the input operators can play a role.
We should keep the event-time awareness logic out of the engine.

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

Software Engineer

E: [email protected] | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Sat, Feb 18, 2017 at 10:44 PM, Amol Kekre <[email protected]> wrote:

> Bhupesh,
> That is true, but in reality watermarks do not solve a design problem in
> the DAG where data is getting mixed up. All watermarks do is convey
> "start" and "end" within the stream. The start and end control tuples
> should carry the physical operator id and a monotonically increasing
> number. Both are inserted by the engine and are not user supplied, i.e.
> the engine takes on the guarantee of identifying these watermarks. This
> concept is the same as our current start-window and end-window (which has
> worked well).
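Amol's proposal, that the engine rather than the user stamps each start/end marker, can be sketched in plain Java. This is a minimal sketch, assuming hypothetical `MarkerKind`, `ControlTuple` and `ControlTupleStamper` types (none of them are Apex APIs) and assuming that, as with start-window/end-window, a start opens a new number per physical operator and the matching end reuses it:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker kinds for user-requested "start something"/"end something".
enum MarkerKind { START, END }

// Hypothetical control tuple. The user only picks the kind; the engine fills in
// the physical operator id and the sequence number.
final class ControlTuple {
  final MarkerKind kind;
  final int sourceOperatorId;
  final long sequence;

  ControlTuple(MarkerKind kind, int sourceOperatorId, long sequence) {
    this.kind = kind;
    this.sourceOperatorId = sourceOperatorId;
    this.sequence = sequence;
  }

  @Override
  public String toString() {
    return kind + "(op=" + sourceOperatorId + ", seq=" + sequence + ")";
  }
}

// Hypothetical engine-side stamper. Like start-window/end-window, a START opens
// a new number for its operator and the matching END carries the same number.
final class ControlTupleStamper {
  private final Map<Integer, Long> lastNumber = new HashMap<>();
  private final Map<Integer, Boolean> open = new HashMap<>();

  ControlTuple start(int physicalOperatorId) {
    if (open.getOrDefault(physicalOperatorId, false)) {
      throw new IllegalStateException("operator " + physicalOperatorId + " already has an open marker");
    }
    long next = lastNumber.getOrDefault(physicalOperatorId, -1L) + 1;
    lastNumber.put(physicalOperatorId, next);
    open.put(physicalOperatorId, true);
    return new ControlTuple(MarkerKind.START, physicalOperatorId, next);
  }

  ControlTuple end(int physicalOperatorId) {
    if (!open.getOrDefault(physicalOperatorId, false)) {
      throw new IllegalStateException("operator " + physicalOperatorId + " has no open marker");
    }
    open.put(physicalOperatorId, false);
    return new ControlTuple(MarkerKind.END, physicalOperatorId, lastNumber.get(physicalOperatorId));
  }

  public static void main(String[] args) {
    ControlTupleStamper engine = new ControlTupleStamper();
    System.out.println(engine.start(1)); // START(op=1, seq=0)
    System.out.println(engine.start(2)); // START(op=2, seq=0)
    System.out.println(engine.end(1));   // END(op=1, seq=0)
    System.out.println(engine.start(1)); // START(op=1, seq=1)
  }
}
```

Because numbers are assigned per physical operator, two partitions may both emit sequence 0; in this sketch it is the pair (operator id, sequence) that identifies a marker.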
>
> Today Apex does not have watermarks; let's say I am sending "start
> something" and "end something" through another port. I will still need to
> avoid mixing data in a downstream transform operator. That problem exists
> today and will continue to. Putting the filename on every tuple is too
> much of a performance hit. Secondly, a lot of batch operations are not
> file related (i.e. file to file); they are a collection of "data" split
> into part files (for performance reasons), and grouping/dimensions/event
> time/... are done based on the internals of the file. In the case of a
> file-to-file copy, the user should be expected to route the data properly
> (parallel partition?).
>
> Event-time based watermarks need a separate thread. I am certain that the
> engine will need to be event-time aware and will need to take this into
> account for proper layout.
>
> Thks
> Amol
>
>
> *Follow @amolhkekre*
> *Join us at Apex Big Data World-San Jose
> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> <http://www.apexbigdata.com/san-jose-register.html>
>
> On Sat, Feb 18, 2017 at 8:17 AM, Bhupesh Chawda <[email protected]> wrote:
>
> > Amol, agreed. We can address event-time based watermarks once file batch
> > support is done.
> > Regarding file batch support: by allowing an input (file) operator to be
> > partitioned, we are implicitly mixing multiple batches. Even if the user
> > does not do any transformations, we should be able to write the correct
> > data to the right files at the destination.
> >
> > ~ Bhupesh
> >
> >
> > On Sat, Feb 18, 2017 at 12:26 PM, Amol Kekre <[email protected]> wrote:
> >
> > > Thomas,
> > > The watermarks we have in Apex (start-window and end-window) are
> > > working well. It is fine to take a look at event time, but basic file
> > > I/O does not need anything more than start and end. Let's say they are
> > > start-something and end-something. The main difference here is that
> > > these tuples are user generated; other than that, they should follow
> > > principles similar to start-window & end-window. The commonality
> > > includes:
> > > - dedup of start-st and end-st
> > > - the first start-st passes through
> > > - the last end-st passes through
> > > - the engine identifies them with a chronologically increasing number
> > >   and source
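The commonality list can be read as merge logic at an operator fed by several upstream partitions. This is a minimal sketch, assuming hypothetical `BatchMarker` and `MarkerDeduplicator` classes (not Apex APIs) and assuming batches never overlap: the first start for a new batch number passes through, later starts are dropped, and only the end that completes the set of upstream partitions is forwarded.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical marker as seen downstream: "batch" is the engine-assigned,
// increasing number and "source" the upstream partition that emitted it.
final class BatchMarker {
  final boolean isStart;
  final long batch;
  final int source;

  BatchMarker(boolean isStart, long batch, int source) {
    this.isStart = isStart;
    this.batch = batch;
    this.source = source;
  }
}

// Hypothetical merge logic for an operator with several upstream partitions.
// Assumes batches do not overlap: every partition ends batch n before any
// partition starts batch n + 1.
final class MarkerDeduplicator {
  private final int upstreamCount;
  private final Set<Integer> endedSources = new HashSet<>();
  private long currentBatch = -1;

  MarkerDeduplicator(int upstreamCount) {
    this.upstreamCount = upstreamCount;
  }

  /** Returns the marker to forward downstream, or null if it is suppressed. */
  BatchMarker onMarker(BatchMarker m) {
    if (m.isStart) {
      if (m.batch == currentBatch) {
        return null; // duplicate start from another partition
      }
      currentBatch = m.batch; // first start of a new batch passes through
      endedSources.clear();
      return m;
    }
    if (m.batch != currentBatch) {
      throw new IllegalStateException("end marker for unknown batch " + m.batch);
    }
    if (!endedSources.add(m.source)) {
      return null; // repeated end from the same partition
    }
    // Only the end that completes the set of upstream partitions passes through.
    return endedSources.size() == upstreamCount ? m : null;
  }

  public static void main(String[] args) {
    MarkerDeduplicator merge = new MarkerDeduplicator(2);
    System.out.println(merge.onMarker(new BatchMarker(true, 0, 1)) != null);  // true
    System.out.println(merge.onMarker(new BatchMarker(true, 0, 2)) != null);  // false
    System.out.println(merge.onMarker(new BatchMarker(false, 0, 1)) != null); // false
    System.out.println(merge.onMarker(new BatchMarker(false, 0, 2)) != null); // true
  }
}
```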
> > >
> > > The only real difference is that emitting these is user controlled, so
> > > the engine cannot guarantee that they are emitted. BTW, part files are
> > > rarely split based on event time; they are almost always split by size.
> > > A vast majority of batch cases have hourly files bound by arrival time,
> > > not event time.
> > >
> > > Bhupesh,
> > > Attaching file names to tuples does not scale. If the user mixes two
> > > batches, then the user would need to handle the transformations. After
> > > file batch support, we should look at event time support. Unlike
> > > file-based batches, event times will overlap each other, i.e. at any
> > > given time at least two (if not more) event times will be active. I
> > > think the engine will need to be event-time aware.
> > >
> > > Thks
> > > Amol
> > >
> > >
> > > On Wed, Feb 15, 2017 at 9:07 PM, Thomas Weise <[email protected]> wrote:
> > >
> > > > I don't think this should be designed based on a simplistic file
> > > > input-output scenario. It would be good to include a stateful
> > > > transformation based on event time.
> > > >
> > > > More complex pipelines contain stateful transformations that depend on
> > > > windowing and watermarks. I think we need a watermark concept that is
> > > > based on progress in event time (or another monotonically increasing
> > > > sequence) that other operators can work with generically.
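A stateful transformation that depends on windowing and watermarks can be illustrated with a windowed sum. This is a minimal sketch, assuming a hypothetical `EventTimeWindowSum` operator (plain Java, not an Apex or Beam API) with fixed event-time windows and a watermark meaning "no tuples with an earlier event time will follow": a window's result is emitted only once the watermark passes the window's end, and tuples for already-closed windows are simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stateful operator: sums values per fixed event-time window and
// emits a window only when the watermark guarantees no earlier events remain.
final class EventTimeWindowSum {
  private final long windowSize;
  // Window start -> running sum, ordered so the oldest window comes first.
  private final TreeMap<Long, Long> openWindows = new TreeMap<>();
  private long watermark = Long.MIN_VALUE;

  EventTimeWindowSum(long windowSize) {
    this.windowSize = windowSize;
  }

  void onTuple(long eventTime, long value) {
    long windowStart = Math.floorDiv(eventTime, windowSize) * windowSize;
    if (windowStart + windowSize <= watermark) {
      return; // late tuple for an already closed window: dropped in this sketch
    }
    openWindows.merge(windowStart, value, Long::sum);
  }

  /** Advances the watermark and returns "windowStart=sum" for each window it closes. */
  List<String> onWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
    List<String> fired = new ArrayList<>();
    while (!openWindows.isEmpty()) {
      Map.Entry<Long, Long> oldest = openWindows.firstEntry();
      if (oldest.getKey() + windowSize > watermark) {
        break; // this window (and all later ones) may still receive tuples
      }
      fired.add(oldest.getKey() + "=" + oldest.getValue());
      openWindows.pollFirstEntry();
    }
    return fired;
  }

  public static void main(String[] args) {
    EventTimeWindowSum op = new EventTimeWindowSum(10);
    op.onTuple(1, 5);
    op.onTuple(12, 7);
    op.onTuple(4, 1);
    System.out.println(op.onWatermark(10)); // [0=6]
    op.onTuple(3, 100);                     // late: window [0, 10) already closed
    System.out.println(op.onWatermark(20)); // [10=7]
  }
}
```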
> > > >
> > > > Note that even file input can in many cases produce time-based
> > > > watermarks, for example when you read part files that are bound by
> > > > event time.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >
> > > > > To better understand the use case for control tuples in batch, I am
> > > > > creating a prototype batch application using the File Input and File
> > > > > Output operators.
> > > > >
> > > > > To enable basic batch processing for the File IO operators, I am
> > > > > proposing the following changes to the File Input and Output operators:
> > > > > 1. The File Input operator emits a watermark each time it opens and
> > > > > closes a file. These can be "start file" and "end file" watermarks
> > > > > which include the corresponding file names. The "start file" tuple
> > > > > should be sent before any of the data from that file flows.
> > > > > 2. The File Input operator can be configured to end the application
> > > > > after a single scan or n scans of the directory (a batch). This is
> > > > > where the operator emits the final watermark (the end-of-application
> > > > > control tuple). This will also shut down the application.
> > > > > 3. The File Output operator handles these control tuples. "Start file"
> > > > > initializes the file name for the incoming tuples. The "end file"
> > > > > watermark forces a finalize on that file.
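The three proposed changes can be sketched end to end for a single, unpartitioned input. This is a minimal sketch, assuming hypothetical `StartFile`, `DataLine`, `EndFile` and `FinalWatermark` tuple types and a hypothetical `BatchAwareFileOutput` (not the existing Malhar file operators), with an in-memory map standing in for HDFS: "start file" opens `<name>.tmp`, "end file" finalizes it by renaming, and the final watermark requests shutdown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tuple types flowing from the file input to the file output operator.
abstract class FileStreamTuple {}

final class StartFile extends FileStreamTuple {
  final String fileName;
  StartFile(String fileName) { this.fileName = fileName; }
}

final class DataLine extends FileStreamTuple {
  final String line;
  DataLine(String line) { this.line = line; }
}

final class EndFile extends FileStreamTuple {
  final String fileName;
  EndFile(String fileName) { this.fileName = fileName; }
}

final class FinalWatermark extends FileStreamTuple {}

// Hypothetical batch-aware output operator; an in-memory map stands in for HDFS.
final class BatchAwareFileOutput {
  final Map<String, List<String>> storage = new LinkedHashMap<>();
  boolean shutdownRequested;
  private String currentFile;

  void process(FileStreamTuple t) {
    if (t instanceof StartFile) {
      // "Start file": route subsequent data to <name>.tmp.
      currentFile = ((StartFile) t).fileName;
      storage.put(currentFile + ".tmp", new ArrayList<>());
    } else if (t instanceof DataLine) {
      if (currentFile == null) {
        throw new IllegalStateException("data arrived before a start-file marker");
      }
      storage.get(currentFile + ".tmp").add(((DataLine) t).line);
    } else if (t instanceof EndFile) {
      // "End file": finalize by renaming <name>.tmp to <name>.
      String name = ((EndFile) t).fileName;
      if (!name.equals(currentFile)) {
        throw new IllegalStateException("end-file marker for a file that is not open: " + name);
      }
      storage.put(name, storage.remove(name + ".tmp"));
      currentFile = null;
    } else if (t instanceof FinalWatermark) {
      shutdownRequested = true; // final watermark: the batch application can end
    }
  }

  public static void main(String[] args) {
    BatchAwareFileOutput out = new BatchAwareFileOutput();
    List<FileStreamTuple> stream = List.of(
        new StartFile("a.txt"), new DataLine("x"), new DataLine("y"),
        new EndFile("a.txt"), new FinalWatermark());
    for (FileStreamTuple t : stream) {
      out.process(t);
    }
    System.out.println(out.storage);           // {a.txt=[x, y]}
    System.out.println(out.shutdownRequested); // true
  }
}
```

Tracking a single `currentFile` only works because the input is unpartitioned here; with a partitioned input the markers and data of different files interleave.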
> > > > >
> > > > > The user would be able to configure the operators to send only those
> > > > > watermarks that are needed in the application. If none of the options
> > > > > are configured, the operators behave as in a streaming application.
> > > > >
> > > > > There are a few challenges in the implementation when the input
> > > > > operator is partitioned. In this case, the correlation between the
> > > > > start/end markers for a file and the data tuples for that file is
> > > > > lost. Hence we need to maintain the filename as part of each tuple in
> > > > > the pipeline.
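Carrying the file name on each tuple lets the output operator keep interleaved partitions apart. This is a minimal sketch, assuming a hypothetical `TaggedLine` tuple and a hypothetical `FileNameRoutingOutput` operator (not Malhar APIs): one buffer is kept per open file name, so tuples from two input partitions can interleave freely and each file is finalized on its own end marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical data tuple that carries the name of the file it was read from.
final class TaggedLine {
  final String fileName;
  final String line;

  TaggedLine(String fileName, String line) {
    this.fileName = fileName;
    this.line = line;
  }
}

// Hypothetical output operator keeping one open buffer per file name, so
// interleaved tuples from several input partitions land in the right file.
final class FileNameRoutingOutput {
  final Map<String, List<String>> open = new TreeMap<>();
  final Map<String, List<String>> finalized = new TreeMap<>();

  void onStartFile(String fileName) {
    open.put(fileName, new ArrayList<>());
  }

  void onData(TaggedLine t) {
    List<String> buffer = open.get(t.fileName);
    if (buffer == null) {
      throw new IllegalStateException("no start-file marker for " + t.fileName);
    }
    buffer.add(t.line);
  }

  void onEndFile(String fileName) {
    List<String> buffer = open.remove(fileName);
    if (buffer == null) {
      throw new IllegalStateException("end-file marker for unopened file " + fileName);
    }
    finalized.put(fileName, buffer); // stands in for renaming <name>.tmp to <name>
  }

  public static void main(String[] args) {
    FileNameRoutingOutput out = new FileNameRoutingOutput();
    // Partition 1 reads a.txt, partition 2 reads b.txt; their tuples interleave.
    out.onStartFile("a.txt");
    out.onStartFile("b.txt");
    out.onData(new TaggedLine("a.txt", "a1"));
    out.onData(new TaggedLine("b.txt", "b1"));
    out.onEndFile("b.txt");
    out.onData(new TaggedLine("a.txt", "a2"));
    out.onEndFile("a.txt");
    System.out.println(out.finalized); // {a.txt=[a1, a2], b.txt=[b1]}
  }
}
```

The price is the extra file-name field on every data tuple, which is the per-tuple overhead Amol objects to elsewhere in this thread.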
> > > > >
> > > > > The "start file" and "end file" control tuples in this example are
> > > > > temporary names for watermarks. We can have generic "start batch" /
> > > > > "end batch" tuples which could be used for other use cases as well.
> > > > > The final watermark is common and serves the same purpose in each case.
> > > > >
> > > > > Please let me know your thoughts on this.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > >
> > > > > > Yes, this can be part of operator configuration. Given this, for a
> > > > > > user, defining a batch application would mean configuring the
> > > > > > connectors (mostly the input operator) in the application for the
> > > > > > desired behavior. Similarly, use cases other than batch can be
> > > > > > achieved.
> > > > > >
> > > > > > We may also need to take care of the following:
> > > > > > 1. Make sure that the watermarks or control tuples are consistent
> > > > > > across sources, meaning an HDFS sink should be able to interpret the
> > > > > > watermark tuple sent out by, say, a JDBC source.
> > > > > > 2. In addition to I/O connectors, we should also look at the need
> > > > > > for processing operators to understand some of the control tuples /
> > > > > > watermarks. For example, we may want to reset the operator behavior
> > > > > > on arrival of some watermark tuple.
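Both points can be sketched together: one control tuple type that every source emits in the same form, and a processing operator that resets its behavior when that tuple arrives. This is a minimal sketch, assuming a hypothetical `EndOfBatch` type and a hypothetical `PerBatchCounter` operator (neither exists in Malhar), where resetting a per-batch count stands in for "reset the operator behavior":

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source-agnostic control tuple: an HDFS source and a JDBC source
// would both emit this same type, so any downstream operator can interpret it.
final class EndOfBatch {
  final long batchId;

  EndOfBatch(long batchId) {
    this.batchId = batchId;
  }
}

// Hypothetical processing operator that counts data tuples per batch and
// resets its count whenever an end-of-batch control tuple arrives.
final class PerBatchCounter {
  final List<String> emitted = new ArrayList<>();
  private long count;

  void onData(Object tuple) {
    count++;
  }

  void onControl(EndOfBatch marker) {
    emitted.add("batch " + marker.batchId + ": " + count);
    count = 0; // reset operator behavior on arrival of the control tuple
  }

  public static void main(String[] args) {
    PerBatchCounter counter = new PerBatchCounter();
    counter.onData("r1");
    counter.onData("r2");
    counter.onControl(new EndOfBatch(0));
    counter.onData("r3");
    counter.onControl(new EndOfBatch(1));
    System.out.println(counter.emitted); // [batch 0: 2, batch 1: 1]
  }
}
```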
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <[email protected]> wrote:
> > > > > >
> > > > > >> The HDFS source can operate in two modes, bounded or unbounded. If you
> > > > > >> scan only once, then it should emit the final watermark after it is
> > > > > >> done. Otherwise it would emit watermarks based on a policy (file
> > > > > >> names, etc.). The mechanism to generate the marks may depend on the
> > > > > >> type of source, and the user needs to be able to influence/configure
> > > > > >> it.
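The two modes can be sketched as a single source with a scan-count setting. This is a minimal sketch, assuming a hypothetical `ScanningFileSource` (not an existing Malhar operator) that receives each directory listing as a parameter instead of reading HDFS, and assuming one start/end mark per new file name as the watermark policy: a bounded source emits `FINAL` after the configured number of scans, while an unbounded one never does.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical directory-scanning source with a configurable scan count.
// maxScans > 0: bounded, emit the final watermark after that many scans.
// maxScans == 0: unbounded, keep scanning and never emit the final watermark.
final class ScanningFileSource {
  private final int maxScans;
  private final Set<String> seenFiles = new HashSet<>();
  private int scansDone;
  private boolean finished;

  ScanningFileSource(int maxScans) {
    if (maxScans < 0) {
      throw new IllegalArgumentException("maxScans must be >= 0");
    }
    this.maxScans = maxScans;
  }

  /** Runs one scan over a directory listing and returns the emitted tuples. */
  List<String> scan(List<String> directoryListing) {
    List<String> emitted = new ArrayList<>();
    if (finished) {
      return emitted; // bounded source already emitted its final watermark
    }
    for (String file : directoryListing) {
      if (seenFiles.add(file)) {
        // File-name based policy: one start/end mark per new file.
        // The file's data tuples would be emitted between the two marks.
        emitted.add("start:" + file);
        emitted.add("end:" + file);
      }
    }
    scansDone++;
    if (maxScans > 0 && scansDone >= maxScans) {
      emitted.add("FINAL");
      finished = true;
    }
    return emitted;
  }

  public static void main(String[] args) {
    ScanningFileSource bounded = new ScanningFileSource(1);
    System.out.println(bounded.scan(List.of("a", "b"))); // [start:a, end:a, start:b, end:b, FINAL]
    System.out.println(bounded.scan(List.of("c")));      // []

    ScanningFileSource unbounded = new ScanningFileSource(0);
    System.out.println(unbounded.scan(List.of("a")));      // [start:a, end:a]
    System.out.println(unbounded.scan(List.of("a", "b"))); // [start:b, end:b]
  }
}
```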
> > > > > >>
> > > > > >> Thomas
> > > > > >>
> > > > > >>
> > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > > >>
> > > > > >> > Hi Thomas,
> > > > > >> >
> > > > > >> > I am not sure that I completely understand your suggestion. Are you
> > > > > >> > suggesting broadening the scope of the proposal to treat all
> > > > > >> > sources as bounded as well as unbounded?
> > > > > >> >
> > > > > >> > In the case of Apex, we treat all sources as unbounded sources.
> > > > > >> > Even bounded sources like the HDFS file source are treated as
> > > > > >> > unbounded by repeatedly scanning the input directory.
> > > > > >> >
> > > > > >> > Let's consider the HDFS file source as an example:
> > > > > >> > If we treat it as a bounded source, we can define hooks which
> > > > > >> > allow us to detect the end of the file and send the "final
> > > > > >> > watermark". We could also consider the HDFS file source as a
> > > > > >> > streaming source and define hooks which send watermarks based on
> > > > > >> > different kinds of windows.
> > > > > >> >
> > > > > >> > Please correct me if I misunderstand.
> > > > > >> >
> > > > > >> > ~ Bhupesh
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <[email protected]> wrote:
> > > > > >> >
> > > > > >> > > Bhupesh,
> > > > > >> > >
> > > > > >> > > Please see how that can be solved in a unified way using windows
> > > > > >> > > and watermarks. It is bounded data vs. unbounded data. In Beam,
> > > > > >> > > for example, you can use the "global window" and the final
> > > > > >> > > watermark to accomplish what you are looking for. Batch is just a
> > > > > >> > > special case of streaming where the source emits the final
> > > > > >> > > watermark.
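The point that batch is streaming whose source emits the final watermark can be sketched without any Beam dependency. This is a minimal sketch, assuming a hypothetical `GlobalWindowSum` operator and using `Long.MAX_VALUE` as a stand-in for the final watermark (this is not Beam's API): ordinary watermarks never close the global window, so the aggregate is emitted exactly once, when a bounded source signals that it is done.

```java
import java.util.OptionalLong;

// Hypothetical global-window aggregation: every tuple falls into one window,
// which closes only when the source emits the final watermark.
final class GlobalWindowSum {
  // Stand-in for the final watermark ("end of time"); not Beam's representation.
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private long sum;
  private boolean closed;

  void onTuple(long value) {
    if (closed) {
      throw new IllegalStateException("tuple after the final watermark");
    }
    sum += value;
  }

  /** Emits the sum once, on the final watermark; any other watermark emits nothing. */
  OptionalLong onWatermark(long watermark) {
    if (closed || watermark < FINAL_WATERMARK) {
      return OptionalLong.empty();
    }
    closed = true;
    return OptionalLong.of(sum);
  }

  public static void main(String[] args) {
    GlobalWindowSum batch = new GlobalWindowSum();
    batch.onTuple(3);
    batch.onTuple(4);
    System.out.println(batch.onWatermark(1_000L));          // OptionalLong.empty
    System.out.println(batch.onWatermark(FINAL_WATERMARK)); // OptionalLong[7]
  }
}
```

The same operator would work unchanged on an unbounded source; it would simply never emit, which is why the global window is only useful together with a final watermark.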
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Thomas
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <[email protected]> wrote:
> > > > > >> > >
> > > > > >> > > > Yes, if the user needs to develop a batch application, then
> > > > > >> > > > batch-aware operators need to be used in the application.
> > > > > >> > > > The nature of the application is mostly controlled by the input
> > > > > >> > > > and output operators used in the application.
> > > > > >> > > >
> > > > > >> > > > For example, consider an application which needs to filter
> > > > > >> > > > records in an input file and store the filtered records in
> > > > > >> > > > another file. The nature of this app is to end once the entire
> > > > > >> > > > file is processed. The following things are expected of the
> > > > > >> > > > application:
> > > > > >> > > >
> > > > > >> > > >    1. Once the input data is over, finalize the output file
> > > > > >> > > >    from the .tmp files (responsibility of the output operator).
> > > > > >> > > >    2. End the application once the data is read and processed
> > > > > >> > > >    (responsibility of the input operator).
> > > > > >> > > >
> > > > > >> > > > These functions are essential to allow the user to do higher
> > > > > >> > > > level operations like scheduling or running a workflow of batch
> > > > > >> > > > applications.
> > > > > >> > > >
> > > > > >> > > > I am not sure about intermediate (processing) operators, as
> > > > > >> > > > there is no change in their functionality for batch use cases.
> > > > > >> > > > Perhaps allowing multiple batches to run in a single application
> > > > > >> > > > may require similar changes in processing operators as well.
> > > > > >> > > >
> > > > > >> > > > ~ Bhupesh
> > > > > >> > > >
> > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <[email protected]> wrote:
> > > > > >> > > >
> > > > > >> > > > > Will it give the user the impression that, if he has a batch
> > > > > >> > > > > use case, he has to use batch-aware operators only? If so, is
> > > > > >> > > > > that what we expect? I am not aware of how we implement the
> > > > > >> > > > > batch scenario, so this might be a basic question.
> > > > > >> > > > >
> > > > > >> > > > > -Priyanka
> > > > > >> > > > >
> > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <[email protected]> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi All,
> > > > > >> > > > > >
> > > > > >> > > > > > While design / implementation for custom control tuples is
> > > > > >> > > > > > ongoing, I thought it would be a good idea to consider their
> > > > > >> > > > > > usefulness in one of the use cases: batch applications.
> > > > > >> > > > > >
> > > > > >> > > > > > This is a proposal to adapt / extend existing operators in
> > > > > >> > > > > > the Apache Apex Malhar library so that it is easy to use them
> > > > > >> > > > > > in batch use cases. Naturally, this would be applicable to
> > > > > >> > > > > > only a subset of operators, like File, JDBC and NoSQL
> > > > > >> > > > > > databases.
> > > > > >> > > > > > For example, for a file based store (say an HDFS store), we
> > > > > >> > > > > > could have FileBatchInput and FileBatchOutput operators which
> > > > > >> > > > > > allow easy integration into a batch application. These
> > > > > >> > > > > > operators would be extended from their existing
> > > > > >> > > > > > implementations and would be "Batch Aware", in that they may
> > > > > >> > > > > > understand the meaning of some specific control tuples that
> > > > > >> > > > > > flow through the DAG. Start batch and end batch seem to be the
> > > > > >> > > > > > obvious candidates that come to mind. On receipt of such
> > > > > >> > > > > > control tuples, they may try to modify the behavior of the
> > > > > >> > > > > > operator, for example to reinitialize some metrics or finalize
> > > > > >> > > > > > an output file.
> > > > > >> > > > > >
> > > > > >> > > > > > We can discuss the potential control tuples and actions in
> > > > > >> > > > > > detail, but first I would like to understand the views of the
> > > > > >> > > > > > community on this proposal.
> > > > > >> > > > > >
> > > > > >> > > > > > ~ Bhupesh
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
