Hi Thomas,

A comment on the following from your previous mail:



*An operator that identifies the batch boundary tells the engine about it
and corresponding control tuples are submitted through the stream, leading
to callbacks on downstream operators*

This would mean there is a single definition of the batch boundary for
the whole application DAG.
I think we should give each individual operator the freedom to define what a
batch is and get callbacks accordingly.

With that in mind, here is a quick sketch/suggestion of how it can
be done:

1) The operator that needs to work on a batch can implement an interface,
let's say BatchListener.

2) This will have 4 methods:
*    startBatch*
*    endBatch*
*    configureBatch*
*    callAtApplicationWindowBoundary *(maybe some better name??)

3) *configureBatch* will tell what the boundary of a batch is.
This will be called right after setup OR activate, basically before the
beginning of the stream. The return value will be set with the operator thread.

4) Based on configuration, the *startBatch* and *endBatch* will be called.

5) *callAtApplicationWindowBoundary* should return *true/false* based
on whether the start/end batch calls should happen at the application window
boundary OR not. This is where the user can choose whether the platform takes
care of checkpointing tuples within a window OR whether the user wants
to do that on their own.
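
To make the five points above concrete, here is a minimal Java sketch of what
such an interface might look like. Everything in it is illustrative and not
existing Apex API: the method signatures, the assumption that the batch
boundary is a tuple count, and the BatchDriver and RecordingOperator helper
classes are all hypothetical. BatchDriver only stands in for the engine so the
callback order can be seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface following the four methods proposed above.
// None of these signatures exist in Apex; they are assumptions.
interface BatchListener {
    // Called once before the stream begins.
    // ASSUMPTION: the boundary is expressed as a number of tuples per batch.
    int configureBatch();

    void startBatch();

    void endBatch();

    // true:  start/end batch calls are aligned with application windows.
    // false: they follow the boundary returned by configureBatch().
    boolean callAtApplicationWindowBoundary();
}

// Toy stand-in for the engine: it only counts tuples and fires callbacks.
final class BatchDriver {
    static void run(BatchListener op, int windows, int tuplesPerWindow) {
        int batchSize = op.configureBatch();
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        boolean alignToWindow = op.callAtApplicationWindowBoundary();
        int inBatch = 0; // tuples seen in the currently open batch
        for (int w = 0; w < windows; w++) {
            if (alignToWindow) {
                op.startBatch();
            }
            for (int t = 0; t < tuplesPerWindow; t++) {
                if (alignToWindow) {
                    continue; // the window boundary decides, not the count
                }
                if (inBatch == 0) {
                    op.startBatch();
                }
                if (++inBatch == batchSize) {
                    op.endBatch();
                    inBatch = 0;
                }
            }
            if (alignToWindow) {
                op.endBatch();
            }
        }
        if (inBatch > 0) {
            op.endBatch(); // close a trailing partial batch
        }
    }
}

// Example operator that records the order in which callbacks arrive.
final class RecordingOperator implements BatchListener {
    final List<String> events = new ArrayList<>();
    private final int batchSize;
    private final boolean alignToWindow;

    RecordingOperator(int batchSize, boolean alignToWindow) {
        this.batchSize = batchSize;
        this.alignToWindow = alignToWindow;
    }

    @Override public int configureBatch() { return batchSize; }
    @Override public void startBatch() { events.add("start"); }
    @Override public void endBatch() { events.add("end"); }
    @Override public boolean callAtApplicationWindowBoundary() { return alignToWindow; }
}
```

With this sketch, two operators in the same DAG could return different values
from configureBatch and callAtApplicationWindowBoundary, so each one sees its
own batch boundaries without a single DAG-wide definition.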


Thoughts?


-Chinmay.



On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <[email protected]>
wrote:

> On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <[email protected]>
> wrote:
>
> > +1 for batch support in Apex. I would be interested to be part of this
> > work.
> >
> > I would like to start with basics and would like to know how one will
> > define "batch" in Apex context. Which of the following cases would be
> > supported under batch:
> >
> >    1. A program completes a task and auto shutdown itself once the task is
> >    complete. E.g.  the program needs to copy a set of files from source to
> >    destination.
> >    2. A program completes a task and then waits for pre-defined time to
> >    poll for something more to work on. E.g. the program copies all the files
> >    from source location and then periodically checks, say every 1 hour, if
> >    there are new files at the source and copies them.
> >    3. A program completes a task and then polls every 1 hr as in case 2 but
> >    releases resources during wait time.
> >
> >
>
> Yes, both, 1. and 2. are valid use cases. I would not make a further
> distinction between 2. and 3. at this point.
>
> Ability to run an application that expands and shrinks as self contained
> unit can be a benefit, as otherwise you need an external scheduler just to
> launch jobs (such as Oozie). The associated extra integration work may be
> brittle and an unwanted barrier for certain use cases.
>
>
>
> > Needs for each of the above will vary. I am putting down some basic
> > requirements for each of them
> >
> > 1. This case will need a mechanism to shutdown automatically on completion
> > of the task.
> >
> > StartProgram()
> >     StartBatch()
> >         Streaming Application starts, runs and finishes
> >     EndBatch()
> > EndProgram()
> >
> > 2. This will simply need a construct to wait for some time ( say 10
> > minutes) or till some time ( till 1pm) .
> >
> > StartProgram()
> > while(true)
> > {
> >     StartBatch()
> >         Streaming Application starts, runs and finishes
> >     EndBatch()
> >     WaitTill(time) or WaitFor(timeperiod)
> > }
> > EndProgram()
> >
> > 3. Apart from wait construct, we also need release resources support
> >
> > StartProgram()
> > while(true)
> > {
> >     RestartFromSavedState() // if any state is saved previously.
> >     StartBatch()
> >         Streaming Application starts, runs and finishes
> >     EndBatch()
> >     SaveState()
> >     RelaseResources()
> >     WaitTill(time) or WaitFor(timeperiod)
> > }
> > EndProgram()
> >
> >
> > All the constructs : waitTime(), RestartFromSavedState(), SaveState()
> > , RelaseResources()
> > could be very well be part of StartBatch() or EndBatch(). I have put them
> > separately for clear understanding only.
> >
> > Another point to think on would be scheduler. A batch job is generally
> > triggered as a cron job. Do we still see Apex jobs being triggered by cron
> > or would like to include a scheduler within Apex that will trigger jobs
> > based on time or on some external trigger or even polling for events.
> >
> > Regards
> > Sandeep
> >
> > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <[email protected]>
> > wrote:
> >
> > > +1
> > >
> > > I think in the batch case, application windows may be transparent to the
> > > user application / operator logic.  A batch can be thought of as one
> > > instantiation of a Apex Dag, from setup() to teardown() for all operators.
> > > May be we need to define a higher level API which encapsulates a streaming
> > > application.
> > > Something like:
> > >
> > > StartBatch()
> > >   Streaming Application starts, runs and finishes
> > > EndBatch()
> > >
> > > The streaming application will run transparently with all the windowing /
> > > checkpointing logic that it currently does. Checkpointing large amounts of
> > > data may be avoided by either checkpointing at large intervals or even
> > > disabling checkpointing for the batch job.
> > > Additionally, the external trigger (existence of some file etc. ) can be
> > > controlled by the StartBatch() and EndBatch() calls. In all the batch use
> > > cases, it is usually the case that once the input is processed completely,
> > > the batch is done. Example: In map reduce all splits processed means batch
> > > job is done. Similar primitives can be supported by Apex in order to
> > > facilitate the control management in the StartBatch() and EndBatch()
> > > methods.
> > >
> > > -Bhupesh
> > >
> > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <[email protected]>
> > > wrote:
> > >
> > > > Following JIRA is open to enhance the support for batch:
> > > >
> > > > https://issues.apache.org/jira/browse/APEXCORE-235
> > > >
> > > > One of the challenges with batch on Apex today is that there isn't any
> > > > native support to identify begin/end of batch and associate actions to it.
> > > > For example, at the beginning we may want to fetch some data needed for all
> > > > subsequent processing and at the end perform some finalization action or
> > > > push to external system (add partition to Hive table or similar).
> > > >
> > > > Absent native support, the workaround is to add a bunch of ports and extra
> > > > operators for propagation and synchronization purposes, which makes
> > > > building the batch application with standard operators or development of
> > > > custom operators rather difficult and inefficient.
> > > >
> > > > The span of a batch can also be seen as a user defined window, with logic
> > > > for begin and end. The current "application window" support is limited to a
> > > > multiple of streaming window on a per operator basis. In the batch case,
> > > > the boundary needs to be more flexible - user code needs to be able to
> > > > determine begin/endWindow based on external data (existence of files etc.).
> > > >
> > > > There is another commonality with application window, and that's alignment
> > > > of checkpointing. For batches where it is more efficient to redo the
> > > > processing instead of checkpointing potentially large amounts of
> > > > intermediate state for incremental recovery, it would be nice to be able to
> > > > say "user window == checkpoint interval".
> > > >
> > > > This is to float the idea of having a window control that can be influenced
> > > > by user code. An operator that identifies the batch boundary tells the
> > > > engine about it and corresponding control tuples are submitted through the
> > > > stream, leading to callbacks on downstream operators. These control
> > > > tuples should be able to carry contextual information that can be used in
> > > > downstream operator logic (file names, schema information etc.)
> > > >
> > > > I don't expect the current beginWindow/endWindow can be augmented in a
> > > > backward compatible way to accommodate this, but a similar optional
> > > > interface could be supported to enable batch aware operators and
> > > > checkpointing optimization.
> > > >
> > > > Thoughts?
> > > >
> > >
> >
>
