Even this will not work, because the output port of CsvParser is declared with
type Object. Even if Customer implements Tuple<Object>, the stream still cannot
be connected, since the port that emits it is typed as Object, not Tuple<Object>.

*DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();*

The input port type of the windowed operator with InputT = Object:
*DefaultInputPort<Tuple<Object>>*
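
To illustrate the mismatch, here is a minimal, self-contained sketch. It does
not use the Apex classes: OutputPort, Tuple, Customer, wrap and run below are
hypothetical stand-ins that only mimic the generic types involved. It shows
that a sink of Tuple<Object> cannot be attached to a source of Object, and what
a small converter in between would have to do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PortTypeSketch {
    // Hypothetical stand-in for the windowed operator's Tuple wrapper.
    static class Tuple<T> {
        private final T value;
        Tuple(T value) { this.value = value; }
        T getValue() { return value; }
    }

    // Hypothetical stand-in for a typed output port such as the parser's.
    static class OutputPort<T> {
        private Consumer<? super T> sink;
        void connect(Consumer<? super T> sink) { this.sink = sink; }
        void emit(T item) { sink.accept(item); }
    }

    // Hypothetical POJO produced by the parser.
    static class Customer {
        final String name;
        Customer(String name) { this.name = name; }
    }

    // The conversion step: wrap whatever the parser emits into a Tuple.
    static Tuple<Object> wrap(Object pojo) {
        return new Tuple<>(pojo);
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        OutputPort<Object> parserOutput = new OutputPort<>();
        // Plays the role of the windowed operator's input with InputT = Object;
        // the value is cast back to the actual type at runtime.
        Consumer<Tuple<Object>> windowedInput =
            t -> seen.add(((Customer) t.getValue()).name);

        // parserOutput.connect(windowedInput);
        // The line above does not compile: a consumer of Tuple<Object>
        // cannot accept arbitrary Object values.

        // With an explicit wrapping step in between, the types line up.
        parserOutput.connect(pojo -> windowedInput.accept(wrap(pojo)));
        parserOutput.emit(new Customer("alice"));
        parserOutput.emit(new Customer("bob"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [alice, bob]. The wrapping lambda is the part I would like
to avoid rewriting as a separate operator for every application.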


Ajay


On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

> Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to
> the actual type of InputT at runtime. Introducing an operator just to do a
> cast is not a good design decision, IMO.
>
> Thank you,
> Vlad
>
> Sent from iPhone
>
> > On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:
> >
> > I am using WindowedOperatorImpl and it is declared as follows.
> >
> > WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator
> > = new WindowedOperatorImpl<>();
> >
> > In my application scenario, InputT is the Customer POJO, which CsvParser
> > emits as an Object.
> >
> >
> > Ajay
> >
> > On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com>
> > wrote:
> >
> >> How do you declare WindowedOperator?
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >>
> >>> On 4/28/17 10:35, AJAY GUPTA wrote:
> >>>
> >>> Vlad,
> >>>
> >>> The approach you suggested doesn't work because the CsvParser output port
> >>> is of type Object, irrespective of the POJO class being emitted.
> >>>
> >>>
> >>> Ajay
> >>>
> >>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com>
> >>> wrote:
> >>>
> >>>> Make your POJO class implement the WindowedOperator Tuple interface (it
> >>>> may return itself in getValue()).
> >>>>
> >>>> Thank you,
> >>>>
> >>>> Vlad
> >>>>
> >>>> On 4/28/17 02:44, AJAY GUPTA wrote:
> >>>>
> >>>>> Hi All,
> >>>>>
> >>>>> I am creating an application which uses the Windowed Operator. This
> >>>>> application involves a CsvParser operator emitting a POJO object which
> >>>>> is to be passed as input to the WindowedOperator. The WindowedOperator
> >>>>> requires an instance of the Tuple class as input:
> >>>>> *public final transient DefaultInputPort<Tuple<InputT>>
> >>>>> input = new DefaultInputPort<Tuple<InputT>>() *
> >>>>>
> >>>>> Due to this, addStream cannot work, as the type of CsvParser's output
> >>>>> port is not compatible with the input port type of the WindowedOperator.
> >>>>> One way to solve this problem is to put an operator between the above
> >>>>> two operators as a converter.
> >>>>> I would like to know if there is a more generic approach to solve this
> >>>>> problem without writing a new operator for every new application that
> >>>>> uses Windowed Operators.
> >>>>>
> >>>>> Thanks,
> >>>>> Ajay
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <
> >>>>> bhup...@datatorrent.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I think we have some agreement on the way we should use control tuples
> >>>>>> for File I/O operators to support batch.
> >>>>>>
> >>>>>> In order to have more operators in Malhar support this paradigm, I
> >>>>>> think we should also look at store operators: JDBC, Cassandra, HBase,
> >>>>>> etc. The case with these operators is simpler, as most of them do not
> >>>>>> poll the sources (except the JDBC poller operator) and just stop once
> >>>>>> they have read a fixed amount of data. In other words, these are
> >>>>>> inherently batch sources. The only change that we should add to these
> >>>>>> operators is to shut down the DAG once the reading of data is done.
> >>>>>> For a windowed operator this would mean a Global window with a final
> >>>>>> watermark before the DAG is shut down.
> >>>>>>
> >>>>>> ~ Bhupesh
> >>>>>>
> >>>>>>
> >>>>>> _______________________________________________________
> >>>>>>
> >>>>>> Bhupesh Chawda
> >>>>>>
> >>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>
> >>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <
> >>>>>> bhup...@datatorrent.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Thomas,
> >>>>>>>
> >>>>>>> Even though the windowing operator is not just "event time", it seems
> >>>>>>> to depend too much on the "time" attribute of the incoming tuple.
> >>>>>>> This is the reason we had to model the file index as a timestamp to
> >>>>>>> solve the batch case for files.
> >>>>>>> Perhaps we should work on increasing the scope of the windowed
> >>>>>>> operator to consider other types of windows as well. The Sequence
> >>>>>>> option suggested by David seems to be something in that direction.
> >>>>>>>
> >>>>>>> ~ Bhupesh
> >>>>>>>
> >>>>>>>
> >>>>>>> _______________________________________________________
> >>>>>>>
> >>>>>>> Bhupesh Chawda
> >>>>>>>
> >>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>
> >>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> That's correct, we are looking at a generalized approach for state
> >>>>>>>> management vs. a series of special cases.
> >>>>>>>>
> >>>>>>>> And to be clear, windowing does not imply event time, otherwise it
> >>>>>>>> would be "EventTimeOperator" :-)
> >>>>>>>>
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <
> >>>>>>>> bhup...@datatorrent.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi David,
> >>>>>>>>>
> >>>>>>>>> I went through the discussion, but it seems to be more about event
> >>>>>>>>> time watermark handling than about batches. What we are trying to
> >>>>>>>>> do is have watermarks serve the purpose of demarcating batches
> >>>>>>>>> using control tuples. Since each batch is separate from the others,
> >>>>>>>>> we would like to have stateful processing within a batch, but not
> >>>>>>>>> across batches.
> >>>>>>>>> At the same time, we would like to do this in a manner which is
> >>>>>>>>> consistent with the windowing mechanism provided by the windowed
> >>>>>>>>> operator. This will allow us to treat a single batch as a (bounded)
> >>>>>>>>> stream and apply all the event time windowing concepts in that time
> >>>>>>>>> span.
> >>>>>>>>>
> >>>>>>>>> For example, let's say I need to process data for a day (24 hours)
> >>>>>>>>> as a single batch. The application is still streaming in nature: it
> >>>>>>>>> would end the batch after a day and start a new batch the next day.
> >>>>>>>>> At the same time, I would be able to have early trigger firings
> >>>>>>>>> every minute as well as drop any data which is, say, 5 mins late.
> >>>>>>>>> All this within a single day.
> >>>>>>>>>
> >>>>>>>>> ~ Bhupesh
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> _______________________________________________________
> >>>>>>>>>
> >>>>>>>>> Bhupesh Chawda
> >>>>>>>>>
> >>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>>>
> >>>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> There is a discussion in the Flink mailing list about key-based
> >>>>>>>>>> watermarks. I think it's relevant to our use case here.
> >>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> >>>>>>>>>>
> >>>>>>>>>> David
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <
> >>>>>>>>>> bhup...@datatorrent.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi David,
> >>>>>>>>>>>
> >>>>>>>>>>> If using a time window does not seem appropriate, we can have
> >>>>>>>>>>> another class which is more suited for such sequential and
> >>>>>>>>>>> distinct windows. Perhaps a CustomWindow option can be introduced
> >>>>>>>>>>> which takes in a window id. The purpose of this window option
> >>>>>>>>>>> could be to translate the window id into appropriate timestamps.
> >>>>>>>>>>>
> >>>>>>>>>>> Another option would be to go with a custom timestampExtractor
> >>>>>>>>>>> for such tuples which translates each unique file name to a
> >>>>>>>>>>> distinct timestamp while using time windows in the windowed
> >>>>>>>>>>> operator.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> _______________________________________________________
> >>>>>>>>>>>
> >>>>>>>>>>> Bhupesh Chawda
> >>>>>>>>>>>
> >>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>>>>>
> >>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I now see your rationale on putting the filename in the window.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As far as I understand, the reasons why the filename is not part
> >>>>>>>>>>>> of the key and the Global Window is not used are:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) The files are processed in sequence, not in parallel
> >>>>>>>>>>>> 2) The windowed operator should not keep the state associated
> >>>>>>>>>>>> with the file when the processing of the file is done
> >>>>>>>>>>>> 3) The trigger should be fired for the file when a file is done
> >>>>>>>>>>>> processing.
> >>>>>>>>>>>>
> >>>>>>>>>>>> However, if the file is just a sequence that has nothing to do
> >>>>>>>>>>>> with a timestamp, assigning a timestamp to a file is not an
> >>>>>>>>>>>> intuitive thing to do and would just create confusion for users,
> >>>>>>>>>>>> especially when it's used as an example for new users.
> >>>>>>>>>>>>
> >>>>>>>>>>>> How about having a separate class called SequenceWindow? And
> >>>>>>>>>>>> perhaps TimeWindow can inherit from it?
> >>>>>>>>>>>>
> >>>>>>>>>>>> David
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <
> >>>>>>>>>>>>> bhup...@datatorrent.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think my comments related to count based windows might be
> >>>>>>>>>>>>>> causing confusion. Let's not discuss count based scenarios for
> >>>>>>>>>>>>>> now.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each
> >>>>>>>>>>>>>> file is a batch" use case. As mentioned by Thomas, each tuple
> >>>>>>>>>>>>>> from the same file has the same timestamp (which is just a
> >>>>>>>>>>>>>> sequence number) and that helps keep tuples from each file in
> >>>>>>>>>>>>>> a separate window.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a
> >>>>>>>>>>>>> time stamp also, depending on the file naming convention. And
> >>>>>>>>>>>>> if it was event time processing, the watermark would be derived
> >>>>>>>>>>>>> from records within the file.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Agreed, the source should have a mechanism to control the time
> >>>>>>>>>>>>> stamp extraction along with everything else pertaining to the
> >>>>>>>>>>>>> watermark generation.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to
> >>>>>>>>>>>>>> identify the timestamp (sequence number) for a file.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> _______________________________________________________
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Bhupesh Chawda
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I don't think this is a use case for count based window.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and
> >>>>>>>>>>>>>>> there is no knowledge of the number of records per file. The
> >>>>>>>>>>>>>>> requirement is to aggregate each file separately and emit the
> >>>>>>>>>>>>>>> aggregate when the file is read fully. There is no concept of
> >>>>>>>>>>>>>>> "end of something" for an individual key and global window
> >>>>>>>>>>>>>>> isn't applicable.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh,
> >>>>>>>>>>>>>>> this can be solved using watermark and window (in this case
> >>>>>>>>>>>>>>> the window timestamp isn't a timestamp, but a file sequence,
> >>>>>>>>>>>>>>> but that doesn't matter).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only
> >>>>>>>>>>>>>>>> means the timestamp does not matter (or that there is no
> >>>>>>>>>>>>>>>> timestamp). It does not necessarily mean it's a large batch.
> >>>>>>>>>>>>>>>> Unless there is some notion of event time for each file, you
> >>>>>>>>>>>>>>>> don't want to embed the file into the window itself.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the
> >>>>>>>>>>>>>>>> files are to be processed in parallel, I think making the
> >>>>>>>>>>>>>>>> file name part of the key is the way to go. I think it's
> >>>>>>>>>>>>>>>> very confusing if we somehow make the file part of the
> >>>>>>>>>>>>>>>> window.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For count-based window, it's not implemented yet and you're
> >>>>>>>>>>>>>>>> welcome to add that feature. In case of count-based windows,
> >>>>>>>>>>>>>>>> there would be no notion of time and you probably only
> >>>>>>>>>>>>>>>> trigger at the end of each window. In the case of
> >>>>>>>>>>>>>>>> count-based windows, the watermark only matters for batch
> >>>>>>>>>>>>>>>> since you need a way to know when the batch has ended (if
> >>>>>>>>>>>>>>>> the count is 10 and the number of tuples in the batch is,
> >>>>>>>>>>>>>>>> let's say, 105, you need a way to end the last window with 5
> >>>>>>>>>>>>>>>> tuples).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <
> >>>>>>>>>>>>>>>> bhup...@datatorrent.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi David,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed
> >>>>>>>>>>>>>>>>> operator does processing of word counts per file (each file
> >>>>>>>>>>>>>>>>> as a separate batch), i.e. it processes counts for each
> >>>>>>>>>>>>>>>>> file and dumps them into separate files.
> >>>>>>>>>>>>>>>>> As I understand, Global window is for one large batch; i.e.
> >>>>>>>>>>>>>>>>> all incoming data falls into the same batch. This could not
> >>>>>>>>>>>>>>>>> be processed using the GlobalWindow option as we need more
> >>>>>>>>>>>>>>>>> than one window. In this case, I configured the windowed
> >>>>>>>>>>>>>>>>> operator to have time windows of 1ms each and passed data
> >>>>>>>>>>>>>>>>> for each file with increasing timestamps: (file1, 1),
> >>>>>>>>>>>>>>>>> (file2, 2) and so on. Is there a better way of handling
> >>>>>>>>>>>>>>>>> this scenario?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a
> >>>>>>>>>>>>>>>>> trigger option to process count based windows. In case I
> >>>>>>>>>>>>>>>>> want to process every 1000 tuples as a batch, I could set
> >>>>>>>>>>>>>>>>> the Trigger option to CountTrigger with the accumulation
> >>>>>>>>>>>>>>>>> set to Discarding. Is this correct?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using Global
> >>>>>>>>>>>>>>>>> window.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> _______________________________________________________
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Bhupesh Chawda
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <
> >>>>>>>>>>>>>>>>> david...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too
> >>>>>>>>>>>>>>>>>> complicated.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Watermarks should simply tell you what windows can be
> >>>>>>>>>>>>>>>>>> considered complete.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do
> >>>>>>>>>>>>>>>>>> not play a role here because the window is always complete
> >>>>>>>>>>>>>>>>>> at the n-th tuple.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing
> >>>>>>>>>>>>>>>>>> of files. Unless the files contain timed events, it sounds
> >>>>>>>>>>>>>>>>>> to me that this can be achieved with just a Global Window.
> >>>>>>>>>>>>>>>>>> For signaling EOF, a watermark with a +infinity timestamp
> >>>>>>>>>>>>>>>>>> can be used so that triggers will be fired upon receipt of
> >>>>>>>>>>>>>>>>>> that watermark.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Point 4, just like what I mentioned above, can be achieved
> >>>>>>>>>>>>>>>>>> with a watermark with a +infinity timestamp.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <
> >>>>>>>>>>>>>>>>>> bhup...@datatorrent.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Thomas,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate
> >>>>>>>>>>>>>>>>>>> watermarks for downstream operators, I can think about
> >>>>>>>>>>>>>>>>>>> the following watermarks that the operator can emit:
> >>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low
> >>>>>>>>>>>>>>>>>>> watermark)
> >>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (Every n tuples)
> >>>>>>>>>>>>>>>>>>> 3. File based watermarks (Start file, end file)
> >>>>>>>>>>>>>>>>>>> 4. Final watermark
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch
> >>>>>>>>>>>>>>>>>>> (file based) as well, and hence I thought of looking at
> >>>>>>>>>>>>>>>>>>> these first. Does this seem to be in line with the
> >>>>>>>>>>>>>>>>>>> thought process?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> ~ Bhupesh
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> _______________________________________________________
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Bhupesh Chawda
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Software Engineer
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <
> >>>>>>>>>>>>>>>>>>> t...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a
> >>>>>>>>>>>>>>>>>>>> simplistic file input-output scenario. It would be good
> >>>>>>>>>>>>>>>>>>>> to include a stateful transformation based on event
> >>>>>>>>>>>>>>>>>>>> time.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations
> >>>>>>>>>>>>>>>>>>>> that depend on windowing and watermarks. I think we need
> >>>>>>>>>>>>>>>>>>>> a watermark concept that is based on progress in event
> >>>>>>>>>>>>>>>>>>>> time (or other monotonic increasing sequence) that other
> >>>>>>>>>>>>>>>>>>>> operators can generically work with.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce time
> >>>>>>>>>>>>>>>>>>>> based watermarks, for example when you read part files
> >>>>>>>>>>>>>>>>>>>> that are bound by event time.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>> Thomas
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda
> >>>>>>>>>>>>>>>>>>>> <bhup...@datatorrent.com>
