Maybe we should take this to the Beam mailing list and see what people
think about how this problem can be solved using watermarks and windowing?
I think we will get some good suggestions.

David

On Tue, Feb 28, 2017 at 8:17 AM, Thomas Weise <t...@apache.org> wrote:

> I think that discussion is related, but in our example we have many keys
> that belong to a file that all fall into a common window boundary.
>
> WRT the naming of the window ("sequence" etc.), it depends on the use case,
> and the operators should be kept generic. It could be a sequence that is
> generated, derived from the streaming window, derived from event data, etc.
>
> Thanks,
> Thomas
>
>
> On Tue, Feb 28, 2017 at 7:57 AM, David Yan <david...@gmail.com> wrote:
>
> > There is a discussion in the Flink mailing list about key-based
> > watermarks. I think it's relevant to our use case here.
> > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> >
> > David
> >
> > On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> >
> > > Hi David,
> > >
> > > If using a time window does not seem appropriate, we can have another
> > > class that is better suited to such sequential and distinct windows.
> > > Perhaps a CustomWindow option can be introduced that takes in a
> > > window id. The purpose of this window option could be to translate
> > > the window id into appropriate timestamps.
> > >
> > > Another option would be to go with a custom timestampExtractor for
> > > such tuples that translates each unique file name to a distinct
> > > timestamp while using time windows in the windowed operator.
> > >
> > > ~ Bhupesh
> > >
> > >
> > > _______________________________________________________
> > >
> > > Bhupesh Chawda
> > >
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > >
> > > www.datatorrent.com  |  apex.apache.org
> > >
> > >
> > >
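A rough sketch of the timestampExtractor option above, assuming the files arrive in order on a single, unpartitioned input. FileSequenceTimestampExtractor and extract() are hypothetical names, not an existing Malhar interface; the idea is only that each distinct file name gets the next sequence number, so that with 1 ms time windows every file lands in its own window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extractor: maps each distinct file name to a monotonically
// increasing sequence number that stands in for the window "timestamp".
// Not a Malhar API; assumes files are seen in order on one partition.
class FileSequenceTimestampExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long nextSequence = 1;

  // Returns the sequence number for the file, assigning the next one the
  // first time this file name is seen.
  long extract(String fileName) {
    Long seq = sequenceByFile.get(fileName);
    if (seq == null) {
      seq = nextSequence++;
      sequenceByFile.put(fileName, seq);
    }
    return seq;
  }
}
```

The name-to-sequence map only grows here; a real implementation would presumably drop an entry once that file's window has been finalized.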
> > > On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:
> > >
> > > > I now see your rationale for putting the filename in the window.
> > > > As far as I understand, the reasons why the filename is not part of
> > > > the key and the Global Window is not used are:
> > > >
> > > > 1) The files are processed in sequence, not in parallel.
> > > > 2) The windowed operator should not keep the state associated with
> > > >    the file once the processing of the file is done.
> > > > 3) The trigger should be fired for the file when the file is done
> > > >    processing.
> > > >
> > > > However, if the file is just a sequence and has nothing to do with a
> > > > timestamp, assigning a timestamp to a file is not an intuitive thing
> > > > to do and would just create confusion for users, especially when
> > > > it's used as an example for new users.
> > > >
> > > > How about having a separate class called SequenceWindow? And perhaps
> > > > TimeWindow can inherit from it?
> > > >
> > > > David
> > > >
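One possible shape for the SequenceWindow suggestion, as a hypothetical sketch (none of these classes or methods exist in Malhar as written here). It assumes a watermark W means that no more tuples with a sequence below W will arrive, so a window [begin, end) is complete once W reaches end; TimeWindow is then just the case where the sequence is milliseconds.

```java
// Hypothetical window bounded by an abstract, monotonically increasing
// sequence: begin is inclusive, end is exclusive.
class SequenceWindow {
  private final long beginSequence;
  private final long endSequence;

  SequenceWindow(long beginSequence, long endSequence) {
    if (endSequence <= beginSequence) {
      throw new IllegalArgumentException("window must not be empty");
    }
    this.beginSequence = beginSequence;
    this.endSequence = endSequence;
  }

  long getBeginSequence() { return beginSequence; }
  long getEndSequence() { return endSequence; }

  boolean contains(long sequence) {
    return sequence >= beginSequence && sequence < endSequence;
  }

  // Assumed semantics: watermark W => no more tuples with sequence < W.
  boolean isCompleteAt(long watermark) {
    return watermark >= endSequence;
  }
}

// A time window is a sequence window whose sequence is epoch milliseconds.
class TimeWindow extends SequenceWindow {
  TimeWindow(long beginMillis, long durationMillis) {
    super(beginMillis, beginMillis + durationMillis);
  }

  long getBeginTimestamp() { return getBeginSequence(); }
  long getDurationMillis() { return getEndSequence() - getBeginSequence(); }
}
```

With this split, the per-file use case could use SequenceWindow(n, n + 1) directly instead of pretending the file number is a timestamp.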
> > > > On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
> > > >
> > > > > On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > >
> > > > > > I think my comments related to count-based windows might be
> > > > > > causing confusion. Let's not discuss count-based scenarios for now.
> > > > > >
> > > > > > I just want to make sure we are on the same page with regard to
> > > > > > the "each file is a batch" use case. As mentioned by Thomas, each
> > > > > > tuple from the same file has the same timestamp (which is just a
> > > > > > sequence number), and that helps keep tuples from each file in a
> > > > > > separate window.
> > > > > >
> > > > >
> > > > > Yes, in this case it is a sequence number, but it could also be a
> > > > > timestamp, depending on the file naming convention. And if it was
> > > > > event time processing, the watermark would be derived from records
> > > > > within the file.
> > > > >
> > > > > Agreed, the source should have a mechanism to control the timestamp
> > > > > extraction along with everything else pertaining to the watermark
> > > > > generation.
> > > > >
> > > > >
> > > > > > We could also implement a "timestampExtractor" interface to
> > > > > > identify the timestamp (sequence number) for a file.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > >
> > > > > > > I don't think this is a use case for a count-based window.
> > > > > > >
> > > > > > > We have multiple files that are retrieved in sequence, and there
> > > > > > > is no knowledge of the number of records per file. The
> > > > > > > requirement is to aggregate each file separately and emit the
> > > > > > > aggregate when the file has been read fully. There is no concept
> > > > > > > of "end of something" for an individual key, and the global
> > > > > > > window isn't applicable.
> > > > > > >
> > > > > > > However, as already explained and implemented by Bhupesh, this
> > > > > > > can be solved using a watermark and a window (in this case the
> > > > > > > window timestamp isn't a timestamp but a file sequence, but that
> > > > > > > doesn't matter).
> > > > > > >
> > > > > > > Thomas
> > > > > > >
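To make the "window timestamp is a file sequence" approach concrete, here is a small in-memory simulation rather than actual WindowedOperator code (PerFileWordCount and its methods are made-up names). It keeps word counts per file sequence and, assuming a watermark W means all files with a sequence below W have been read fully, emits and purges those windows when the watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation only: window = file sequence number, state = word counts.
class PerFileWordCount {
  // file sequence -> (word -> count); TreeMap keeps windows ordered
  private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();
  private final List<String> emitted = new ArrayList<>();

  void processTuple(long fileSequence, String word) {
    state.computeIfAbsent(fileSequence, k -> new TreeMap<>())
         .merge(word, 1L, Long::sum);
  }

  // Assumed semantics: watermark W => every file with sequence < W is done.
  void processWatermark(long watermark) {
    Map<Long, Map<String, Long>> done = state.headMap(watermark, false);
    for (Map.Entry<Long, Map<String, Long>> e : done.entrySet()) {
      emitted.add("file#" + e.getKey() + " " + e.getValue());
    }
    done.clear(); // headMap is a view, so this purges the finished files
  }

  List<String> getEmitted() { return emitted; }

  int openWindows() { return state.size(); }
}
```

Because the state of a file is dropped as soon as its aggregate is emitted, this also covers the requirement that the operator should not keep per-file state after the file is done.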
> > > > > > >
> > > > > > > On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:
> > > > > > >
> > > > > > > > I don't think this is the way to go. The Global Window only
> > > > > > > > means the timestamp does not matter (or that there is no
> > > > > > > > timestamp). It does not necessarily mean it's a large batch.
> > > > > > > > Unless there is some notion of event time for each file, you
> > > > > > > > don't want to embed the file into the window itself.
> > > > > > > >
> > > > > > > > If you want the result broken up by file name, and if the files
> > > > > > > > are to be processed in parallel, I think making the file name
> > > > > > > > part of the key is the way to go. I think it's very confusing if
> > > > > > > > we somehow make the file part of the window.
> > > > > > > >
> > > > > > > > Count-based windows are not implemented yet, and you're welcome
> > > > > > > > to add that feature. With count-based windows there would be no
> > > > > > > > notion of time, and you would probably only trigger at the end
> > > > > > > > of each window. In that case the watermark only matters for
> > > > > > > > batch, since you need a way to know when the batch has ended
> > > > > > > > (if the count is 10 and the number of tuples in the batch is,
> > > > > > > > let's say, 105, you need a way to end the last window with 5
> > > > > > > > tuples).
> > > > > > > >
> > > > > > > > David
> > > > > > > >
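The 105-tuple example can be sketched as follows. CountWindowAssigner is a hypothetical class, not the Malhar API: a window closes on its own at the n-th tuple, which is why no watermark is needed there, while the final watermark is the only signal that flushes the trailing partial window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based window: every `size` tuples close a window.
class CountWindowAssigner<T> {
  private final int size;
  private final List<T> current = new ArrayList<>();
  private final List<List<T>> closed = new ArrayList<>();

  CountWindowAssigner(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be > 0");
    }
    this.size = size;
  }

  void processTuple(T tuple) {
    current.add(tuple);
    if (current.size() == size) { // complete at the n-th tuple
      closeCurrent();
    }
  }

  // End of batch: flush the partially filled last window, if any.
  void processFinalWatermark() {
    if (!current.isEmpty()) {
      closeCurrent();
    }
  }

  private void closeCurrent() {
    closed.add(new ArrayList<>(current));
    current.clear();
  }

  List<List<T>> getClosedWindows() { return closed; }
}
```

With a window size of 10, feeding 105 tuples closes 10 windows immediately; the 11th window, holding the last 5 tuples, only closes on the final watermark.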
> > > > > > > > On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > >
> > > > > > > > > Hi David,
> > > > > > > > >
> > > > > > > > > Thanks for your comments.
> > > > > > > > >
> > > > > > > > > The wordcount example that I created based on the windowed
> > > > > > > > > operator processes word counts per file (each file as a
> > > > > > > > > separate batch), i.e. it computes counts for each file and
> > > > > > > > > dumps them into separate files.
> > > > > > > > > As I understand it, the Global Window is for one large batch,
> > > > > > > > > i.e. all incoming data falls into the same batch. This could
> > > > > > > > > not be processed using the GlobalWindow option, as we need
> > > > > > > > > more than one window. In this case, I configured the windowed
> > > > > > > > > operator to have time windows of 1 ms each and passed data for
> > > > > > > > > each file with increasing timestamps: (file1, 1), (file2, 2)
> > > > > > > > > and so on. Is there a better way of handling this scenario?
> > > > > > > > >
> > > > > > > > > Regarding (2 - count-based windows), I think there is a
> > > > > > > > > trigger option to process count-based windows. If I want to
> > > > > > > > > process every 1000 tuples as a batch, I could set the Trigger
> > > > > > > > > option to CountTrigger with the accumulation set to
> > > > > > > > > Discarding. Is this correct?
> > > > > > > > >
> > > > > > > > > I agree that (4. Final Watermark) can be done using the Global
> > > > > > > > > Window.
> > > > > > > > >
> > > > > > > > > ~ Bhupesh
> > > > > > > > >
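The difference between the two accumulation modes mentioned above can be illustrated with a simple simulation of a count trigger over a single window. This is not the Malhar TriggerOption API; CountTriggeredSum and its AccumulationMode enum are hypothetical. DISCARDING clears the pane after each firing, so each firing reflects only the latest 1000 tuples, while ACCUMULATING keeps a running total.

```java
import java.util.ArrayList;
import java.util.List;

// Simulated count trigger on one window that sums incoming values.
class CountTriggeredSum {
  enum AccumulationMode { DISCARDING, ACCUMULATING }

  private final long count;
  private final AccumulationMode mode;
  private long sinceLastFiring = 0;
  private long pane = 0;
  private final List<Long> firings = new ArrayList<>();

  CountTriggeredSum(long count, AccumulationMode mode) {
    this.count = count;
    this.mode = mode;
  }

  void processTuple(long value) {
    pane += value;
    if (++sinceLastFiring == count) { // fire every `count` tuples
      firings.add(pane);
      sinceLastFiring = 0;
      if (mode == AccumulationMode.DISCARDING) {
        pane = 0; // drop what has already been emitted
      }
    }
  }

  List<Long> getFirings() { return firings; }
}
```

Note that with 2500 tuples the trailing 500 are never emitted by the count trigger alone; something like an end-of-window or final watermark would still be needed to flush them.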
> > > > > > > > > On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > I'm worried that we are making the watermark concept too
> > > > > > > > > > complicated.
> > > > > > > > > >
> > > > > > > > > > Watermarks should simply tell you which windows can be
> > > > > > > > > > considered complete.
> > > > > > > > > >
> > > > > > > > > > Point 2 is basically a count-based window. Watermarks do not
> > > > > > > > > > play a role here because the window is always complete at
> > > > > > > > > > the n-th tuple.
> > > > > > > > > >
> > > > > > > > > > If I understand correctly, point 3 is for batch processing
> > > > > > > > > > of files. Unless the files contain timed events, it sounds to
> > > > > > > > > > me like this can be achieved with just a Global Window. For
> > > > > > > > > > signaling EOF, a watermark with a +infinity timestamp can be
> > > > > > > > > > used so that triggers will be fired upon receipt of that
> > > > > > > > > > watermark.
> > > > > > > > > >
> > > > > > > > > > Point 4, just like what I mentioned above, can be achieved
> > > > > > > > > > with a watermark with a +infinity timestamp.
> > > > > > > > > >
> > > > > > > > > > David
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
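A minimal sketch of the Global Window plus +infinity watermark idea, assuming the source represents +infinity as Long.MAX_VALUE (an assumption, not a documented convention). GlobalWindowCounter is a hypothetical operator whose only trigger is that final watermark; ordinary watermarks leave the window open.

```java
// Hypothetical operator: counts tuples in the global window and fires
// exactly once, when the +infinity (final) watermark arrives.
class GlobalWindowCounter {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // assumed +infinity

  private long count = 0;
  private Long result = null; // non-null once the trigger has fired

  void processTuple(Object tuple) {
    if (result != null) {
      throw new IllegalStateException("tuple after final watermark");
    }
    count++; // tuple timestamps are irrelevant in the global window
  }

  void processWatermark(long timestamp) {
    // Any watermark short of +infinity keeps the global window open.
    if (timestamp == FINAL_WATERMARK && result == null) {
      result = count;
    }
  }

  Long getResult() { return result; }
}
```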
> > > > > > > > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Thomas,
> > > > > > > > > > >
> > > > > > > > > > > For an input operator that is supposed to generate
> > > > > > > > > > > watermarks for downstream operators, I can think of the
> > > > > > > > > > > following watermarks that the operator can emit:
> > > > > > > > > > > 1. Time-based watermarks (the high watermark / low watermark)
> > > > > > > > > > > 2. Tuple-count-based watermarks (every n tuples)
> > > > > > > > > > > 3. File-based watermarks (start file, end file)
> > > > > > > > > > > 4. Final watermark
> > > > > > > > > > >
> > > > > > > > > > > File-based watermarks seem to be applicable to batch
> > > > > > > > > > > (file-based) processing as well, and hence I thought of
> > > > > > > > > > > looking at these first. Does this seem to be in line with
> > > > > > > > > > > the thought process?
> > > > > > > > > > >
> > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > I don't think this should be designed based on a
> > > > > > > > > > > > simplistic file input-output scenario. It would be good
> > > > > > > > > > > > to include a stateful transformation based on event time.
> > > > > > > > > > > >
> > > > > > > > > > > > More complex pipelines contain stateful transformations
> > > > > > > > > > > > that depend on windowing and watermarks. I think we need
> > > > > > > > > > > > a watermark concept that is based on progress in event
> > > > > > > > > > > > time (or another monotonically increasing sequence) that
> > > > > > > > > > > > other operators can work with generically.
> > > > > > > > > > > >
> > > > > > > > > > > > Note that even file input can in many cases produce
> > > > > > > > > > > > time-based watermarks, for example when you read part
> > > > > > > > > > > > files that are bounded by event time.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Thomas
> > > > > > > > > > > >
> > > > > > > > > > > >
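One generic way to derive watermarks from progress in event time, sketched with a hypothetical EventTimeWatermarkGenerator class. It is similar in spirit to Flink's bounded-out-of-orderness strategy: track the largest event time seen, subtract an allowed lateness, and never let the watermark move backwards. The lateness value is an assumed configuration parameter.

```java
// Hypothetical source-side watermark generator based on event-time progress.
class EventTimeWatermarkGenerator {
  private final long allowedLatenessMillis;
  private long maxEventTime = Long.MIN_VALUE;
  private long lastEmitted = Long.MIN_VALUE;

  EventTimeWatermarkGenerator(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // Called for every record; out-of-order records do not lower the maximum.
  void observe(long eventTime) {
    maxEventTime = Math.max(maxEventTime, eventTime);
  }

  // Returns the next watermark, or null if it would not advance.
  Long nextWatermark() {
    if (maxEventTime == Long.MIN_VALUE) {
      return null; // nothing observed yet
    }
    long candidate = maxEventTime - allowedLatenessMillis;
    if (candidate <= lastEmitted) {
      return null; // keep watermarks strictly monotonic
    }
    lastEmitted = candidate;
    return candidate;
  }
}
```

Since the generator only needs a monotonically increasing number, the same logic would apply to a file sequence instead of event time.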
> > > > > > > > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > To better understand the use case for control tuples in
> > > > > > > > > > > > > batch, I am creating a prototype of a batch application
> > > > > > > > > > > > > using the File Input and File Output operators.
> > > > > > > > > > > > >
> > > > > > > > > > > > > To enable basic batch processing for the File IO
> > > > > > > > > > > > > operators, I am proposing the following changes to the
> > > > > > > > > > > > > File input and output operators:
> > > > > > > > > > > > > 1. The File Input operator emits a watermark each time it
> > > > > > > > > > > > >    opens and closes a file. These can be "start file" and
> > > > > > > > > > > > >    "end file" watermarks that include the corresponding
> > > > > > > > > > > > >    file names. The "start file" tuple should be sent
> > > > > > > > > > > > >    before any of the data from that file flows.
> > > > > > > > > > > > > 2. The File Input operator can be configured to end the
> > > > > > > > > > > > >    application after a single scan or n scans of the
> > > > > > > > > > > > >    directory (a batch). This is where the operator emits
> > > > > > > > > > > > >    the final watermark (the end-of-application control
> > > > > > > > > > > > >    tuple). This will also shut down the application.
> > > > > > > > > > > > > 3. The File Output operator handles these control tuples.
> > > > > > > > > > > > >    "Start file" initializes the file name for the
> > > > > > > > > > > > >    incoming tuples. The "end file" watermark forces a
> > > > > > > > > > > > >    finalize on that file.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The user would be able to enable the operators to send
> > > > > > > > > > > > > only those watermarks that are needed in the application.
> > > > > > > > > > > > > If none of the options are configured, the operators
> > > > > > > > > > > > > behave as in a streaming application.
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are a few challenges in the implementation when the
> > > > > > > > > > > > > input operator is partitioned. In this case, the
> > > > > > > > > > > > > correlation between the start/end of a file and the data
> > > > > > > > > > > > > tuples for that file is lost. Hence we need to maintain
> > > > > > > > > > > > > the file name as part of each tuple in the pipeline.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The "start file" and "end file" control tuples in this
> > > > > > > > > > > > > example are temporary names for watermarks. We can have
> > > > > > > > > > > > > generic "start batch" / "end batch" tuples that could be
> > > > > > > > > > > > > used for other use cases as well. The final watermark is
> > > > > > > > > > > > > common and serves the same purpose in each case.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please let me know your thoughts on this.
> > > > > > > > > > > > >
> > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
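Point 3 can be simulated in memory as follows. BatchAwareFileOutput and its callbacks are hypothetical names, and the ".tmp" handling is only mimicked with lists instead of real file I/O. The sketch also assumes a single, unpartitioned input, since (as noted above) partitioning would break the correlation between the start/end tuples and the data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a file output operator reacting to control tuples.
class BatchAwareFileOutput {
  private String currentFile;
  private final List<String> buffer = new ArrayList<>(); // the ".tmp" part
  private final Map<String, List<String>> finalized = new LinkedHashMap<>();
  private boolean shutdown = false;

  // "Start file": sets the target name for the tuples that follow.
  void onStartFile(String fileName) {
    currentFile = fileName;
    buffer.clear();
  }

  void onData(String line) {
    if (currentFile == null) {
      throw new IllegalStateException("data before start-file tuple");
    }
    buffer.add(line); // stands in for appending to currentFile + ".tmp"
  }

  // "End file": finalize, i.e. "rename" the .tmp part to the final name.
  void onEndFile(String fileName) {
    if (!fileName.equals(currentFile)) {
      throw new IllegalStateException(
          "end-file for " + fileName + " while writing " + currentFile);
    }
    finalized.put(fileName, new ArrayList<>(buffer));
    currentFile = null;
    buffer.clear();
  }

  // Final watermark: nothing more will arrive.
  void onFinalWatermark() { shutdown = true; }

  Map<String, List<String>> getFinalized() { return finalized; }

  boolean isShutdown() { return shutdown; }
}
```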
> > > > > > > > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Yes, this can be part of operator configuration. Given
> > > > > > > > > > > > > > this, defining a batch application would mean, for a
> > > > > > > > > > > > > > user, configuring the connectors (mostly the input
> > > > > > > > > > > > > > operator) in the application for the desired behavior.
> > > > > > > > > > > > > > Similarly, use cases other than batch can be achieved.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We may also need to take care of the following:
> > > > > > > > > > > > > > 1. Make sure that the watermarks or control tuples are
> > > > > > > > > > > > > >    consistent across sources, meaning an HDFS sink
> > > > > > > > > > > > > >    should be able to interpret the watermark tuple sent
> > > > > > > > > > > > > >    out by, say, a JDBC source.
> > > > > > > > > > > > > > 2. In addition to I/O connectors, we should also look at
> > > > > > > > > > > > > >    the need for processing operators to understand some
> > > > > > > > > > > > > >    of the control tuples / watermarks. For example, we
> > > > > > > > > > > > > >    may want to reset the operator behavior on arrival of
> > > > > > > > > > > > > >    some watermark tuple.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ~ Bhupesh
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> The HDFS source can operate in two modes, bounded or
> > > > > > > > > > > > > >> unbounded. If you scan only once, then it should emit
> > > > > > > > > > > > > >> the final watermark after it is done. Otherwise it
> > > > > > > > > > > > > >> would emit watermarks based on a policy (file names
> > > > > > > > > > > > > >> etc.). The mechanism to generate the marks may depend
> > > > > > > > > > > > > >> on the type of source, and the user needs to be able to
> > > > > > > > > > > > > >> influence/configure it.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thomas
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
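A sketch of the two source modes, with hypothetical names throughout (DirectoryScanSource, maxScans). Each scan() call receives the files discovered since the previous scan; the watermark policy assumed here is one watermark per completed file, carrying the file's position in scan order. In bounded mode the source stops after maxScans scans and emits the final watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical directory-scanning source; emitted items are plain strings
// standing in for data tuples and control tuples.
class DirectoryScanSource {
  private final Integer maxScans; // null means unbounded (keep scanning)
  private final List<String> emitted = new ArrayList<>();
  private int scansDone = 0;
  private long fileSequence = 0;
  private boolean finished = false;

  DirectoryScanSource(Integer maxScans) { this.maxScans = maxScans; }

  // One scan over the files discovered since the previous scan.
  void scan(List<String> newFiles) {
    if (finished) {
      return;
    }
    for (String f : newFiles) {
      emitted.add("data:" + f); // stands in for all records of the file
      emitted.add("watermark:" + (++fileSequence)); // assumed policy
    }
    scansDone++;
    if (maxScans != null && scansDone >= maxScans) {
      emitted.add("final-watermark"); // bounded: end of input
      finished = true;
    }
  }

  boolean isFinished() { return finished; }

  List<String> getEmitted() { return emitted; }
}
```

In this shape, batch is simply the configuration where maxScans is set, which matches the idea that batch is a special case of streaming in which the source emits the final watermark.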
> > > > > > > > > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hi Thomas,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > I am not sure that I completely understand your
> > > > > > > > > > > > > >> > suggestion. Are you suggesting broadening the scope of
> > > > > > > > > > > > > >> > the proposal to treat all sources as bounded as well
> > > > > > > > > > > > > >> > as unbounded?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > In the case of Apex, we treat all sources as
> > > > > > > > > > > > > >> > unbounded. Even bounded sources like the HDFS file
> > > > > > > > > > > > > >> > source are treated as unbounded by means of scanning
> > > > > > > > > > > > > >> > the input directory repeatedly.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Let's consider the HDFS file source as an example:
> > > > > > > > > > > > > >> > If we treat it as a bounded source, we can define
> > > > > > > > > > > > > >> > hooks that allow us to detect the end of the file and
> > > > > > > > > > > > > >> > send the "final watermark". We could also consider the
> > > > > > > > > > > > > >> > HDFS file source a streaming source and define hooks
> > > > > > > > > > > > > >> > that send watermarks based on different kinds of
> > > > > > > > > > > > > >> > windows.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Please correct me if I misunderstand.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > ~ Bhupesh
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > > Bhupesh,
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Please see how that can be solved in a unified way
> > > > > > > > > > > > > >> > > using windows and watermarks. It is bounded data vs.
> > > > > > > > > > > > > >> > > unbounded data. In Beam, for example, you can use the
> > > > > > > > > > > > > >> > > "global window" and the final watermark to accomplish
> > > > > > > > > > > > > >> > > what you are looking for. Batch is just a special
> > > > > > > > > > > > > >> > > case of streaming where the source emits the final
> > > > > > > > > > > > > >> > > watermark.
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > > >> > > Thomas
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > >> > > > Yes, if the user needs to develop a batch
> > > > > > > > > > > > > >> > > > application, then batch-aware operators need to be
> > > > > > > > > > > > > >> > > > used in the application. The nature of the
> > > > > > > > > > > > > >> > > > application is mostly controlled by the input and
> > > > > > > > > > > > > >> > > > output operators used in it.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > For example, consider an application that needs to
> > > > > > > > > > > > > >> > > > filter records in an input file and store the
> > > > > > > > > > > > > >> > > > filtered records in another file. The nature of
> > > > > > > > > > > > > >> > > > this app is to end once the entire file is
> > > > > > > > > > > > > >> > > > processed. The following things are expected of
> > > > > > > > > > > > > >> > > > the application:
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > >    1. Once the input data is over, finalize the
> > > > > > > > > > > > > >> > > >       output file from the .tmp files
> > > > > > > > > > > > > >> > > >       (responsibility of the output operator).
> > > > > > > > > > > > > >> > > >    2. End the application once the data is read
> > > > > > > > > > > > > >> > > >       and processed (responsibility of the input
> > > > > > > > > > > > > >> > > >       operator).
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > These functions are essential to allow the user to
> > > > > > > > > > > > > >> > > > do higher-level operations like scheduling or
> > > > > > > > > > > > > >> > > > running a workflow of batch applications.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > I am not sure about intermediate (processing)
> > > > > > > > > > > > > >> > > > operators, as there is no change in their
> > > > > > > > > > > > > >> > > > functionality for batch use cases. Perhaps allowing
> > > > > > > > > > > > > >> > > > multiple batches to run in a single application may
> > > > > > > > > > > > > >> > > > require similar changes in processing operators as
> > > > > > > > > > > > > >> > > > well.
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > ~ Bhupesh
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org> wrote:
> > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > >> > > > > Will it give the user the impression that, if he
> > > > > > > > > > > > > >> > > > > has a batch use case, he has to use batch-aware
> > > > > > > > > > > > > >> > > > > operators only? If so, is that what we expect? I am
> > > > > > > > > > > > > >> > > > > not aware of how we implement the batch scenario,
> > > > > > > > > > > > > >> > > > > so this might be a basic question.
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > -Priyanka
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > >> > > > > > Hi All,
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > While the design / implementation for custom
> > > > > > > > > > > > > >> > > > > > control tuples is ongoing, I thought it would be a
> > > > > > > > > > > > > >> > > > > > good idea to consider its usefulness in one of the
> > > > > > > > > > > > > >> > > > > > use cases: batch applications.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > This is a proposal to adapt / extend existing
> > > > > > > > > > > > > >> > > > > > operators in the Apache Apex Malhar library so
> > > > > > > > > > > > > >> > > > > > that it is easy to use them in batch use cases.
> > > > > > > > > > > > > >> > > > > > Naturally, this would be applicable to only a
> > > > > > > > > > > > > >> > > > > > subset of operators, like File, JDBC and NoSQL
> > > > > > > > > > > > > >> > > > > > databases.
> > > > > > > > > > > > > >> > > > > > For example, for a file-based store (say an HDFS
> > > > > > > > > > > > > >> > > > > > store), we could have FileBatchInput and
> > > > > > > > > > > > > >> > > > > > FileBatchOutput operators that allow easy
> > > > > > > > > > > > > >> > > > > > integration into a batch application. These
> > > > > > > > > > > > > >> > > > > > operators would be extended from their existing
> > > > > > > > > > > > > >> > > > > > implementations and would be "Batch Aware", in
> > > > > > > > > > > > > >> > > > > > that they may understand the meaning of some
> > > > > > > > > > > > > >> > > > > > specific control tuples that flow through the DAG.
> > > > > > > > > > > > > >> > > > > > Start batch and end batch seem to be the obvious
> > > > > > > > > > > > > >> > > > > > candidates that come to mind. On receipt of such
> > > > > > > > > > > > > >> > > > > > control tuples, they may try to modify the
> > > > > > > > > > > > > >> > > > > > behavior of the operator, for example to
> > > > > > > > > > > > > >> > > > > > reinitialize some metrics or finalize an output
> > > > > > > > > > > > > >> > > > > > file.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > We can discuss the potential control tuples and
> > > > > > > > > > > > > >> > > > > > actions in detail, but first I would like to
> > > > > > > > > > > > > >> > > > > > understand the views of the community on this
> > > > > > > > > > > > > >> > > > > > proposal.
> > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > >> > > > > > ~ Bhupesh
> > > > > > > > > > > > > >> > > > > >
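The "Batch Aware" idea could look roughly like this. BatchAware, RecordCounter and the callback names are hypothetical, chosen only for illustration: operators that care about batch boundaries override the callbacks, and all others inherit no-op defaults, which leaves their streaming behavior unchanged.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical contract for reacting to start/end batch control tuples.
// The no-op defaults keep non-batch operators unaffected.
interface BatchAware {
  default void onStartBatch(String batchId) {}

  default void onEndBatch(String batchId) {}
}

// Example processing operator: reinitializes its metric at the start of
// each batch and records it when the batch ends.
class RecordCounter implements BatchAware {
  private long recordsInBatch = 0;
  private final Map<String, Long> countsByBatch = new LinkedHashMap<>();

  void process(Object record) {
    recordsInBatch++;
  }

  @Override
  public void onStartBatch(String batchId) {
    recordsInBatch = 0; // reinitialize metrics for the new batch
  }

  @Override
  public void onEndBatch(String batchId) {
    countsByBatch.put(batchId, recordsInBatch);
  }

  Map<String, Long> getCountsByBatch() {
    return countsByBatch;
  }
}
```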
