Time to resume this discussion. I think it makes sense to look at the batch as execution of a DAG from setup to teardown for all its operators, as suggested by Bhupesh and Sandeep. The DAG comes into existence when the batch begins and terminates when it is done.
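To make that lifecycle concrete, here is a minimal Java sketch. Everything in it is an assumption for illustration: `Op`, `runBatch` and `tracing` are hypothetical names, not the Apex API, and the DAG is reduced to a linear chain. It only shows the shape of the idea, namely that the DAG is created when the batch begins, every operator goes through setup, the input is processed, and every operator is torn down when the batch is done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class BatchDagSketch {
  /** Hypothetical stand-in for an operator; NOT the Apex Operator interface. */
  public interface Op {
    void setup();
    String process(String tuple);
    void teardown();
  }

  /**
   * Runs one batch: creates a fresh (linear) DAG, sets up all operators,
   * pushes every input tuple through the chain, then tears everything down.
   */
  public static List<String> runBatch(Supplier<List<Op>> dagFactory, List<String> input) {
    List<Op> dag = dagFactory.get();   // the DAG comes into existence with the batch
    dag.forEach(Op::setup);
    List<String> out = new ArrayList<>();
    try {
      for (String tuple : input) {
        String t = tuple;
        for (Op op : dag) {
          t = op.process(t);
        }
        out.add(t);
      }
    } finally {
      // The batch is done: tear down in reverse order and let the DAG go away.
      for (int i = dag.size() - 1; i >= 0; i--) {
        dag.get(i).teardown();
      }
    }
    return out;
  }

  /** Operator that appends its name to each tuple and records lifecycle calls in a log. */
  public static Op tracing(String name, List<String> log) {
    return new Op() {
      public void setup() { log.add(name + ".setup"); }
      public String process(String tuple) { return tuple + ">" + name; }
      public void teardown() { log.add(name + ".teardown"); }
    };
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    List<String> out = runBatch(
        () -> List.of(tracing("read", log), tracing("write", log)),
        List.of("a", "b"));
    System.out.println(out);
    System.out.println(log);
  }
}
```

The factory argument is there so that each batch gets its own operator instances, which matches the view that nothing of the processing DAG exists between batches.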
We have also seen from customers that there is demand for a built-in
scheduler function when no external component is already present. For
example, a file or set of files could be identified as a "batch". While the
application is idle, there is only a scheduler operator, which polls for
files. Once work is ready, that operator launches the DAG for processing
(within the same application, but not connected through a stream). When
processing is complete, that DAG terminates and returns its resources.

As discussed, there is a need to be able to turn off checkpointing, which is
different from setting a large checkpoint window. No checkpointing means no
incremental recovery and hence no need to keep data in buffers.

There is also a need to relay a begin/end signal through the entire DAG.
This is different from setup/shutdown. It is more like begin/endWindow, but
there is only a single "window" in a batch.

On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar <[email protected]> wrote:

> Hi Thomas,
>
> A comment on following in your previous mails:
>
> *An operator that identifies the batch boundary tells the engine about it
> and corresponding control tuples are submitted through the stream, leading
> to callbacks on downstream operators*
>
> This would mean there will be a single boundary definition of a batch in
> the application DAG.
> I think we should give freedom to individual operator to define what a
> batch is and produce a callbacks accordingly.
>
> Considering that in mind, here is a quick sketch/suggestion of how it can
> be done:
>
> 1) The operator that needs to work on a batch can implement an interface,
> lets say BatchListener.
>
> 2) This will have 4 methods:
>    *startBatch*
>    *endBatch*
>    *configureBatch*
>    *callAtApplicationWindowBoundary* (maybe some better name??)
>
> 3) *configureBatch* will tell what is the boundary of a batch.
> This will be called right after setup OR activate, basically before
> beginning of the stream.
> The return value will be set with operator thread.
>
> 4) Based on configuration, the *startBatch* and *endBatch* will be called.
>
> 5) the *callAtApplicationWindowBoundary* should return *true/false* based
> on whether start/end batch calls should happen at application window
> boundary OR not. Here is where user can choose to take care of
> checkpointing of tuples within a windows by platform OR whether user wants
> to do that of his own.
>
> Thoughts?
>
> -Chinmay.
>
> ~ Chinmay.
>
> On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <[email protected]> wrote:
>
> > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <[email protected]>
> > wrote:
> >
> > > +1 for batch support in Apex. I would be interested to be part of this
> > > work.
> > >
> > > I would like to start with basics and would like to know how one will
> > > define "batch" in Apex context. Which of the following cases would be
> > > supported under batch:
> > >
> > > 1. A program completes a task and auto shutdown itself once the task
> > >    is complete. E.g. the program needs to copy a set of files from
> > >    source to destination.
> > > 2. A program completes a task and then waits for pre-defined time to
> > >    poll for something more to work on. E.g. the program copies all the
> > >    files from source location and then periodically checks, say every
> > >    1 hour, if there are new files at the source and copies them.
> > > 3. A program completes a task and then polls every 1 hr as in case 2
> > >    but releases resources during wait time.
> >
> > Yes, both, 1. and 2. are valid use cases. I would not make a further
> > distinction between 2. and 3. at this point.
> >
> > Ability to run an application that expands and shrinks as self contained
> > unit can be a benefit, as otherwise you need an external scheduler just to
> > launch jobs (such as Oozie).
> > The associated extra integration work may be brittle and an unwanted
> > barrier for certain use cases.
> >
> > > Needs for each of the above will vary. I am putting down some basic
> > > requirements for each of them
> > >
> > > 1. This case will need a mechanism to shutdown automatically on
> > > completion of the task.
> > >
> > > StartProgram()
> > > StartBatch()
> > > Streaming Application starts, runs and finishes
> > > EndBatch()
> > > EndProgram()
> > >
> > > 2. This will simply need a construct to wait for some time ( say 10
> > > minutes) or till some time ( till 1pm) .
> > >
> > > StartProgram()
> > > while(true)
> > > {
> > >   StartBatch()
> > >   Streaming Application starts, runs and finishes
> > >   EndBatch()
> > >   WaitTill(time) or WaitFor(timeperiod)
> > > }
> > > EndProgram()
> > >
> > > 3. Apart from wait construct, we also need release resources support
> > >
> > > StartProgram()
> > > while(true)
> > > {
> > >   RestartFromSavedState() // if any state is saved previously.
> > >   StartBatch()
> > >   Streaming Application starts, runs and finishes
> > >   EndBatch()
> > >   SaveState()
> > >   RelaseResources()
> > >   WaitTill(time) or WaitFor(timeperiod)
> > > }
> > > EndProgram()
> > >
> > > All the constructs : waitTime(), RestartFromSavedState(), SaveState(),
> > > RelaseResources() could be very well be part of StartBatch() or
> > > EndBatch(). I have put them separately for clear understanding only.
> > >
> > > Another point to think on would be scheduler. A batch job is generally
> > > triggered as a cron job. Do we still see Apex jobs being triggered by
> > > cron or would like to include a scheduler within Apex that will trigger
> > > jobs based on time or on some external trigger or even polling for
> > > events.
> > >
> > > Regards
> > > Sandeep
> > >
> > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <[email protected]>
> > > wrote:
> > >
> > > > +1
> > > >
> > > > I think in the batch case, application windows may be transparent to
> > > > the user application / operator logic. A batch can be thought of as
> > > > one instantiation of a Apex Dag, from setup() to teardown() for all
> > > > operators. May be we need to define a higher level API which
> > > > encapsulates a streaming application.
> > > > Something like:
> > > >
> > > > StartBatch()
> > > > Streaming Application starts, runs and finishes
> > > > EndBatch()
> > > >
> > > > The streaming application will run transparently with all the
> > > > windowing / checkpointing logic that it currently does. Checkpointing
> > > > large amounts of data may be avoided by either checkpointing at large
> > > > intervals or even disabling checkpointing for the batch job.
> > > > Additionally, the external trigger (existence of some file etc. ) can
> > > > be controlled by the StartBatch() and EndBatch() calls. In all the
> > > > batch use cases, it is usually the case that once the input is
> > > > processed completely, the batch is done. Example: In map reduce all
> > > > splits processed means batch job is done. Similar primitives can be
> > > > supported by Apex in order to facilitate the control management in
> > > > the StartBatch() and EndBatch() methods.
> > > >
> > > > -Bhupesh
> > > >
> > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <[email protected]>
> > > > wrote:
> > > >
> > > > > Following JIRA is open to enhance the support for batch:
> > > > >
> > > > > https://issues.apache.org/jira/browse/APEXCORE-235
> > > > >
> > > > > One of the challenges with batch on Apex today is that there isn't
> > > > > any native support to identify begin/end of batch and associate
> > > > > actions to it.
> > > > > For example, at the beginning we may want to fetch some data needed
> > > > > for all subsequent processing and at the end perform some
> > > > > finalization action or push to external system (add partition to
> > > > > Hive table or similar).
> > > > >
> > > > > Absent native support, the workaround is to add a bunch of ports and
> > > > > extra operators for propagation and synchronization purposes, which
> > > > > makes building the batch application with standard operators or
> > > > > development of custom operators rather difficult and inefficient.
> > > > >
> > > > > The span of a batch can also be seen as a user defined window, with
> > > > > logic for begin and end. The current "application window" support is
> > > > > limited to a multiple of streaming window on a per operator basis.
> > > > > In the batch case, the boundary needs to be more flexible - user
> > > > > code needs to be able to determine begin/endWindow based on external
> > > > > data (existence of files etc.).
> > > > >
> > > > > There is another commonality with application window, and that's
> > > > > alignment of checkpointing. For batches where it is more efficient
> > > > > to redo the processing instead of checkpointing potentially large
> > > > > amounts of intermediate state for incremental recovery, it would be
> > > > > nice to be able to say "user window == checkpoint interval".
> > > > >
> > > > > This is to float the idea of having a window control that can be
> > > > > influenced by user code. An operator that identifies the batch
> > > > > boundary tells the engine about it and corresponding control tuples
> > > > > are submitted through the stream, leading to callbacks on downstream
> > > > > operators.
> > > > > These control tuples should be able to carry contextual information
> > > > > that can be used in downstream operator logic (file names, schema
> > > > > information etc.)
> > > > >
> > > > > I don't expect the current beginWindow/endWindow can be augmented in
> > > > > a backward compatible way to accommodate this, but a similar
> > > > > optional interface could be supported to enable batch aware
> > > > > operators and checkpointing optimization.
> > > > >
> > > > > Thoughts?
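
For reference, the optional interface floated in the original proposal quoted above could look roughly like the following Java sketch. This is only an illustration under stated assumptions: `BatchAware`, `Op`, `Engine`, `CountingWriter`, `PassThrough` and the `fileName` context key are all hypothetical names and none of them exist in Apex. The `Engine` class is a toy stand-in that hands begin/end signals, with contextual information attached, to those operators that opt in, while other operators keep working unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BatchControlSketch {
  /** Hypothetical optional interface for batch-aware operators; NOT part of the Apex API. */
  public interface BatchAware {
    void beginBatch(Map<String, String> context);
    void endBatch();
  }

  /** Minimal operator contract for this sketch only. */
  public interface Op {
    void process(String tuple);
  }

  /** Toy stand-in for the engine: relays control signals and data to the operators. */
  public static class Engine {
    private final List<Op> operators;

    public Engine(List<Op> operators) { this.operators = operators; }

    /** Begin signal carrying contextual information (file names, schema, ...). */
    public void beginBatch(Map<String, String> context) {
      for (Op op : operators) {
        if (op instanceof BatchAware) ((BatchAware) op).beginBatch(context);
      }
    }

    /** In this toy model every operator sees every data tuple. */
    public void emit(String tuple) {
      for (Op op : operators) op.process(tuple);
    }

    public void endBatch() {
      for (Op op : operators) {
        if (op instanceof BatchAware) ((BatchAware) op).endBatch();
      }
    }
  }

  /** Batch-aware operator: counts tuples per batch and "finalizes" at the end. */
  public static class CountingWriter implements Op, BatchAware {
    public final List<String> committed = new ArrayList<>();
    private String file;
    private int count;

    public void beginBatch(Map<String, String> context) {
      file = context.get("fileName");
      count = 0;
    }
    public void process(String tuple) { count++; }
    public void endBatch() { committed.add(file + ":" + count); }
  }

  /** Operator that does not implement the optional interface and is unaffected by it. */
  public static class PassThrough implements Op {
    public int seen;
    public void process(String tuple) { seen++; }
  }

  public static void main(String[] args) {
    CountingWriter writer = new CountingWriter();
    PassThrough plain = new PassThrough();
    Engine engine = new Engine(List.of(plain, writer));

    engine.beginBatch(Map.of("fileName", "part-0001"));
    engine.emit("x");
    engine.emit("y");
    engine.endBatch();

    System.out.println(writer.committed);
  }
}
```

Keeping the interface optional is what preserves backward compatibility in this sketch: operators that only know about regular tuple processing never see the batch callbacks.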
