I think we should support running multiple batches through the same DAG.

Resetting the watermark to the initial watermark seems like a good idea.
The windowed operator needs to understand the start/end batch control tuple
and reset the watermark.
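
As a rough illustration of the reset, here is a minimal sketch in plain Java.
It is a toy model and not the Malhar windowed operator: the class
BatchAwareWindowedCounter and its methods processEvent, processWatermark and
endBatch are hypothetical names, and the operator is assumed to simply count
events per fixed-size event-time window. It shows that after a final watermark
of Long.MAX_VALUE every later event looks late, and that resetting the
watermark to its initial value on the end batch control tuple lets the next
batch reuse the same time range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model only; all names here are hypothetical, not Apex Malhar API. */
final class BatchAwareWindowedCounter {
    static final long INITIAL_WATERMARK = Long.MIN_VALUE;

    private final long windowMillis;
    private long watermark = INITIAL_WATERMARK;
    // window start time -> number of events seen in that window
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    // finalized windows, in the order they were emitted
    private final List<String> emitted = new ArrayList<>();

    BatchAwareWindowedCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns false when the event is dropped because its window is already final. */
    boolean processEvent(long eventTime) {
        long windowStart = Math.floorDiv(eventTime, windowMillis) * windowMillis;
        long windowEnd = windowStart + windowMillis;
        if (windowEnd <= watermark) {
            return false; // late: the watermark has already passed this window
        }
        openWindows.merge(windowStart, 1L, Long::sum);
        return true;
    }

    /** Advances the watermark and finalizes every window that ends at or before it. */
    void processWatermark(long newWatermark) {
        if (newWatermark <= watermark) {
            return; // watermarks never move backwards within a batch
        }
        watermark = newWatermark;
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowMillis <= watermark) {
            Map.Entry<Long, Long> e = openWindows.pollFirstEntry();
            emitted.add("[" + e.getKey() + "," + (e.getKey() + windowMillis) + ") count=" + e.getValue());
        }
    }

    /** Hypothetical handler for the end batch control tuple: finalize, then reset. */
    void endBatch() {
        processWatermark(Long.MAX_VALUE); // final watermark closes all open windows
        openWindows.clear();
        watermark = INITIAL_WATERMARK;    // reset so the next batch starts fresh
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With a 1000 ms window, events at 100 and 1500 from the first batch produce two
windows on endBatch(), and an event at 200 from the second batch is accepted
again. Without the reset, that event would be dropped as late.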

~ Bhupesh



_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Thu, May 11, 2017 at 7:37 PM, Thomas Weise <t...@apache.org> wrote:

> Usually batches are processed by different instances of a topology. First
> of all we should agree that running multiple batches through the same DAG
> *in sequence* is something that we want to address.
>
> If yes, then there is the reset problem you are referring to, and it only
> occurs when you want to do event time processing per batch, because here
> you cannot repurpose the time component to segregate batches.
>
> 2 possible options that come to mind:
>
> - Reset the watermark to the initial watermark, which basically means that
> instead of a "shutdown" tuple there is a "reset" tuple.
> - Schedule separate operators/pipelines for each batch run.
>
> Thomas
>
>
> On Tue, May 9, 2017 at 5:25 AM, AJAY GUPTA <ajaygit...@gmail.com> wrote:
>
> > After some discussion and trying out the approach discussed above, it
> > seems we would need to separate the concepts of Watermarks and Batch
> > Control tuples.
> > The windowed operator needs to be modified to understand batch control
> > tuples.
> >
> > Even if we have watermark tuples which also include batch information,
> > the windowed operator will fail when the source data is event time based.
> > This is because in this scenario, there are two notions of time in the
> > watermark:
> > 1. Time used to denote the file / batch boundary
> > 2. The event time in the data.
> >
> > For this reason, it makes sense to separate the concepts of batch tuples
> > (start something / end something) from the watermark tuples (which
> > essentially deal with event times).
> >
> > We could argue for having a watermark tuple indicating the end of the
> > batch - a final watermark (with time = Long.MAX_VALUE) which would
> > finalize all windows in the windowed operator. However, if a next batch
> > then needs to be processed by the same windowed operator, we would need
> > to reset the state of the operator as it has moved ahead in the event
> > time domain. The batch control tuples can do this resetting of state (in
> > other words, preparation for processing a new batch of data).
> >
> > As an example, consider telecom data logs for the same 24 hours from two
> > regions (A and B) which are to be processed as a batch. After processing
> > data records from region A, a "final" watermark would be emitted
> > indicating the end of all data from region A. Now, unless we clear the
> > windowed operator's state information (current watermark, data storage),
> > the data records from region B will not be processed. In such a scenario,
> > receiving an end batch control tuple can signal the operator to reset its
> > state.
> >
> >
> > Ajay
> >
> >
> > On Sun, Apr 30, 2017 at 4:32 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > wrote:
> >
> > > public static class Pojo implements Tuple
> > > {
> > >   @Override
> > >   public Object getValue()
> > >   {
> > >     return this;
> > >   }
> > > }
> > >
> > > @Override
> > > public void populateDAG(DAG dag, Configuration conf)
> > > {
> > >   CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
> > >   WindowedOperatorImpl<Pojo, Pojo, Pojo> windowedOperator =
> > >       dag.addOperator("windowOperator", WindowedOperatorImpl.class);
> > >   dag.addStream("csvToWindowed", csvParser.out,
> > >       new InputPort[]{windowedOperator.input});
> > > }
> > >
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > > On 4/29/17 15:20, AJAY GUPTA wrote:
> > >
> > >> Even this will not work because the output port of CsvParser is of
> > >> type Object. Even though Customer extends Tuple<Object>, it will still
> > >> fail to work since Tuple<Object> gets output as Object.
> > >>
> > >> *DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();*
> > >>
> > >> The input port type at windowed operator with InputT = Object :
> > >> *DefaultInputPort<Tuple<Object>>*
> > >>
> > >>
> > >> Ajay
> > >>
> > >>
> > >> On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > >> wrote:
> > >>
> > >>> Use Object in place of InputT in the WindowedOperatorImpl. Cast Object
> > >>> to the actual type of InputT at runtime. Introducing an operator just
> > >>> to do a cast is not a good design decision, IMO.
> > >>>
> > >>> Thank you,
> > >>> Vlad
> > >>>
> > >>> Sent from iPhone
> > >>>
> > >>> On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:
> > >>>>
> > >>>> I am using WindowedOperatorImpl and it is declared as follows.
> > >>>>
> > >>>> WindowedOperatorImpl<InputT, AccumulationType, OutputType>
> > >>>> windowedOperator = new WindowedOperatorImpl<>();
> > >>>>
> > >>>> In my application scenario, the InputT is Customer POJO which is
> > >>>> getting output as an Object by CsvParser.
> > >>>>
> > >>>>
> > >>>> Ajay
> > >>>>
> > >>>> On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com>
> > >>>> wrote:
> > >>>>
> > >>>> How do you declare WindowedOperator?
> > >>>>>
> > >>>>> Thank you,
> > >>>>>
> > >>>>> Vlad
> > >>>>>
> > >>>>>
> > >>>>> On 4/28/17 10:35, AJAY GUPTA wrote:
> > >>>>>>
> > >>>>>> Vlad,
> > >>>>>>
> > >>>>>> The approach you suggested doesn't work because the CSVParser
> > >>>>>> outputs Object Data Type irrespective of the POJO class being emitted.
> > >>>>>>
> > >>>>>>
> > >>>>>> Ajay
> > >>>>>>
> > >>>>>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Make your POJO class implement WindowedOperator Tuple interface (it
> > >>>>>>> may return itself in getValue()).
> > >>>>>>>
> > >>>>>>> Thank you,
> > >>>>>>>
> > >>>>>>> Vlad
> > >>>>>>>
> > >>>>>>> On 4/28/17 02:44, AJAY GUPTA wrote:
> > >>>>>>>
> > >>>>>>>> Hi All,
> > >>>>>>>>
> > >>>>>>>> I am creating an application which is using Windowed Operator. This
> > >>>>>>>> application involves a CsvParser operator emitting a POJO object
> > >>>>>>>> which is to be passed as input to WindowedOperator. The
> > >>>>>>>> WindowedOperator requires an instance of Tuple class as input :
> > >>>>>>>> *public final transient DefaultInputPort<Tuple<InputT>>
> > >>>>>>>> input = new DefaultInputPort<Tuple<InputT>>() *
> > >>>>>>>>
> > >>>>>>>> Due to this, the addStream cannot work as the type of CsvParser's
> > >>>>>>>> output port is not compatible with the input port type of
> > >>>>>>>> WindowedOperator.
> > >>>>>>>> One way to solve this problem is to have an operator between the
> > >>>>>>>> above two operators as a converter.
> > >>>>>>>> I would like to know if there is any other more generic approach to
> > >>>>>>>> solve this problem without writing a new Operator for every new
> > >>>>>>>> application using Windowed Operators.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Ajay
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <
> > >>>>>>>> bhup...@datatorrent.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi All,
> > >>>>>>>>>
> > >>>>>>>>> I think we have some agreement on the way we should use control
> > >>>>>>>>> tuples for File I/O operators to support batch.
> > >>>>>>>>>
> > >>>>>>>>> In order to have more operators in Malhar support this paradigm, I
> > >>>>>>>>> think we should also look at store operators - JDBC, Cassandra,
> > >>>>>>>>> HBase etc. The case with these operators is simpler as most of
> > >>>>>>>>> these do not poll the sources (except JDBC poller operator) and
> > >>>>>>>>> just stop once they have read a fixed amount of data. In other
> > >>>>>>>>> words, these are inherently batch sources. The only change that we
> > >>>>>>>>> should add to these operators is to shut down the DAG once the
> > >>>>>>>>> reading of data is done. For a windowed operator this would mean a
> > >>>>>>>>> Global window with a final watermark before the DAG is shut down.
> > >>>>>>>>>
> > >>>>>>>>> ~ Bhupesh
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <
> > >>>>>>>>> bhup...@datatorrent.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Thomas,
> > >>>>>>>>>>
> > >>>>>>>>>> Even though the windowing operator is not just "event time", it
> > >>>>>>>>>> seems it is too dependent on the "time" attribute of the incoming
> > >>>>>>>>>> tuple. This is the reason we had to model the file index as a
> > >>>>>>>>>> timestamp to solve the batch case for files.
> > >>>>>>>>>> Perhaps we should work on increasing the scope of the windowed
> > >>>>>>>>>> operator to consider other types of windows as well. The Sequence
> > >>>>>>>>>> option suggested by David seems to be something in that direction.
> > >>>>>>>>>>
> > >>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> That's correct, we are looking at a generalized approach for
> > >>>>>>>>>>> state management vs. a series of special cases.
> > >>>>>>>>>>>
> > >>>>>>>>>>> And to be clear, windowing does not imply event time, otherwise
> > >>>>>>>>>>> it would be "EventTimeOperator" :-)
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thomas
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda
> > >>>>>>>>>>> <bhup...@datatorrent.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi David,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I went through the discussion, but it seems like it is more on
> > >>>>>>>>>>>> the event time watermark handling as opposed to batches. What we
> > >>>>>>>>>>>> are trying to do is have watermarks serve the purpose of
> > >>>>>>>>>>>> demarcating batches using control tuples. Since each batch is
> > >>>>>>>>>>>> separate from others, we would like to have stateful processing
> > >>>>>>>>>>>> within a batch, but not across batches.
> > >>>>>>>>>>>> At the same time, we would like to do this in a manner which is
> > >>>>>>>>>>>> consistent with the windowing mechanism provided by the windowed
> > >>>>>>>>>>>> operator. This will allow us to treat a single batch as a
> > >>>>>>>>>>>> (bounded) stream and apply all the event time windowing concepts
> > >>>>>>>>>>>> in that time span.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> For example, let's say I need to process data for a day (24
> > >>>>>>>>>>>> hours) as a single batch. The application is still streaming in
> > >>>>>>>>>>>> nature: it would end the batch after a day and start a new batch
> > >>>>>>>>>>>> the next day. At the same time, I would be able to have early
> > >>>>>>>>>>>> trigger firings every minute as well as drop any data which is
> > >>>>>>>>>>>> say, 5 mins late. All this within a single day.
> > >>>>>>>>>>>
> > >>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> There is a discussion in the Flink mailing list about key-based
> > >>>>>>>>>>>>> watermarks. I think it's relevant to our use case here.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> David
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda
> > >>>>>>>>>>>>> <bhup...@datatorrent.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi David,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> If using time window does not seem appropriate, we can have
> > >>>>>>>>>>>>>> another class which is more suited for such sequential and
> > >>>>>>>>>>>>>> distinct windows. Perhaps, a CustomWindow option can be
> > >>>>>>>>>>>>>> introduced which takes in a window id. The purpose of this
> > >>>>>>>>>>>>>> window option could be to translate the window id into
> > >>>>>>>>>>>>>> appropriate timestamps.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Another option would be to go with a custom timestampExtractor
> > >>>>>>>>>>>>>> for such tuples which translates each unique file name to a
> > >>>>>>>>>>>>>> distinct timestamp while using time windows in the windowed
> > >>>>>>>>>>>>>> operator.
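
A minimal sketch of what such a timestampExtractor could look like, in plain
Java. The class name FileSequenceTimestampExtractor and the method
timestampFor are assumptions made for illustration, not an existing Malhar
interface. Each previously unseen file name gets the next sequence number, so
with 1 ms time windows every file would land in its own window.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of Apex Malhar. */
final class FileSequenceTimestampExtractor {
    // file name -> sequence number used as the "timestamp" of all its tuples
    private final Map<String, Long> sequenceByFile = new HashMap<>();
    private long nextSequence = 1L;

    /** Returns the same value for every tuple of a file, and a new value per new file. */
    long timestampFor(String fileName) {
        Long existing = sequenceByFile.get(fileName);
        if (existing != null) {
            return existing;
        }
        long assigned = nextSequence++;
        sequenceByFile.put(fileName, assigned);
        return assigned;
    }
}
```

This mirrors the (file1, 1), (file2, 2) scheme from the wordcount example: the
value is only a sequence number and carries no event-time meaning.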
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I now see your rationale on putting the filename in the window.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> As far as I understand, the reasons why the filename is not
> > >>>>>>>>>>>>>>> part of the key and the Global Window is not used are:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) The files are processed in sequence, not in parallel
> > >>>>>>>>>>>>>>> 2) The windowed operator should not keep the state associated
> > >>>>>>>>>>>>>>> with the file when the processing of the file is done
> > >>>>>>>>>>>>>>> 3) The trigger should be fired for the file when a file is
> > >>>>>>>>>>>>>>> done processing.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> However, if the file is just a sequence and has nothing to do
> > >>>>>>>>>>>>>>> with a timestamp, assigning a timestamp to a file is not an
> > >>>>>>>>>>>>>>> intuitive thing to do and would just create confusion for
> > >>>>>>>>>>>>>>> users, especially when it's used as an example for new users.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> How about having a separate class called SequenceWindow? And
> > >>>>>>>>>>>>>>> perhaps TimeWindow can inherit from it?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>> David
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda
> > >>>>>>>>>>>>>>>> <bhup...@datatorrent.com> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I think my comments related to count based windows might be
> > >>>>>>>>>>>>>>>>> causing confusion. Let's not discuss count based scenarios
> > >>>>>>>>>>>>>>>>> for now.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the
> > >>>>>>>>>>>>>>>>> "each file is a batch" use case. As mentioned by Thomas,
> > >>>>>>>>>>>>>>>>> each tuple from the same file has the same timestamp (which
> > >>>>>>>>>>>>>>>>> is just a sequence number) and that helps keep tuples from
> > >>>>>>>>>>>>>>>>> each file in a separate window.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a
> > >>>>>>>>>>>>>>>> time stamp also, depending on the file naming convention. And
> > >>>>>>>>>>>>>>>> if it was event time processing, the watermark would be
> > >>>>>>>>>>>>>>>> derived from records within the file.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Agreed, the source should have a mechanism to control the time
> > >>>>>>>>>>>>>>>> stamp extraction along with everything else pertaining to the
> > >>>>>>>>>>>>>>>> watermark generation.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to
> > >>>>>>>>>>>>>>>>> identify the timestamp (sequence number) for a file.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org>
> > >>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I don't think this is a use case for count based window.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and
> > >>>>>>>>>>>>>>>>>> there is no knowledge of the number of records per file. The
> > >>>>>>>>>>>>>>>>>> requirement is to aggregate each file separately and emit
> > >>>>>>>>>>>>>>>>>> the aggregate when the file is read fully. There is no
> > >>>>>>>>>>>>>>>>>> concept of "end of something" for an individual key and
> > >>>>>>>>>>>>>>>>>> global window isn't applicable.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh,
> > >>>>>>>>>>>>>>>>>> this can be solved using watermark and window (in this case
> > >>>>>>>>>>>>>>>>>> the window timestamp isn't a timestamp, but a file sequence,
> > >>>>>>>>>>>>>>>>>> but that doesn't matter).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Thomas
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only
> > >>>>>>>>>>>>>>>>>>> means the timestamp does not matter (or that there is no
> > >>>>>>>>>>>>>>>>>>> timestamp). It does not necessarily mean it's a large
> > >>>>>>>>>>>>>>>>>>> batch. Unless there is some notion of event time for each
> > >>>>>>>>>>>>>>>>>>> file, you don't want to embed the file into the window
> > >>>>>>>>>>>>>>>>>>> itself.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the
> > >>>>>>>>>>>>>>>>>>> files are to be processed in parallel, I think making the
> > >>>>>>>>>>>>>>>>>>> file name be part of the key is the way to go. I think it's
> > >>>>>>>>>>>>>>>>>>> very confusing if we somehow make the file to be part of
> > >>>>>>>>>>>>>>>>>>> the window.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> For count-based window, it's not implemented yet and you're
> > >>>>>>>>>>>>>>>>>>> welcome to add that feature. In case of count-based
> > >>>>>>>>>>>>>>>>>>> windows, there would be no notion of time and you probably
> > >>>>>>>>>>>>>>>>>>> only trigger at the end of each window. In the case of
> > >>>>>>>>>>>>>>>>>>> count-based windows, the watermark only matters for batch
> > >>>>>>>>>>>>>>>>>>> since you need a way to know when the batch has ended (if
> > >>>>>>>>>>>>>>>>>>> the count is 10, the number of tuples in the batch is let's
> > >>>>>>>>>>>>>>>>>>> say 105, you need a way to end the last window with 5
> > >>>>>>>>>>>>>>>>>>> tuples).
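
The 105-tuple case can be sketched in a few lines of plain Java. This is a toy
model, not the Malhar windowed operator: CountWindowBatcher, process and
endBatch are hypothetical names, and endBatch simply stands in for whatever
signal (for example a final watermark) marks the end of the batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy count-based windowing; all names are hypothetical, not Apex Malhar API. */
final class CountWindowBatcher<T> {
    private final int windowSize;
    private List<T> current = new ArrayList<>();
    private final List<List<T>> completed = new ArrayList<>();

    CountWindowBatcher(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** A window is complete as soon as it holds windowSize tuples; no watermark needed. */
    void process(T tuple) {
        current.add(tuple);
        if (current.size() == windowSize) {
            closeCurrent();
        }
    }

    /** End-of-batch signal: closes the last, partially filled window if there is one. */
    void endBatch() {
        if (!current.isEmpty()) {
            closeCurrent();
        }
    }

    private void closeCurrent() {
        completed.add(current);
        current = new ArrayList<>();
    }

    List<List<T>> completedWindows() {
        return completed;
    }
}
```

With a window size of 10 and 105 tuples, ten full windows close on their own
and the remaining 5 tuples are only emitted once endBatch() is called.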
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> David
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda
> > >>>>>>>>>>>>>>>>>>> <bhup...@datatorrent.com> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Hi David,
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for your comments.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed
> > >>>>>>>>>>>>>>>>>>>> operator does processing of word counts per file (each
> > >>>>>>>>>>>>>>>>>>>> file as a separate batch), i.e. process counts for each
> > >>>>>>>>>>>>>>>>>>>> file and dump into separate files.
> > >>>>>>>>>>>>>>>>>>>> As I understand Global window is for one large batch; i.e.
> > >>>>>>>>>>>>>>>>>>>> all incoming data falls into the same batch. This could
> > >>>>>>>>>>>>>>>>>>>> not be processed using GlobalWindow option as we need more
> > >>>>>>>>>>>>>>>>>>>> than one window. In this case, I configured the windowed
> > >>>>>>>>>>>>>>>>>>>> operator to have time windows of 1ms each and passed data
> > >>>>>>>>>>>>>>>>>>>> for each file with increasing timestamps: (file1, 1),
> > >>>>>>>>>>>>>>>>>>>> (file2, 2) and so on. Is there a better way of handling
> > >>>>>>>>>>>>>>>>>>>> this scenario?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a
> > >>>>>>>>>>>>>>>>>>>> trigger option to process count based windows. In case I
> > >>>>>>>>>>>>>>>>>>>> want to process every 1000 tuples as a batch, I could set
> > >>>>>>>>>>>>>>>>>>>> the Trigger option to CountTrigger with the accumulation
> > >>>>>>>>>>>>>>>>>>>> set to Discarding. Is this correct?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using Global
> > >>>>>>>>>>>>>>>>>>>> window.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan
> > >>>>>>>>>>>>>>>>>>>> <david...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too
> > >>>>>>>>>>>>>>>>>>>>> complicated.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Watermarks should simply just tell you what windows can
> > >>>>>>>>>>>>>>>>>>>>> be considered complete.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do
> > >>>>>>>>>>>>>>>>>>>>> not play a role here because the window is always
> > >>>>>>>>>>>>>>>>>>>>> complete at the n-th tuple.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch
> > >>>>>>>>>>>>>>>>>>>>> processing of files. Unless the files contain timed
> > >>>>>>>>>>>>>>>>>>>>> events, it sounds to me that this can be achieved with
> > >>>>>>>>>>>>>>>>>>>>> just a Global Window. For signaling EOF, a watermark with
> > >>>>>>>>>>>>>>>>>>>>> a +infinity timestamp can be used so that triggers will
> > >>>>>>>>>>>>>>>>>>>>> be fired upon receipt of that watermark.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Point 4, just like what I mentioned above, can be
> > >>>>>>>>>>>>>>>>>>>>> achieved with a watermark with a +infinity timestamp.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> David
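
The two points David makes can be illustrated with a minimal sketch. It is not the Apex windowed operator API; the names Watermark, END_OF_INPUT, CountWindowSum and GlobalWindowSum are hypothetical and exist only for this example. The count-based window fires at the n-th tuple and ignores watermarks, while the global window fires only when a watermark with a +infinity timestamp arrives.

```python
import math
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Watermark:
    """Hypothetical control tuple: all windows ending at or before `timestamp` are complete."""
    timestamp: float


# Assumption for this sketch: end of input is signalled by a +infinity watermark.
END_OF_INPUT = Watermark(math.inf)


@dataclass
class CountWindowSum:
    """Count-based window: complete at the n-th tuple, so watermarks are ignored."""
    n: int
    _buffer: list = field(default_factory=list)
    fired: list = field(default_factory=list)

    def process(self, item):
        if isinstance(item, Watermark):
            return  # watermarks play no role for count-based windows
        self._buffer.append(item)
        if len(self._buffer) == self.n:
            self.fired.append(sum(self._buffer))
            self._buffer.clear()


@dataclass
class GlobalWindowSum:
    """Single global window: its end is +infinity, so only a +infinity watermark completes it."""
    total: int = 0
    fired: list = field(default_factory=list)

    def process(self, item):
        if isinstance(item, Watermark):
            if item.timestamp >= math.inf:
                self.fired.append(self.total)  # trigger fires on the final watermark
        else:
            self.total += item
```

Feeding the values 1, 2, 3, a watermark at time 10, the value 4 and then END_OF_INPUT into GlobalWindowSum fires exactly once, with the sum of all four values; the intermediate watermark has no effect because the global window never ends before +infinity.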
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate
> > >>>>>>>>>>>>>>>>>>>>>> watermarks for downstream operators, I can think about
> > >>>>>>>>>>>>>>>>>>>>>> the following watermarks that the operator can emit:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low
> > >>>>>>>>>>>>>>>>>>>>>>    watermark)
> > >>>>>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (Every n tuples)
> > >>>>>>>>>>>>>>>>>>>>>> 3. File based watermarks (Start file, end file)
> > >>>>>>>>>>>>>>>>>>>>>> 4. Final watermark
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch
> > >>>>>>>>>>>>>>>>>>>>>> (file based) as well, and hence I thought of looking at
> > >>>>>>>>>>>>>>>>>>>>>> these first. Does this seem to be in line with the
> > >>>>>>>>>>>>>>>>>>>>>> thought process?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> ~ Bhupesh
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> _______________________________________________________
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Bhupesh Chawda
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Software Engineer
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a
> > >>>>>>>>>>>>>>>>>>>>>>> simplistic file input-output scenario. It would be good
> > >>>>>>>>>>>>>>>>>>>>>>> to include a stateful transformation based on event
> > >>>>>>>>>>>>>>>>>>>>>>> time.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations
> > >>>>>>>>>>>>>>>>>>>>>>> that depend on windowing and watermarks. I think we
> > >>>>>>>>>>>>>>>>>>>>>>> need a watermark concept that is based on progress in
> > >>>>>>>>>>>>>>>>>>>>>>> event time (or other monotonically increasing sequence)
> > >>>>>>>>>>>>>>>>>>>>>>> that other operators can generically work with.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce
> > >>>>>>>>>>>>>>>>>>>>>>> time based watermarks, for example when you read part
> > >>>>>>>>>>>>>>>>>>>>>>> files that are bound by event time.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>> Thomas
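
Thomas's remark about part files bound by event time can be sketched as follows. This is an illustration under stated assumptions, not Apex code: the part-file layout (an exclusive upper event-time bound plus a list of records) and the function names read_part_files and tumbling_sums are hypothetical. The reader emits a time-based watermark after each part file, and the downstream windowing logic relies only on that watermark, not on file boundaries.

```python
from collections import defaultdict


def read_part_files(part_files):
    """Yield ('data', (event_time, value)) tuples followed by ('watermark', ts).

    Assumption: `part_files` is a list of (end_time_exclusive, records) pairs,
    ordered by end time, and every record in a part has event_time < end_time.
    """
    for end_time, records in part_files:
        for record in records:
            yield ("data", record)
        # All events before end_time have been read, so event time has progressed.
        yield ("watermark", end_time)


def tumbling_sums(stream, width):
    """Sum values per tumbling event-time window; emit a window once the watermark passes its end."""
    open_windows = defaultdict(int)
    results = []
    for kind, payload in stream:
        if kind == "data":
            event_time, value = payload
            window_start = event_time - event_time % width
            open_windows[window_start] += value
        else:
            watermark = payload
            # Collect the completed windows first, then remove them from the open set.
            complete = sorted(w for w in open_windows if w + width <= watermark)
            for window_start in complete:
                results.append((window_start, open_windows.pop(window_start)))
    return results
```

Because tumbling_sums only looks at event times and watermarks, the same function would work unchanged for a streaming source that produces watermarks in some other way.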
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> >
>
