I am using WindowedOperatorImpl and it is declared as follows.

WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator
= new WindowedOperatorImpl<>();

In my application, InputT is a Customer POJO, which CsvParser emits as an
Object.


Ajay

On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

> How do you declare WindowedOperator?
>
> Thank you,
>
> Vlad
>
>
> On 4/28/17 10:35, AJAY GUPTA wrote:
>
>> Vlad,
>>
>> The approach you suggested doesn't work because CsvParser outputs the
>> Object data type, irrespective of the POJO class actually being emitted.
>>
>>
>> Ajay
>>
>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com>
>> wrote:
>>
>>> Make your POJO class implement the WindowedOperator Tuple interface (it
>>> may return itself in getValue()).
>>>
>>> Thank you,
>>>
>>> Vlad
>>>
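A minimal sketch of Vlad's suggestion, in plain Java so that it compiles on its own. The `Tuple` interface here is a local stand-in for the WindowedOperator's `Tuple` interface, assumed (from Vlad's description) to expose `T getValue()`; `Customer` and its `name` field are hypothetical. The last lines also illustrate Ajay's objection: a value that is statically typed as `Object` still needs a cast before it can be used as a `Tuple`, which is consistent with `addStream` rejecting the port pair.

```java
class TupleDemo {
  public static void main(String[] args) {
    Customer alice = new Customer("Alice");

    // The POJO is its own tuple: getValue() returns the same instance.
    Tuple<Customer> tuple = alice;
    System.out.println(tuple.getValue().getName()); // prints Alice

    // A parser port typed as Object loses this information at compile
    // time; the cast below is only checked at runtime.
    Object emitted = alice;
    @SuppressWarnings("unchecked")
    Tuple<Customer> recovered = (Tuple<Customer>) emitted;
    System.out.println(recovered.getValue() == alice); // prints true
  }
}

// Local stand-in for the WindowedOperator's Tuple interface (assumption).
interface Tuple<T> {
  T getValue();
}

// Hypothetical POJO of the kind CsvParser would populate.
class Customer implements Tuple<Customer> {
  private String name;

  public Customer() {
    // No-arg constructor for reflective population by a parser.
  }

  Customer(String name) {
    this.name = name;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  @Override
  public Customer getValue() {
    return this; // Vlad's suggestion: the POJO returns itself
  }
}
```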
>>> On 4/28/17 02:44, AJAY GUPTA wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am creating an application that uses the Windowed Operator. In this
>>>> application, a CsvParser operator emits a POJO that is to be passed as
>>>> input to the WindowedOperator. The WindowedOperator requires an instance
>>>> of the Tuple class as input:
>>>>
>>>>     public final transient DefaultInputPort<Tuple<InputT>> input =
>>>>         new DefaultInputPort<Tuple<InputT>>()
>>>>
>>>> Because of this, addStream cannot work: the type of CsvParser's output
>>>> port is not compatible with the input port type of the WindowedOperator.
>>>> One way to solve this problem is to add a converter operator between
>>>> the two operators.
>>>> I would like to know whether there is a more generic approach that
>>>> avoids writing a new operator for every new application that uses the
>>>> Windowed Operator.
>>>>
>>>> Thanks,
>>>> Ajay
>>>>
>>>>
>>>>
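One generic option is to write the conversion once and parameterize it by the POJO class, instead of writing a new converter for each application. The sketch below shows only the conversion logic; `Tuple`, `PlainTuple`, `ObjectToTupleConverter` and `Customer` are hypothetical local classes, not Malhar types. In an Apex DAG this function would presumably live inside one reusable pass-through operator with an `Object` input port and a `Tuple<T>` output port, with the POJO class supplied as configuration; that wiring is an assumption and is not shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConverterDemo {
  public static void main(String[] args) {
    // One converter, configured with the POJO class, reused everywhere.
    ObjectToTupleConverter<Customer> converter =
        new ObjectToTupleConverter<>(Customer.class);

    // What a parser port typed as Object would hand downstream.
    List<Object> parserOutput =
        Arrays.<Object>asList(new Customer("Alice"), new Customer("Bob"));

    for (Object o : parserOutput) {
      Tuple<Customer> t = converter.apply(o);
      System.out.println(t.getValue().name);
    }
  }
}

// Hypothetical stand-in for the windowed operator's tuple type.
interface Tuple<T> {
  T getValue();
}

// Hypothetical minimal wrapper carrying a single value.
class PlainTuple<T> implements Tuple<T> {
  private final T value;

  PlainTuple(T value) {
    this.value = value;
  }

  @Override
  public T getValue() {
    return value;
  }
}

// Conversion written once and reused for any POJO class.
class ObjectToTupleConverter<T> implements Function<Object, Tuple<T>> {
  private final Class<T> type;

  ObjectToTupleConverter(Class<T> type) {
    this.type = type;
  }

  @Override
  public Tuple<T> apply(Object o) {
    // Class.cast fails fast with ClassCastException if the parser
    // emits something other than the configured POJO class.
    return new PlainTuple<>(type.cast(o));
  }
}

// Hypothetical POJO.
class Customer {
  final String name;

  Customer(String name) {
    this.name = name;
  }
}
```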
>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I think we have some agreement on how we should use control tuples in
>>>>> the File I/O operators to support batch.
>>>>>
>>>>> In order to have more operators in Malhar support this paradigm, I
>>>>> think we should also look at the store operators: JDBC, Cassandra,
>>>>> HBase, etc. The case with these operators is simpler, as most of them
>>>>> do not poll the sources (except the JDBC poller operator) and just stop
>>>>> once they have read a fixed amount of data. In other words, they are
>>>>> inherently batch sources. The only change we should add to these
>>>>> operators is to shut down the DAG once the reading of data is done. For
>>>>> a windowed operator, this would mean a Global window with a final
>>>>> watermark before the DAG is shut down.
>>>>>
>>>>> ~ Bhupesh
>>>>>
>>>>>
>>>>> _______________________________________________________
>>>>>
>>>>> Bhupesh Chawda
>>>>>
>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>
>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>
>>>>>
>>>>>
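The ordering Bhupesh describes for inherently batch store sources (read everything, emit a final watermark so that a Global window can fire, then shut the DAG down) can be simulated in plain Java. `BoundedStoreSource`, `GlobalSum` and the `Kind` values are hypothetical names, and `Long.MAX_VALUE` stands in for the final watermark's timestamp; none of this is Apex API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BoundedSourceDemo {
  public static void main(String[] args) {
    System.out.println(run(Arrays.asList(3L, 4L, 5L))); // [RESULT 12, SHUTDOWN]
  }

  // Drives a bounded source into a global-window sum and returns what
  // the downstream side observed, in order.
  static List<String> run(List<Long> rows) {
    GlobalSum sink = new GlobalSum();
    for (Event e : BoundedStoreSource.read(rows)) {
      sink.process(e);
    }
    return sink.log;
  }
}

enum Kind { DATA, FINAL_WATERMARK, END_OF_STREAM }

class Event {
  final Kind kind;
  final long value;

  Event(Kind kind, long value) {
    this.kind = kind;
    this.value = value;
  }
}

// Hypothetical store reader: reads a fixed data set, then signals completion.
class BoundedStoreSource {
  static List<Event> read(List<Long> rows) {
    List<Event> out = new ArrayList<>();
    for (long r : rows) {
      out.add(new Event(Kind.DATA, r));
    }
    // Final watermark first, so downstream windows can fire...
    out.add(new Event(Kind.FINAL_WATERMARK, Long.MAX_VALUE));
    // ...and only then the request to shut the DAG down.
    out.add(new Event(Kind.END_OF_STREAM, 0));
    return out;
  }
}

// A single (global) window that fires once, on the final watermark.
class GlobalSum {
  long sum;
  final List<String> log = new ArrayList<>();

  void process(Event e) {
    switch (e.kind) {
      case DATA:
        sum += e.value;
        break;
      case FINAL_WATERMARK:
        log.add("RESULT " + sum);
        break;
      case END_OF_STREAM:
        log.add("SHUTDOWN");
        break;
    }
  }
}
```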
>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> Even though the windowed operator is not just about "event time", it
>>>>>> seems to depend too much on the "time" attribute of the incoming
>>>>>> tuple. This is the reason we had to model the file index as a
>>>>>> timestamp to solve the batch case for files.
>>>>>> Perhaps we should work on increasing the scope of the windowed
>>>>>> operator to consider other types of windows as well. The Sequence
>>>>>> option suggested by David seems to be a step in that direction.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> That's correct, we are looking at a generalized approach for state
>>>>>>> management rather than a series of special cases.
>>>>>>>
>>>>>>> And to be clear, windowing does not imply event time; otherwise it
>>>>>>> would be "EventTimeOperator" :-)
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> I went through the discussion, but it seems to be more about event
>>>>>>>> time watermark handling than about batches. What we are trying to do
>>>>>>>> is have watermarks serve the purpose of demarcating batches using
>>>>>>>> control tuples. Since each batch is separate from the others, we
>>>>>>>> would like to have stateful processing within a batch, but not across
>>>>>>>> batches.
>>>>>>>> At the same time, we would like to do this in a manner that is
>>>>>>>> consistent with the windowing mechanism provided by the windowed
>>>>>>>> operator. This will allow us to treat a single batch as a (bounded)
>>>>>>>> stream and apply all the event time windowing concepts within that
>>>>>>>> time span.
>>>>>>>>
>>>>>>>> For example, let's say I need to process data for a day (24 hours) as
>>>>>>>> a single batch. The application is still streaming in nature: it
>>>>>>>> would end the batch after a day and start a new batch the next day.
>>>>>>>> At the same time, I would be able to have early trigger firings every
>>>>>>>> minute as well as drop any data that is, say, 5 minutes late. All
>>>>>>>> this within a single day.
>>>>>>>>
>>>>>>>> ~ Bhupesh
>>>>>>>>
>>>>>>>>
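Bhupesh's daily-batch example combines three rules: one window per day, early firings every minute, and dropping late data with a 5-minute allowance. A hypothetical policy class can express them. The lateness check assumes the common (Beam-style) definition, in which a tuple is dropped once the watermark has passed the end of its window by more than the allowed lateness; epoch-millisecond timestamps and days aligned to UTC midnight are further assumptions of the sketch.

```java
import java.time.Duration;

class DailyBatchDemo {
  public static void main(String[] args) {
    DailyBatchPolicy p = new DailyBatchPolicy(
        Duration.ofDays(1), Duration.ofMinutes(1), Duration.ofMinutes(5));
    long t = Duration.ofHours(30).toMillis(); // 06:00 on the second day
    System.out.println(p.windowStart(t));     // 86400000 (start of day 2)
  }
}

// Hypothetical policy object; the names are illustrative, not a Malhar API.
class DailyBatchPolicy {
  final long windowMillis;
  final long earlyFiringMillis;
  final long allowedLatenessMillis;

  DailyBatchPolicy(Duration window, Duration earlyFiring, Duration allowedLateness) {
    this.windowMillis = window.toMillis();
    this.earlyFiringMillis = earlyFiring.toMillis();
    this.allowedLatenessMillis = allowedLateness.toMillis();
  }

  // Each tuple belongs to the day (batch) that contains its timestamp.
  long windowStart(long eventMillis) {
    return eventMillis - Math.floorMod(eventMillis, windowMillis);
  }

  long windowEnd(long eventMillis) {
    return windowStart(eventMillis) + windowMillis;
  }

  // Drop a tuple once the watermark is past its window's end by more
  // than the allowed lateness (Beam-style semantics, assumed here).
  boolean isTooLate(long eventMillis, long watermarkMillis) {
    return watermarkMillis > windowEnd(eventMillis) + allowedLatenessMillis;
  }

  // Early (speculative) firing: at most once per earlyFiring interval
  // of processing time while the day's window is still open.
  boolean shouldFireEarly(long lastFiringMillis, long nowMillis) {
    return nowMillis - lastFiringMillis >= earlyFiringMillis;
  }
}
```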
>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> There is a discussion in the Flink mailing list about key-based
>>>>>>>>> watermarks. I think it's relevant to our use case here.
>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>>>
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> If using a time window does not seem appropriate, we can have
>>>>>>>>>> another class that is better suited to such sequential and distinct
>>>>>>>>>> windows. Perhaps a CustomWindow option can be introduced that takes
>>>>>>>>>> in a window id. The purpose of this window option would be to
>>>>>>>>>> translate the window id into appropriate timestamps.
>>>>>>>>>>
>>>>>>>>>> Another option would be to go with a custom timestampExtractor for
>>>>>>>>>> such tuples, which translates each unique file name to a distinct
>>>>>>>>>> timestamp, while using time windows in the windowed operator.
>>>>>>>>>>
>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>
>>>>>>>>>>
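Bhupesh's second option, a timestampExtractor that maps each unique file name to a distinct timestamp, might look like the sketch below. `FileSequenceExtractor` is a hypothetical name; it assumes files are read one after another, so numbering them in order of first appearance yields increasing timestamps. Combined with 1 ms time windows (the configuration described elsewhere in this thread), each file's tuples would fall into a window of their own.

```java
import java.util.HashMap;
import java.util.Map;

class FileSequenceDemo {
  public static void main(String[] args) {
    FileSequenceExtractor extractor = new FileSequenceExtractor();
    System.out.println(extractor.extract("words-a.txt")); // 1
    System.out.println(extractor.extract("words-b.txt")); // 2
    System.out.println(extractor.extract("words-a.txt")); // 1 again
  }
}

// Hypothetical extractor: assigns each distinct file name the next value
// of a counter, in the order the files are first seen.
class FileSequenceExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long next = 1;

  long extract(String fileName) {
    Long seq = sequenceByFile.get(fileName);
    if (seq == null) {
      seq = next++;
      sequenceByFile.put(fileName, seq);
    }
    return seq;
  }
}
```

A long-running application would also need to forget the entries of finished files, since the map otherwise grows with every file seen.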
>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I now see your rationale for putting the filename in the window.
>>>>>>>>>>> As far as I understand, the reasons why the filename is not part of
>>>>>>>>>>> the key and the Global Window is not used are:
>>>>>>>>>>>
>>>>>>>>>>> 1) The files are processed in sequence, not in parallel.
>>>>>>>>>>> 2) The windowed operator should not keep the state associated with
>>>>>>>>>>>    the file when the processing of the file is done.
>>>>>>>>>>> 3) The trigger should be fired for the file when the file is done
>>>>>>>>>>>    processing.
>>>>>>>>>>>
>>>>>>>>>>> However, if the file is just a sequence and has nothing to do with a
>>>>>>>>>>> timestamp, assigning a timestamp to a file is not an intuitive thing
>>>>>>>>>>> to do and would just confuse users, especially when it's used as an
>>>>>>>>>>> example for new users.
>>>>>>>>>>>
>>>>>>>>>>> How about having a separate class called SequenceWindow? And perhaps
>>>>>>>>>>> TimeWindow can inherit from it?
>>>>>>>>>>>
>>>>>>>>>>> David
>>>>>>>>>>>
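David's proposal can be sketched as a small class hierarchy. `SequenceWindow` and the `TimeWindow` subclass below are hypothetical classes named after the proposal, not Malhar's existing window classes: the base class only knows about positions in a monotonically increasing sequence (such as a file index), and the time-based subclass merely interprets those positions as milliseconds.

```java
class SequenceWindowDemo {
  public static void main(String[] args) {
    SequenceWindow fileWindow = SequenceWindow.single(3); // the third file
    System.out.println(fileWindow.contains(3)); // true
    System.out.println(fileWindow.contains(4)); // false

    TimeWindow minute = new TimeWindow(60_000, 60_000);
    System.out.println(minute.contains(90_000)); // true
  }
}

// Hypothetical: a window over any monotonically increasing sequence.
class SequenceWindow {
  final long begin;  // first sequence position in the window (inclusive)
  final long length; // number of sequence positions covered

  SequenceWindow(long begin, long length) {
    if (length <= 0) {
      throw new IllegalArgumentException("length must be positive");
    }
    this.begin = begin;
    this.length = length;
  }

  // A window holding exactly one position, e.g. one file.
  static SequenceWindow single(long sequence) {
    return new SequenceWindow(sequence, 1);
  }

  long end() {
    return begin + length; // exclusive
  }

  boolean contains(long sequence) {
    return sequence >= begin && sequence < end();
  }
}

// Hypothetical: a time window is a sequence window whose positions are
// milliseconds, so only the naming changes.
class TimeWindow extends SequenceWindow {
  TimeWindow(long beginTimestampMillis, long durationMillis) {
    super(beginTimestampMillis, durationMillis);
  }

  long getBeginTimestamp() {
    return begin;
  }

  long getDurationMillis() {
    return length;
  }
}
```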
>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think my comments related to count-based windows might be
>>>>>>>>>>>>> causing confusion. Let's not discuss count-based scenarios for now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each file
>>>>>>>>>>>>> is a batch" use case. As mentioned by Thomas, each tuple from the
>>>>>>>>>>>>> same file has the same timestamp (which is just a sequence number),
>>>>>>>>>>>>> and that helps keep the tuples from each file in a separate window.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could also be a
>>>>>>>>>>>> timestamp, depending on the file naming convention. And if it were
>>>>>>>>>>>> event time processing, the watermark would be derived from the
>>>>>>>>>>>> records within the file.
>>>>>>>>>>>>
>>>>>>>>>>>> Agreed, the source should have a mechanism to control the timestamp
>>>>>>>>>>>> extraction along with everything else pertaining to the watermark
>>>>>>>>>>>> generation.
>>>>>>>>>>>>
>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to
>>>>>>>>>>>>> identify the timestamp (sequence number) for a file.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't think this is a use case for a count-based window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence, and there
>>>>>>>>>>>>>> is no knowledge of the number of records per file. The requirement
>>>>>>>>>>>>>> is to aggregate each file separately and emit the aggregate when
>>>>>>>>>>>>>> the file has been read fully. There is no concept of "end of
>>>>>>>>>>>>>> something" for an individual key, and the global window isn't
>>>>>>>>>>>>>> applicable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this can
>>>>>>>>>>>>>> be solved using a watermark and a window (in this case the window
>>>>>>>>>>>>>> timestamp isn't a timestamp but a file sequence, but that doesn't
>>>>>>>>>>>>>> matter).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
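Thomas's description (one window per file, with a file sequence number in place of the window timestamp, closed by a watermark once the file has been read fully) can be simulated as follows. `PerFileWordCount` is hypothetical, and the sketch assumes that a watermark value `w` means every file with a sequence number below `w` is complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerFileCountDemo {
  public static void main(String[] args) {
    PerFileWordCount op = new PerFileWordCount();
    op.onTuple(1, "apex");
    op.onTuple(1, "malhar");
    op.onTuple(1, "apex");
    op.onTuple(2, "apex");
    System.out.println(op.onWatermark(2));              // [file 1: {apex=2, malhar=1}]
    System.out.println(op.onWatermark(Long.MAX_VALUE)); // [file 2: {apex=1}]
  }
}

// Hypothetical operator: each window is keyed by a file sequence number,
// which plays the role of the window timestamp.
class PerFileWordCount {
  private final TreeMap<Long, Map<String, Integer>> openWindows = new TreeMap<>();

  void onTuple(long fileSequence, String word) {
    openWindows
        .computeIfAbsent(fileSequence, k -> new TreeMap<>())
        .merge(word, 1, Integer::sum);
  }

  // Emits and discards the state of every window the watermark has passed.
  List<String> onWatermark(long watermark) {
    List<String> emitted = new ArrayList<>();
    Map<Long, Map<String, Integer>> done = openWindows.headMap(watermark, false);
    for (Map.Entry<Long, Map<String, Integer>> e : done.entrySet()) {
      emitted.add("file " + e.getKey() + ": " + e.getValue());
    }
    done.clear(); // headMap is a view, so this purges the finished windows
    return emitted;
  }

  int openWindowCount() {
    return openWindows.size();
  }
}
```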
>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think this is the way to go. A Global Window only means
>>>>>>>>>>>>>>> that the timestamp does not matter (or that there is no
>>>>>>>>>>>>>>> timestamp). It does not necessarily mean it's a large batch.
>>>>>>>>>>>>>>> Unless there is some notion of event time for each file, you don't
>>>>>>>>>>>>>>> want to embed the file into the window itself.
>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the files
>>>>>>>>>>>>>>> are to be processed in parallel, I think making the file name part
>>>>>>>>>>>>>>> of the key is the way to go. I think it's very confusing if we
>>>>>>>>>>>>>>> somehow make the file part of the window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Count-based windows are not implemented yet, and you're welcome to
>>>>>>>>>>>>>>> add that feature. In the case of count-based windows, there would
>>>>>>>>>>>>>>> be no notion of time, and you would probably only trigger at the
>>>>>>>>>>>>>>> end of each window. In the case of count-based windows, the
>>>>>>>>>>>>>>> watermark only matters for batch, since you need a way to know
>>>>>>>>>>>>>>> when the batch has ended (if the count is 10 and the number of
>>>>>>>>>>>>>>> tuples in the batch is, let's say, 105, you need a way to end the
>>>>>>>>>>>>>>> last window with 5 tuples).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>
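David's 105-tuple example can be checked with a small simulation. `CountWindows` is a hypothetical class: full windows close at the n-th tuple without any watermark, while an end-of-batch watermark (modeled by `onFinalWatermark`) is what releases the trailing partial window.

```java
import java.util.ArrayList;
import java.util.List;

class CountWindowDemo {
  public static void main(String[] args) {
    CountWindows windows = new CountWindows(10);
    List<Integer> sizes = new ArrayList<>();
    for (int i = 0; i < 105; i++) {
      List<Integer> closed = windows.add(i);
      if (closed != null) {
        sizes.add(closed.size());
      }
    }
    // Without an end-of-batch signal the last 5 tuples would wait forever.
    List<Integer> last = windows.onFinalWatermark();
    if (last != null) {
      sizes.add(last.size());
    }
    System.out.println(sizes); // [10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 5]
  }
}

// Hypothetical count-based window: complete at the n-th tuple, so full
// windows need no watermark; a watermark only flushes the trailing
// partial window when the batch ends.
class CountWindows {
  private final int size;
  private List<Integer> current = new ArrayList<>();

  CountWindows(int size) {
    this.size = size;
  }

  // Returns the completed window, or null while the window is still filling.
  List<Integer> add(int tuple) {
    current.add(tuple);
    if (current.size() < size) {
      return null;
    }
    List<Integer> full = current;
    current = new ArrayList<>();
    return full;
  }

  // End of batch: emit whatever is left, if anything.
  List<Integer> onFinalWatermark() {
    if (current.isEmpty()) {
      return null;
    }
    List<Integer> partial = current;
    current = new ArrayList<>();
    return partial;
  }
}
```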
>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed
>>>>>>>>>>>>>>>> operator processes word counts per file (each file as a separate
>>>>>>>>>>>>>>>> batch), i.e. it processes the counts for each file and dumps them
>>>>>>>>>>>>>>>> into separate files.
>>>>>>>>>>>>>>>> As I understand it, the Global window is for one large batch,
>>>>>>>>>>>>>>>> i.e. all incoming data falls into the same batch. This could not
>>>>>>>>>>>>>>>> be processed using the GlobalWindow option, as we need more than
>>>>>>>>>>>>>>>> one window. In this case, I configured the windowed operator to
>>>>>>>>>>>>>>>> have time windows of 1ms each and passed the data for each file
>>>>>>>>>>>>>>>> with increasing timestamps: (file1, 1), (file2, 2) and so on. Is
>>>>>>>>>>>>>>>> there a better way of handling this scenario?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding (2 - count-based windows), I think there is a trigger
>>>>>>>>>>>>>>>> option to process count-based windows. In case I want to process
>>>>>>>>>>>>>>>> every 1000 tuples as a batch, I could set the Trigger option to
>>>>>>>>>>>>>>>> CountTrigger with the accumulation set to Discarding. Is this
>>>>>>>>>>>>>>>> correct?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using a Global
>>>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
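Bhupesh's question about CountTrigger with Discarding accumulation can be illustrated by a single window whose trigger fires every `count` tuples. `fire` and `Mode` are hypothetical names, and the two modes follow the usual definitions (discarding resets the accumulated state after each firing; accumulating keeps it), not Malhar's actual trigger code.

```java
import java.util.ArrayList;
import java.util.List;

class CountTriggerDemo {
  public static void main(String[] args) {
    long[] values = {1, 2, 3, 4, 5, 6};
    System.out.println(fire(values, 3, Mode.DISCARDING));   // [6, 15]
    System.out.println(fire(values, 3, Mode.ACCUMULATING)); // [6, 21]
  }

  enum Mode { DISCARDING, ACCUMULATING }

  // Sums the values of one window and fires every `count` tuples.
  // DISCARDING clears the state after each firing, so each firing covers
  // only the tuples since the previous one; ACCUMULATING keeps the state,
  // so each firing reports the running total.
  static List<Long> fire(long[] values, int count, Mode mode) {
    List<Long> firings = new ArrayList<>();
    long sum = 0;
    int sinceLastFiring = 0;
    for (long v : values) {
      sum += v;
      if (++sinceLastFiring == count) {
        firings.add(sum);
        sinceLastFiring = 0;
        if (mode == Mode.DISCARDING) {
          sum = 0;
        }
      }
    }
    return firings;
  }
}
```

Tuples arriving after the last firing (a seventh value here) would remain in the window's state until something else, such as an end-of-batch watermark, fires it.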
>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too
>>>>>>>>>>>>>>>>> complicated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Watermarks should simply tell you which windows can be
>>>>>>>>>>>>>>>>> considered complete.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do not
>>>>>>>>>>>>>>>>> play a role here, because the window is always complete at the
>>>>>>>>>>>>>>>>> n-th tuple.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of
>>>>>>>>>>>>>>>>> files. Unless the files contain timed events, it sounds to me
>>>>>>>>>>>>>>>>> like this can be achieved with just a Global Window. For
>>>>>>>>>>>>>>>>> signaling EOF, a watermark with a +infinity timestamp can be used
>>>>>>>>>>>>>>>>> so that triggers will be fired upon receipt of that watermark.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For point 4, just like what I mentioned above, this can be
>>>>>>>>>>>>>>>>> achieved with a watermark with a +infinity timestamp.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
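David's rule that a watermark only tells you which windows are complete can be written down directly. In this sketch (hypothetical helper names), a window `[begin, begin + duration)` is complete once the watermark reaches its end, and `Long.MAX_VALUE` stands in for the +infinity timestamp, which is the only watermark that completes the never-ending global window.

```java
class CompletenessDemo {
  // Stand-in for a "+infinity" watermark timestamp.
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;

  public static void main(String[] args) {
    // Ten-unit windows starting at 0, 10 and 20; watermark at 25.
    for (long begin = 0; begin <= 20; begin += 10) {
      System.out.println(begin + " complete: " + isComplete(begin, 10, 25));
    }
    System.out.println(isGlobalWindowComplete(25));                // false
    System.out.println(isGlobalWindowComplete(POSITIVE_INFINITY)); // true
  }

  // A window [begin, begin + duration) is complete once the watermark has
  // reached its end: no further tuples for it are expected.
  static boolean isComplete(long begin, long duration, long watermark) {
    return watermark >= begin + duration;
  }

  // The global window never ends, so only a +infinity watermark
  // (e.g. sent at EOF) marks it complete and fires its triggers.
  static boolean isGlobalWindowComplete(long watermark) {
    return watermark == POSITIVE_INFINITY;
  }
}
```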
>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For an input operator that is supposed to generate watermarks
>>>>>>>>>>>>>>>>>> for downstream operators, I can think of the following
>>>>>>>>>>>>>>>>>> watermarks that the operator can emit:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1. Time-based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>>>> 2. Tuple-count-based watermarks (every n tuples)
>>>>>>>>>>>>>>>>>> 3. File-based watermarks (start file, end file)
>>>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> File-based watermarks seem to be applicable to batch
>>>>>>>>>>>>>>>>>> (file-based) processing as well, and hence I thought of looking
>>>>>>>>>>>>>>>>>> at these first. Does this seem to be in line with the thought
>>>>>>>>>>>>>>>>>> process?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> _______________________________________________________
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Bhupesh Chawda
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
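The file-based watermarks in item 3, together with the final watermark in item 4, could be emitted by an input operator roughly as below. The event names (`START_FILE`, `END_FILE`, `FINAL`) and the `emit` helper are hypothetical; events are plain strings only to keep the sketch short, whereas a real implementation would use typed control tuples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FileWatermarkDemo {
  public static void main(String[] args) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("a.txt", Arrays.asList("x", "y"));
    files.put("b.txt", Arrays.asList("z"));
    for (String event : emit(files)) {
      System.out.println(event);
    }
  }

  // Hypothetical input operator logic: brackets each file's records with
  // start-file / end-file control tuples, in file order, and ends the
  // stream with a final watermark.
  static List<String> emit(Map<String, List<String>> files) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<String>> f : files.entrySet()) {
      out.add("START_FILE " + f.getKey());
      for (String record : f.getValue()) {
        out.add("DATA " + record);
      }
      out.add("END_FILE " + f.getKey());
    }
    out.add("FINAL");
    return out;
  }
}
```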
>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic
>>>>>>>>>>>>>>>>>>> file input-output scenario. It would be good to include a
>>>>>>>>>>>>>>>>>>> stateful transformation based on event time.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that
>>>>>>>>>>>>>>>>>>> depend on windowing and watermarks. I think we need a watermark
>>>>>>>>>>>>>>>>>>> concept that is based on progress in event time (or another
>>>>>>>>>>>>>>>>>>> monotonically increasing sequence) that other operators can
>>>>>>>>>>>>>>>>>>> work with generically.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Note that even file input can in many cases produce time-based
>>>>>>>>>>>>>>>>>>> watermarks, for example when you read part files that are bound
>>>>>>>>>>>>>>>>>>> by event time.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
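Thomas's part-file example can be sketched as a watermark that advances to the end of each event-time-bounded part file once that file has been fully read, and never moves backwards. `MonotonicWatermark` is a hypothetical class; the sketch assumes each part file covers a known event-time range whose end is passed in, and that part files are read roughly in event-time order.

```java
class PartFileWatermarkDemo {
  public static void main(String[] args) {
    MonotonicWatermark wm = new MonotonicWatermark();
    long hour = 3_600_000L;
    System.out.println(wm.onPartFileDone(1 * hour)); // 3600000
    System.out.println(wm.onPartFileDone(2 * hour)); // 7200000
    System.out.println(wm.onPartFileDone(1 * hour)); // still 7200000
  }
}

// Hypothetical: once a part file covering [start, end) of event time has
// been read completely, the input can advertise a watermark of `end`.
// Taking the maximum keeps the watermark monotonic if a part file for an
// earlier range is finished later.
class MonotonicWatermark {
  private long current = Long.MIN_VALUE;

  long onPartFileDone(long partFileEndMillis) {
    current = Math.max(current, partFileEndMillis);
    return current;
  }

  long current() {
    return current;
  }
}
```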
>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>
