Use Object in place of InputT when declaring the WindowedOperatorImpl, and cast
the Object to the actual type of InputT at runtime. Introducing an operator just
to do a cast is not a good design decision, IMO.

Thank you,
Vlad

Sent from my iPhone
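A minimal sketch of the cast-at-runtime suggestion, in plain Java with no Apex dependencies. It assumes the cast is done where InputT is actually consumed, that is, in the accumulation. Customer, SimpleAccumulation, BalanceSum and ObjectInputDemo are all hypothetical names. SimpleAccumulation is a simplified stand-in, not Malhar's accumulation API.

```java
import java.util.List;

// Hypothetical POJO standing in for the Customer class that CsvParser emits as Object.
class Customer {
  final String name;
  final long balance;

  Customer(String name, long balance) {
    this.name = name;
    this.balance = balance;
  }
}

// Simplified, hypothetical accumulation contract used only for this sketch.
interface SimpleAccumulation<InputT, AccumT, OutputT> {
  AccumT defaultAccumulatedValue();

  AccumT accumulate(AccumT accumulatedValue, InputT input);

  OutputT getOutput(AccumT accumulatedValue);
}

// InputT is declared as Object; the cast to the real type happens at runtime.
class BalanceSum implements SimpleAccumulation<Object, Long, Long> {
  @Override
  public Long defaultAccumulatedValue() {
    return 0L;
  }

  @Override
  public Long accumulate(Long accumulatedValue, Object input) {
    // Throws ClassCastException if anything other than a Customer arrives.
    Customer customer = (Customer) input;
    return accumulatedValue + customer.balance;
  }

  @Override
  public Long getOutput(Long accumulatedValue) {
    return accumulatedValue;
  }
}

class ObjectInputDemo {
  // Feeds Object-typed tuples (as CsvParser would emit them) through the accumulation.
  static long sumBalances(List<Object> emitted) {
    BalanceSum accumulation = new BalanceSum();
    Long value = accumulation.defaultAccumulatedValue();
    for (Object tuple : emitted) {
      value = accumulation.accumulate(value, tuple);
    }
    return accumulation.getOutput(value);
  }
}
```

The trade-off is that the type check moves from compile time to runtime. An unexpected input type surfaces as a ClassCastException inside accumulate().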

> On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:
> 
> I am using WindowedOperatorImpl and it is declared as follows.
> 
> WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator =
>     new WindowedOperatorImpl<>();
> 
> In my application, InputT is a Customer POJO, which CsvParser emits as an
> Object.
> 
> 
> Ajay
> 
> On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com>
> wrote:
> 
>> How do you declare WindowedOperator?
>> 
>> Thank you,
>> 
>> Vlad
>> 
>> 
>> On 4/28/17 10:35, AJAY GUPTA wrote:
>>> 
>>> Vlad,
>>> 
>>> The approach you suggested doesn't work because CsvParser's output port
>>> emits the Object data type, irrespective of the POJO class being emitted.
>>> 
>>> 
>>> Ajay
>>> 
>>> On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com>
>>> wrote:
>>> 
>>>> Make your POJO class implement the WindowedOperator Tuple interface (it
>>>> may return itself in getValue()).
>>>> 
>>>> Thank you,
>>>> 
>>>> Vlad
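A minimal sketch of this suggestion. Malhar ships its own Tuple interface. The Tuple<T> here is a local stand-in, assumed to have a single getValue() method so the example compiles on its own. Customer and TupleDemo are hypothetical.

```java
// Local stand-in for the windowed operator's Tuple type (assumed shape: one getter).
interface Tuple<T> {
  T getValue();
}

// Hypothetical POJO that is its own tuple: getValue() simply returns this instance.
class Customer implements Tuple<Customer> {
  private final String name;

  Customer(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }

  @Override
  public Customer getValue() {
    return this;
  }
}

class TupleDemo {
  // Stands in for code that only accepts Tuple<Customer>, like a Tuple-typed input port.
  static String nameOf(Tuple<Customer> tuple) {
    return tuple.getValue().getName();
  }
}
```

This avoids allocating a wrapper per record. It does rely on the upstream output port being declared with the POJO type.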
>>>> 
>>>> On 4/28/17 02:44, AJAY GUPTA wrote:
>>>> 
>>>>> Hi All,
>>>>>
>>>>> I am creating an application that uses the Windowed Operator. In this
>>>>> application, the CsvParser operator emits a POJO which is to be passed
>>>>> as input to the WindowedOperator. The WindowedOperator requires an
>>>>> instance of the Tuple class as input:
>>>>>
>>>>> public final transient DefaultInputPort<Tuple<InputT>> input =
>>>>>     new DefaultInputPort<Tuple<InputT>>()
>>>>>
>>>>> Because of this, addStream cannot work, as the type of CsvParser's
>>>>> output port is not compatible with the input port type of the
>>>>> WindowedOperator. One way to solve this problem is to add an operator
>>>>> between the two operators above that acts as a converter.
>>>>> I would like to know if there is a more generic approach to solving
>>>>> this problem, without writing a new operator for every new application
>>>>> that uses Windowed Operators.
>>>>>
>>>>> Thanks,
>>>>> Ajay
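The converter-operator workaround does not have to be rewritten for each application if it is parameterized by the target class. The sketch covers only the conversion logic. It assumes a local Tuple/PlainTuple pair (Malhar has its own tuple classes) and leaves out the Apex operator and port wiring. ObjectToTupleConverter is a hypothetical name.

```java
import java.util.Objects;

// Local stand-in for a tuple type; Malhar has its own Tuple classes.
interface Tuple<T> {
  T getValue();
}

// Minimal hypothetical wrapper carrying just a value.
final class PlainTuple<T> implements Tuple<T> {
  private final T value;

  PlainTuple(T value) {
    this.value = value;
  }

  @Override
  public T getValue() {
    return value;
  }
}

// One reusable converter, configured with the target POJO class, instead of a
// hand-written converter operator per application.
final class ObjectToTupleConverter<T> {
  private final Class<T> type;

  ObjectToTupleConverter(Class<T> type) {
    this.type = Objects.requireNonNull(type, "type");
  }

  // Class.cast performs a checked cast and throws ClassCastException on mismatch.
  Tuple<T> convert(Object emitted) {
    return new PlainTuple<>(type.cast(emitted));
  }
}
```

Class.cast() makes the type check explicit. A misconfigured target class therefore fails at the first tuple, not deep inside an accumulation.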
>>>>> 
>>>>> 
>>>>> 
>>>>> On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I think we have some agreement on the way we should use control tuples
>>>>>> for File I/O operators to support batch.
>>>>>>
>>>>>> In order to have more operators in Malhar support this paradigm, I think
>>>>>> we should also look at store operators - JDBC, Cassandra, HBase etc.
>>>>>> The case with these operators is simpler, as most of them do not poll
>>>>>> the sources (except the JDBC poller operator) and just stop once they
>>>>>> have read a fixed amount of data. In other words, these are inherently
>>>>>> batch sources. The only change that we should add to these operators is
>>>>>> to shut down the DAG once the reading of data is done. For a windowed
>>>>>> operator, this would mean a Global window with a final watermark before
>>>>>> the DAG is shut down.
>>>>>>
>>>>>> ~ Bhupesh
>>>>>> 
>>>>>> 
>>>>>> _______________________________________________________
>>>>>> 
>>>>>> Bhupesh Chawda
>>>>>> 
>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>> 
>>>>>> www.datatorrent.com  |  apex.apache.org
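The control flow described above for bounded store inputs can be sketched as follows. This is a plain-Java model, not Apex code. BatchSink and BoundedStoreReader are hypothetical, and the rows stand in for the result of a JDBC, Cassandra or HBase read. The actual mechanism for shutting down the DAG is left out; the reader only reports when it is safe to do so.

```java
import java.util.Iterator;

// Hypothetical downstream contract used only for this sketch (not an Apex API).
interface BatchSink<T> {
  void onTuple(T tuple);

  void onFinalWatermark();
}

// Model of a store-backed input whose data set is finite: emit every row, then a
// final watermark so a global window can fire, then signal that shutdown is safe.
final class BoundedStoreReader<T> {
  private final Iterator<T> rows;
  private boolean finished;

  BoundedStoreReader(Iterable<T> rows) {
    this.rows = rows.iterator();
  }

  // Emits at most maxPerCall rows per call, in the spirit of an input operator's
  // emit loop. Returns true once all rows and the final watermark have been emitted.
  boolean emitTuples(BatchSink<T> sink, int maxPerCall) {
    if (finished) {
      return true;
    }
    for (int i = 0; i < maxPerCall && rows.hasNext(); i++) {
      sink.onTuple(rows.next());
    }
    if (!rows.hasNext()) {
      sink.onFinalWatermark();
      finished = true;
    }
    return finished;
  }
}
```

The final watermark is emitted exactly once. Later calls just keep reporting that the reader is done.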
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> Even though the windowing operator is not just "event time", it seems
>>>>>>> it is too dependent on the "time" attribute of the incoming tuple. This
>>>>>>> is the reason we had to model the file index as a timestamp to solve
>>>>>>> the batch case for files.
>>>>>>> Perhaps we should work on increasing the scope of the windowed operator
>>>>>>> to consider other types of windows as well. The Sequence option
>>>>>>> suggested by David seems to be something in that direction.
>>>>>>>
>>>>>>> ~ Bhupesh
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's correct, we are looking at a generalized approach for state
>>>>>>>> management vs. a series of special cases.
>>>>>>>>
>>>>>>>> And to be clear, windowing does not imply event time, otherwise it
>>>>>>>> would be "EventTimeOperator" :-)
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>> 
>>>>>>>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi David,
>>>>>>>>>
>>>>>>>>> I went through the discussion, but it seems to be more about event
>>>>>>>>> time watermark handling than about batches. What we are trying to do
>>>>>>>>> is have watermarks serve the purpose of demarcating batches using
>>>>>>>>> control tuples. Since each batch is separate from the others, we
>>>>>>>>> would like to have stateful processing within a batch, but not
>>>>>>>>> across batches.
>>>>>>>>>
>>>>>>>>> At the same time, we would like to do this in a manner which is
>>>>>>>>> consistent with the windowing mechanism provided by the windowed
>>>>>>>>> operator. This will allow us to treat a single batch as a (bounded)
>>>>>>>>> stream and apply all the event time windowing concepts in that time
>>>>>>>>> span.
>>>>>>>>>
>>>>>>>>> For example, let's say I need to process data for a day (24 hours)
>>>>>>>>> as a single batch. The application is still streaming in nature: it
>>>>>>>>> would end the batch after a day and start a new batch the next day.
>>>>>>>>> At the same time, I would be able to have early trigger firings
>>>>>>>>> every minute as well as drop any data that is, say, 5 minutes late.
>>>>>>>>> All this within a single day.
>>>>>>>>>
>>>>>>>>> ~ Bhupesh
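The day-long batch example reduces to three rules. The plain-Java sketch below rests on these assumptions: timestamps are epoch milliseconds in UTC; "5 minutes late" means the watermark has passed the end of the tuple's day window by at least 5 minutes; and early firings follow a processing-time clock. DailyBatchPolicy is a hypothetical helper, not the windowed operator's configuration API.

```java
// Hypothetical helper modeling the rules; all times are epoch milliseconds (UTC).
final class DailyBatchPolicy {
  static final long MINUTE_MS = 60_000L;
  static final long DAY_MS = 24L * 60L * MINUTE_MS;
  static final long ALLOWED_LATENESS_MS = 5L * MINUTE_MS;
  static final long EARLY_FIRING_INTERVAL_MS = MINUTE_MS;

  private DailyBatchPolicy() {}

  // Each day [start, start + DAY_MS) is one batch, i.e. one window.
  static long windowStart(long eventTimeMs) {
    return Math.floorDiv(eventTimeMs, DAY_MS) * DAY_MS;
  }

  // A tuple is dropped once the watermark is at least ALLOWED_LATENESS_MS past
  // the end of the tuple's day window.
  static boolean isDropped(long eventTimeMs, long watermarkMs) {
    long windowEnd = windowStart(eventTimeMs) + DAY_MS;
    return watermarkMs >= windowEnd + ALLOWED_LATENESS_MS;
  }

  // Early (speculative) results for the open window are emitted once per minute.
  static boolean shouldFireEarly(long lastFiringMs, long nowMs) {
    return nowMs - lastFiringMs >= EARLY_FIRING_INTERVAL_MS;
  }
}
```

Once the watermark is 5 minutes past a day's end, that day's state can be discarded. This matches the goal of stateful processing within a batch but not across batches.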
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> There is a discussion on the Flink mailing list about key-based
>>>>>>>>>> watermarks. I think it's relevant to our use case here.
>>>>>>>>>> https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>>>>>>>>>>
>>>>>>>>>> David
>>>>>>>>>> 
>>>>>>>>>> On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi David,
>>>>>>>>>>>
>>>>>>>>>>> If using a time window does not seem appropriate, we can have
>>>>>>>>>>> another class which is better suited to such sequential and distinct
>>>>>>>>>>> windows. Perhaps a CustomWindow option can be introduced which takes
>>>>>>>>>>> in a window id. The purpose of this window option could be to
>>>>>>>>>>> translate the window id into appropriate timestamps.
>>>>>>>>>>>
>>>>>>>>>>> Another option would be to go with a custom timestampExtractor for
>>>>>>>>>>> such tuples, which translates each unique file name to a distinct
>>>>>>>>>>> timestamp while using time windows in the windowed operator.
>>>>>>>>>>>
>>>>>>>>>>> ~ Bhupesh
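A sketch of the second option, assuming files arrive in order and are read by a single reader. FileSequenceTimestampExtractor is a hypothetical name. Every tuple from the same file gets the same synthetic timestamp, so with 1 ms time windows each file lands in its own window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical extractor: assigns 1, 2, 3, ... to file names in first-seen order.
final class FileSequenceTimestampExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long nextSequence = 1L;

  // Returns the same synthetic "timestamp" for every tuple of a given file.
  long extract(String fileName) {
    Long sequence = sequenceByFile.get(fileName);
    if (sequence == null) {
      sequence = nextSequence++;
      sequenceByFile.put(fileName, sequence);
    }
    return sequence;
  }
}
```

The map grows with every file seen. A longer-running version would drop an entry once that file's window has fired.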
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I now see your rationale for putting the filename in the window.
>>>>>>>>>>>> As far as I understand, the reasons why the filename is not part
>>>>>>>>>>>> of the key and the Global Window is not used are:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) The files are processed in sequence, not in parallel
>>>>>>>>>>>> 2) The windowed operator should not keep the state associated
>>>>>>>>>>>>    with the file when the processing of the file is done
>>>>>>>>>>>> 3) The trigger should be fired for the file when a file is done
>>>>>>>>>>>>    processing.
>>>>>>>>>>>>
>>>>>>>>>>>> However, if the file is just a sequence that has nothing to do
>>>>>>>>>>>> with a timestamp, assigning a timestamp to a file is not an
>>>>>>>>>>>> intuitive thing to do and would just create confusion for users,
>>>>>>>>>>>> especially when it's used as an example for new users.
>>>>>>>>>>>>
>>>>>>>>>>>> How about having a separate class called SequenceWindow? And
>>>>>>>>>>>> perhaps TimeWindow can inherit from it?
>>>>>>>>>>>>
>>>>>>>>>>>> David
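A rough sketch of what the proposed hierarchy could look like. SequenceWindow and this TimeWindow are hypothetical classes written for illustration, not the existing windowed-operator classes. Each window covers a half-open range of sequence values, and a watermark completes it once the watermark reaches the window's end.

```java
// Hypothetical: a window over a half-open range [begin, begin + length) of
// sequence values, with no assumption that the values are times.
class SequenceWindow {
  protected final long begin;
  protected final long length;

  SequenceWindow(long begin, long length) {
    if (length <= 0) {
      throw new IllegalArgumentException("length must be positive");
    }
    this.begin = begin;
    this.length = length;
  }

  boolean contains(long sequence) {
    return sequence >= begin && sequence - begin < length;
  }

  // Complete once the watermark has reached the end of the range.
  boolean isCompleteAt(long watermark) {
    return watermark >= begin + length;
  }
}

// A time window is then just a sequence window whose values are milliseconds.
class TimeWindow extends SequenceWindow {
  TimeWindow(long beginMillis, long durationMillis) {
    super(beginMillis, durationMillis);
  }

  long getBeginTimestamp() {
    return begin;
  }

  long getDurationMillis() {
    return length;
  }
}
```

With this shape, "file 3" is simply new SequenceWindow(3, 1), and no timestamp semantics are implied.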
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think my comments related to count based windows might be
>>>>>>>>>>>>>> causing confusion. Let's not discuss count based scenarios for
>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just want to make sure we are on the same page wrt. the "each
>>>>>>>>>>>>>> file is a batch" use case. As mentioned by Thomas, each tuple
>>>>>>>>>>>>>> from the same file has the same timestamp (which is just a
>>>>>>>>>>>>>> sequence number) and that helps keep tuples from each file in a
>>>>>>>>>>>>>> separate window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, in this case it is a sequence number, but it could be a time
>>>>>>>>>>>>> stamp also, depending on the file naming convention. And if it
>>>>>>>>>>>>> was event time processing, the watermark would be derived from
>>>>>>>>>>>>> records within the file.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agreed, the source should have a mechanism to control the time
>>>>>>>>>>>>> stamp extraction along with everything else pertaining to the
>>>>>>>>>>>>> watermark generation.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> We could also implement a "timestampExtractor" interface to
>>>>>>>>>>>>>> identify the timestamp (sequence number) for a file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't think this is a use case for count based window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We have multiple files that are retrieved in a sequence and
>>>>>>>>>>>>>>> there is no knowledge of the number of records per file. The
>>>>>>>>>>>>>>> requirement is to aggregate each file separately and emit the
>>>>>>>>>>>>>>> aggregate when the file is read fully. There is no concept of
>>>>>>>>>>>>>>> "end of something" for an individual key and global window
>>>>>>>>>>>>>>> isn't applicable.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However, as already explained and implemented by Bhupesh, this
>>>>>>>>>>>>>>> can be solved using watermark and window (in this case the
>>>>>>>>>>>>>>> window timestamp isn't a timestamp, but a file sequence, but
>>>>>>>>>>>>>>> that doesn't matter).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
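A minimal sketch of the watermark-and-window approach described above. It assumes file sequence numbers serve as window "timestamps" and uses a sum as the per-file aggregate (a word count, for example). Each file s maps to the window [s, s + 1). A watermark w completes every window that ends at or before w; completing a window emits that file's aggregate and drops its state. PerFileAggregator is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-file aggregation keyed by file sequence number.
final class PerFileAggregator {
  private final TreeMap<Long, Long> sumByWindow = new TreeMap<>();

  void process(long fileSequence, long value) {
    sumByWindow.merge(fileSequence, value, Long::sum);
  }

  // Returns "sequence=sum" for each completed window, oldest first.
  List<String> onWatermark(long watermark) {
    List<String> fired = new ArrayList<>();
    // Window [s, s + 1) is complete when s + 1 <= watermark, i.e. s < watermark.
    Map<Long, Long> complete = sumByWindow.headMap(watermark, false);
    for (Map.Entry<Long, Long> entry : complete.entrySet()) {
      fired.add(entry.getKey() + "=" + entry.getValue());
    }
    // headMap is a view, so clearing it also removes the state from sumByWindow.
    complete.clear();
    return fired;
  }
}
```

Dropping the state when a window fires matches the requirement that no per-file state is kept once the file has been processed.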
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think this is the way to go. Global Window only means
>>>>>>>>>>>>>>>> the timestamp does not matter (or that there is no timestamp).
>>>>>>>>>>>>>>>> It does not necessarily mean it's a large batch. Unless there
>>>>>>>>>>>>>>>> is some notion of event time for each file, you don't want to
>>>>>>>>>>>>>>>> embed the file into the window itself.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you want the result broken up by file name, and if the
>>>>>>>>>>>>>>>> files are to be processed in parallel, I think making the file
>>>>>>>>>>>>>>>> name part of the key is the way to go. I think it's very
>>>>>>>>>>>>>>>> confusing if we somehow make the file part of the window.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Count-based windows are not implemented yet, and you're welcome
>>>>>>>>>>>>>>>> to add that feature. With count-based windows, there would be
>>>>>>>>>>>>>>>> no notion of time and you probably only trigger at the end of
>>>>>>>>>>>>>>>> each window. In that case, the watermark only matters for
>>>>>>>>>>>>>>>> batch, since you need a way to know when the batch has ended
>>>>>>>>>>>>>>>> (if the count is 10 and the number of tuples in the batch is,
>>>>>>>>>>>>>>>> let's say, 105, you need a way to end the last window with 5
>>>>>>>>>>>>>>>> tuples).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> David
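The count-based case takes only a few lines of plain Java. CountWindows is a hypothetical class, and the end of the batch is modeled as an explicit onFinalWatermark() call. A window fires as soon as it holds n tuples, with no watermark involved. Only the trailing partial window needs the end-of-batch signal; that is the 105-tuples-with-count-10 case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based windowing: a window is complete at its n-th tuple.
final class CountWindows<T> {
  private final int size;
  private final List<List<T>> fired = new ArrayList<>();
  private List<T> current = new ArrayList<>();

  CountWindows(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    this.size = size;
  }

  // Fires the window immediately when it reaches `size` tuples; no watermark needed.
  void process(T tuple) {
    current.add(tuple);
    if (current.size() == size) {
      fired.add(current);
      current = new ArrayList<>();
    }
  }

  // End of batch: flush the trailing partial window, if any.
  void onFinalWatermark() {
    if (!current.isEmpty()) {
      fired.add(current);
      current = new ArrayList<>();
    }
  }

  List<List<T>> firedWindows() {
    return fired;
  }
}
```

Feeding 105 tuples with size 10 fires ten full windows. The eleventh window, holding the remaining five tuples, fires only when onFinalWatermark() is called.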
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi David,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for your comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The wordcount example that I created based on the windowed
>>>>>>>>>>>>>>>>> operator computes word counts per file (each file as a
>>>>>>>>>>>>>>>>> separate batch), i.e. it processes the counts for each file
>>>>>>>>>>>>>>>>> and dumps them into separate files.
>>>>>>>>>>>>>>>>> As I understand it, the Global window is for one large batch,
>>>>>>>>>>>>>>>>> i.e. all incoming data falls into the same batch. This could
>>>>>>>>>>>>>>>>> not be processed using the GlobalWindow option, as we need
>>>>>>>>>>>>>>>>> more than one window. In this case, I configured the windowed
>>>>>>>>>>>>>>>>> operator to have time windows of 1ms each and passed data for
>>>>>>>>>>>>>>>>> each file with increasing timestamps: (file1, 1), (file2, 2)
>>>>>>>>>>>>>>>>> and so on. Is there a better way of handling this scenario?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding (2 - count based windows), I think there is a
>>>>>>>>>>>>>>>>> trigger option to process count based windows. In case I want
>>>>>>>>>>>>>>>>> to process every 1000 tuples as a batch, I could set the
>>>>>>>>>>>>>>>>> Trigger option to CountTrigger with the accumulation set to
>>>>>>>>>>>>>>>>> Discarding. Is this correct?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree that (4. Final Watermark) can be done using the Global
>>>>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> ~ Bhupesh
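A plain-Java model of a count trigger on a single window makes the difference between discarding and accumulating concrete. It illustrates the semantics under that single-window assumption and is not Malhar's trigger or accumulation-mode API. CountTriggeredSum and Mode are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a count trigger over one (global) window that sums values.
final class CountTriggeredSum {
  enum Mode { DISCARDING, ACCUMULATING }

  private final int triggerCount;
  private final Mode mode;
  private final List<Long> firings = new ArrayList<>();
  private long sum;
  private int sinceLastFiring;

  CountTriggeredSum(int triggerCount, Mode mode) {
    if (triggerCount <= 0) {
      throw new IllegalArgumentException("triggerCount must be positive");
    }
    this.triggerCount = triggerCount;
    this.mode = mode;
  }

  void process(long value) {
    sum += value;
    sinceLastFiring++;
    if (sinceLastFiring == triggerCount) {
      firings.add(sum);
      sinceLastFiring = 0;
      // DISCARDING: the next firing only covers tuples that arrive after this one.
      if (mode == Mode.DISCARDING) {
        sum = 0;
      }
    }
  }

  List<Long> firings() {
    return firings;
  }
}
```

In DISCARDING mode, each firing aggregates exactly triggerCount tuples, which gives batch-of-N behavior. In this model, a trailing group of fewer than triggerCount tuples never fires on its own. That is the same end-of-batch gap David describes for count-based windows.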
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm worried that we are making the watermark concept too
>>>>>>>>>>>>>>>>>> complicated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Watermarks should simply tell you what windows can be
>>>>>>>>>>>>>>>>>> considered complete.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Point 2 is basically a count-based window. Watermarks do not
>>>>>>>>>>>>>>>>>> play a role here because the window is always complete at the
>>>>>>>>>>>>>>>>>> n-th tuple.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If I understand correctly, point 3 is for batch processing of
>>>>>>>>>>>>>>>>>> files. Unless the files contain timed events, it sounds to me
>>>>>>>>>>>>>>>>>> like this can be achieved with just a Global Window. For
>>>>>>>>>>>>>>>>>> signaling EOF, a watermark with a +infinity timestamp can be
>>>>>>>>>>>>>>>>>> used so that triggers will be fired upon receipt of that
>>>>>>>>>>>>>>>>>> watermark.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For point 4, just like what I mentioned above, this can be
>>>>>>>>>>>>>>>>>> achieved with a watermark with a +infinity timestamp.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> David
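A minimal model of that rule. It assumes each window is identified by its end timestamp, and it represents the global window's end, "+infinity", as Long.MAX_VALUE. Watermarks and its constants are hypothetical names.

```java
// Hypothetical model: a watermark W declares complete every window whose end is <= W.
final class Watermarks {
  // The global window never ends, so its end is modeled as +infinity.
  static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;
  // An end-of-input (EOF / final) watermark carries the +infinity timestamp.
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private Watermarks() {}

  static boolean isComplete(long windowEnd, long watermark) {
    return windowEnd <= watermark;
  }
}
```

In this model, no finite watermark can complete the global window. Only the final watermark completes it, so that is when its triggers fire.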
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For an input operator which is supposed to generate
>>>>>>>>>>>>>>>>>>> watermarks for downstream operators, I can think of the
>>>>>>>>>>>>>>>>>>> following watermarks that the operator can emit:
>>>>>>>>>>>>>>>>>>> 1. Time based watermarks (the high watermark / low watermark)
>>>>>>>>>>>>>>>>>>> 2. Number of tuple based watermarks (every n tuples)
>>>>>>>>>>>>>>>>>>> 3. File based watermarks (start file, end file)
>>>>>>>>>>>>>>>>>>> 4. Final watermark
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> File based watermarks seem to be applicable for batch (file
>>>>>>>>>>>>>>>>>>> based) as well, and hence I thought of looking at these
>>>>>>>>>>>>>>>>>>> first. Does this seem to be in line with the thought process?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ~ Bhupesh
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> _______________________________________________________
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Bhupesh Chawda
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Software Engineer
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I don't think this should be designed based on a simplistic
>>>>>>>>>>>>>>>>>>>> file input-output scenario. It would be good to include a
>>>>>>>>>>>>>>>>>>>> stateful transformation based on event time.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> More complex pipelines contain stateful transformations that
>>>>>>>>>>>>>>>>>>>> depend on windowing and watermarks. I think we need a
>>>>>>>>>>>>>>>>>>>> watermark concept that is based on progress in event time
>>>>>>>>>>>>>>>>>>>> (or another monotonically increasing sequence) that other
>>>>>>>>>>>>>>>>>>>> operators can generically work with.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Note that even file input in many cases can produce time
>>>>>>>>>>>>>>>>>>>> based watermarks, for example when you read part files that
>>>>>>>>>>>>>>>>>>>> are bound by event time.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Thomas
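To illustrate the part-file case, here is a sketch that derives an event-time watermark from file names. It assumes a hypothetical naming convention, events-<startMillis>-<endMillis>.csv, in which each part file covers the event-time range [start, end). It also assumes a file's end bound is emitted as a watermark only after the whole file has been read. PartFileWatermarks is a hypothetical name.

```java
// Hypothetical: part files named "events-<startMillis>-<endMillis>.csv" hold only
// events with start <= eventTime < end.
final class PartFileWatermarks {
  private long currentWatermark = Long.MIN_VALUE;

  // Parses the end bound from the assumed naming convention,
  // e.g. "events-1000-2000.csv" -> 2000.
  static long endBound(String fileName) {
    int dot = fileName.lastIndexOf('.');
    String base = dot >= 0 ? fileName.substring(0, dot) : fileName;
    String[] parts = base.split("-");
    return Long.parseLong(parts[parts.length - 1]);
  }

  // Called after a part file has been read completely. Math.max keeps the
  // watermark monotonic even if files complete out of order.
  long onFileCompleted(String fileName) {
    currentWatermark = Math.max(currentWatermark, endBound(fileName));
    return currentWatermark;
  }
}
```

Holding the watermark with Math.max is not enough on its own if files can be read out of order. Tuples from an older file that is read later would arrive behind the watermark and be treated as late.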
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
