How do you declare WindowedOperator?

Thank you,

Vlad

On 4/28/17 10:35, AJAY GUPTA wrote:
Vlad,

The approach you suggested doesn't work because the CsvParser outputs
the Object data type irrespective of the POJO class being emitted.


Ajay

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Make your POJO class implement the WindowedOperator Tuple interface (it
may return itself in getValue()).

Thank you,

Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:

Hi All,

I am creating an application that uses the Windowed Operator. The
application involves a CsvParser operator emitting a POJO object, which
is to be passed as input to the WindowedOperator. The WindowedOperator
requires an instance of the Tuple class as input:

    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Due to this, addStream cannot work, as the type of CsvParser's output
port is not compatible with the input port type of WindowedOperator.
One way to solve this problem is to have an operator between the above
two operators as a converter.
I would like to know if there is any other, more generic approach to
solve this problem without writing a new operator for every new
application using Windowed Operators.
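
The converter idea mentioned above could be sketched roughly as follows.
This is a standalone illustration that does not use the real Apex
classes: SketchTuple is a stand-in for the windowed operator's Tuple
interface, AdEvent is a hypothetical POJO, and ObjectToTupleConverter is
a hypothetical name for the piece that would sit between the two
operators. A real converter would be an operator with ports, which is
left out here.

```java
import java.util.function.Function;

// Stand-in for the windowed operator's Tuple interface; NOT the real Apex class.
interface SketchTuple<T> {
  T getValue();
}

// Hypothetical POJO of the kind a CSV parser might emit.
class AdEvent {
  final String campaign;
  final long clicks;

  AdEvent(String campaign, long clicks) {
    this.campaign = campaign;
    this.clicks = clicks;
  }
}

// Hypothetical generic converter: casts the untyped parser output to the
// expected POJO class and wraps it in a tuple.
class ObjectToTupleConverter<T> implements Function<Object, SketchTuple<T>> {
  private final Class<T> pojoClass;

  ObjectToTupleConverter(Class<T> pojoClass) {
    this.pojoClass = pojoClass;
  }

  @Override
  public SketchTuple<T> apply(Object parsed) {
    // Class.cast throws ClassCastException if the parser emitted another type.
    final T value = pojoClass.cast(parsed);
    return () -> value;
  }
}
```

Because the POJO class is passed in as a parameter, the same converter
could be reused across applications instead of being rewritten for each
one.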

Thanks,
Ajay



On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,
I think we have some agreement on the way we should use control tuples
for File I/O operators to support batch.

In order to have more operators in Malhar support this paradigm, I think
we should also look at store operators - JDBC, Cassandra, HBase etc.
The case with these operators is simpler, as most of them do not poll
the sources (except the JDBC poller operator) and just stop once they
have read a fixed amount of data. In other words, these are inherently
batch sources. The only change that we should add to these operators is
to shut down the DAG once the reading of data is done. For a windowed
operator this would mean a Global window with a final watermark before
the DAG is shut down.

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,
Even though the windowing operator is not just "event time", it seems to
depend too much on the "time" attribute of the incoming tuple. This is
the reason we had to model the file index as a timestamp to solve the
batch case for files.
Perhaps we should work on increasing the scope of the windowed operator
to consider other types of windows as well. The Sequence option
suggested by David seems to be something in that direction.

~ Bhupesh





On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

That's correct, we are looking at a generalized approach for state
management vs. a series of special cases.

And to be clear, windowing does not imply event time, otherwise it would
be "EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,
I went through the discussion, but it seems like it is more on the event
time watermark handling as opposed to batches. What we are trying to do
is have watermarks serve the purpose of demarcating batches using
control tuples. Since each batch is separate from others, we would like
to have stateful processing within a batch, but not across batches.
At the same time, we would like to do this in a manner which is
consistent with the windowing mechanism provided by the windowed
operator. This will allow us to treat a single batch as a (bounded)
stream and apply all the event time windowing concepts in that time
span.
For example, let's say I need to process data for a day (24 hours) as a
single batch. The application is still streaming in nature: it would end
the batch after a day and start a new batch the next day. At the same
time, I would be able to have early trigger firings every minute as well
as drop any data which is, say, 5 mins late. All this within a single
day.
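
The lateness rule in the example above could be sketched roughly as
follows. This is a standalone illustration, not the windowed operator's
implementation: LatenessFilter and its methods are hypothetical names,
and the rule "drop anything older than the watermark minus the allowed
lateness" is an assumption about how the 5-minute cutoff would be
applied.

```java
// Hypothetical helper, not a Malhar class: decides whether a tuple is too late.
class LatenessFilter {
  private final long allowedLatenessMillis;
  private long watermarkMillis;
  private boolean watermarkSeen = false;

  LatenessFilter(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // Watermarks are assumed to only move forward, so an older one is ignored.
  void onWatermark(long timestampMillis) {
    watermarkMillis = watermarkSeen
        ? Math.max(watermarkMillis, timestampMillis)
        : timestampMillis;
    watermarkSeen = true;
  }

  // Assumed rule: keep a tuple unless it is older than watermark minus lateness.
  boolean accept(long eventTimeMillis) {
    if (!watermarkSeen) {
      return true; // nothing can be late before the first watermark
    }
    return eventTimeMillis >= watermarkMillis - allowedLatenessMillis;
  }
}
```

With an allowed lateness of 5 minutes, a tuple stamped exactly 5 minutes
behind the current watermark is still accepted, and anything older is
dropped.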

~ Bhupesh






On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion in the Flink mailing list about key-based
watermarks. I think it's relevant to our use case here.
https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,
If using a time window does not seem appropriate, we can have another
class which is more suited for such sequential and distinct windows.
Perhaps a CustomWindow option can be introduced which takes in a window
id. The purpose of this window option could be to translate the window
id into appropriate timestamps.
Another option would be to go with a custom timestampExtractor for such
tuples, which translates each unique file name to a distinct timestamp
while using time windows in the windowed operator.
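
The second option could look roughly like the sketch below. It is only
an illustration of the idea: FileSequenceExtractor and timestampFor are
hypothetical names, not an existing Malhar interface, and handing out
consecutive numbers starting at 1 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Malhar: gives each new file name the next
// sequence number and returns it as a pseudo-timestamp.
class FileSequenceExtractor {
  private final Map<String, Long> sequenceByFile = new HashMap<>();

  long timestampFor(String fileName) {
    Long existing = sequenceByFile.get(fileName);
    if (existing != null) {
      return existing; // tuples of the same file share one timestamp
    }
    long next = sequenceByFile.size() + 1L;
    sequenceByFile.put(fileName, next);
    return next;
  }
}
```

All tuples of one file then carry the same value, so a time window of
size 1 keeps each file in its own window.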
~ Bhupesh





On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

I now see your rationale on putting the filename in the window.
As far as I understand, the reasons why the filename is not part of the
key and the Global Window is not used are:

1) The files are processed in sequence, not in parallel
2) The windowed operator should not keep the state associated with the
   file when the processing of the file is done
3) The trigger should be fired for the file when a file is done
   processing.

However, if the file is just a sequence that has nothing to do with a
timestamp, assigning a timestamp to a file is not an intuitive thing to
do and would just create confusion for the users, especially when it's
used as an example for new users.
How about having a separate class called SequenceWindow? And perhaps
TimeWindow can inherit from it?
David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:

On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

I think my comments related to count based windows might be causing
confusion. Let's not discuss count based scenarios for now.
Just want to make sure we are on the same page wrt. the "each file is a
batch" use case. As mentioned by Thomas, each tuple from the same file
has the same timestamp (which is just a sequence number) and that helps
keep tuples from each file in a separate window.

Yes, in this case it is a sequence number, but it could be a time stamp
also, depending on the file naming convention. And if it was event time
processing, the watermark would be derived from records within the file.

Agreed, the source should have a mechanism to control the time stamp
extraction along with everything else pertaining to the watermark
generation.

We could also implement a "timestampExtractor" interface to identify the
timestamp (sequence number) for a file.
~ Bhupesh





On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

I don't think this is a use case for count based window.
We have multiple files that are retrieved in a sequence and there is no
knowledge of the number of records per file. The requirement is to
aggregate each file separately and emit the aggregate when the file is
read fully. There is no concept of "end of something" for an individual
key and global window isn't applicable.

However, as already explained and implemented by Bhupesh, this can be
solved using watermark and window (in this case the window timestamp
isn't a timestamp, but a file sequence, but that doesn't matter).

Thomas


On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

I don't think this is the way to go. Global Window only means the
timestamp does not matter (or that there is no timestamp). It does not
necessarily mean it's a large batch. Unless there is some notion of
event time for each file, you don't want to embed the file into the
window itself.
If you want the result broken up by file name, and if the files are to
be processed in parallel, I think making the file name be part of the
key is the way to go. I think it's very confusing if we somehow make the
file to be part of the window.
For count-based window, it's not implemented yet and you're welcome to
add that feature. In case of count-based windows, there would be no
notion of time and you probably only trigger at the end of each window.
In the case of count-based windows, the watermark only matters for batch
since you need a way to know when the batch has ended (if the count is
10 and the number of tuples in the batch is, let's say, 105, you need a
way to end the last window with 5 tuples).
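
The count-based case described above could be sketched roughly as
follows. Count-based windows are not implemented in the windowed
operator according to this message, so everything here is hypothetical:
CountWindower is an invented class, and onEndOfBatch stands in for the
watermark that signals the end of the batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-based windowing with an end-of-batch signal.
class CountWindower<T> {
  private final int windowSize;
  private List<T> current = new ArrayList<>();
  private final List<List<T>> closed = new ArrayList<>();

  CountWindower(int windowSize) {
    if (windowSize <= 0) {
      throw new IllegalArgumentException("windowSize must be positive");
    }
    this.windowSize = windowSize;
  }

  // A window is complete as soon as it holds windowSize tuples.
  void onTuple(T tuple) {
    current.add(tuple);
    if (current.size() == windowSize) {
      closeCurrent();
    }
  }

  // Plays the role of the watermark: closes the last, partially filled window.
  void onEndOfBatch() {
    if (!current.isEmpty()) {
      closeCurrent();
    }
  }

  private void closeCurrent() {
    closed.add(current);
    current = new ArrayList<>();
  }

  List<List<T>> closedWindows() {
    return closed;
  }
}
```

With a window size of 10 and a batch of 105 tuples, ten windows close on
their own and the eleventh, holding the remaining 5 tuples, closes only
when the end-of-batch signal arrives.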

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,
Thanks for your comments.

The wordcount example that I created based on the windowed operator does
processing of word counts per file (each file as a separate batch), i.e.
process counts for each file and dump into separate files.
As I understand, Global window is for one large batch; i.e. all incoming
data falls into the same batch. This could not be processed using the
GlobalWindow option as we need more than one window. In this case, I
configured the windowed operator to have time windows of 1ms each and
passed data for each file with increasing timestamps: (file1, 1),
(file2, 2) and so on. Is there a better way of handling this scenario?
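
The setup described above can be illustrated with a small standalone
sketch. It does not use the windowed operator; PerFileWordCount is a
hypothetical class that only mimics the effect of 1ms time windows when
the "timestamp" is a file sequence number.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: word counts kept per 1ms window, where the timestamp
// of every tuple is the sequence number of the file it came from.
class PerFileWordCount {
  private static final long WINDOW_MILLIS = 1L;

  // window start -> (word -> count)
  private final Map<Long, Map<String, Long>> countsByWindow = new TreeMap<>();

  private static long windowStart(long timestamp) {
    return timestamp - (timestamp % WINDOW_MILLIS);
  }

  void process(long fileSequence, String word) {
    countsByWindow
        .computeIfAbsent(windowStart(fileSequence), w -> new TreeMap<>())
        .merge(word, 1L, Long::sum);
  }

  Map<String, Long> countsFor(long fileSequence) {
    Map<String, Long> counts = countsByWindow.get(windowStart(fileSequence));
    return counts == null ? Collections.<String, Long>emptyMap() : counts;
  }
}
```

Since the window size is 1, every distinct sequence number lands in its
own window, so the counts of file1 and file2 never mix.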
Regarding (2 - count based windows), I think there is a trigger option
to process count based windows. In case I want to process every 1000
tuples as a batch, I could set the Trigger option to CountTrigger with
the accumulation set to Discarding. Is this correct?
I agree that (4. Final Watermark) can be done using Global window.
~ Bhupesh



On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

I'm worried that we are making the watermark concept too complicated.
Watermarks should simply just tell you what windows can be considered
complete.
Point 2 is basically a count-based window. Watermarks do not play a role
here because the window is always complete at the n-th tuple.
If I understand correctly, point 3 is for batch processing of files.
Unless the files contain timed events, it sounds to me that this can be
achieved with just a Global Window. For signaling EOF, a watermark with
a +infinity timestamp can be used so that triggers will be fired upon
receipt of that watermark.
For point 4, just like what I mentioned above, it can be achieved with a
watermark with a +infinity timestamp.
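
The effect of a +infinity watermark could be sketched roughly as follows.
This is a standalone illustration, not the windowed operator's code:
FinalWatermarkSketch is a hypothetical class, windows are identified
only by their end timestamp, the aggregate is a plain sum, and
Long.MAX_VALUE is assumed to stand for +infinity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a watermark completes every window that ends at or
// before it, so a watermark of Long.MAX_VALUE completes all of them.
class FinalWatermarkSketch {
  // window end timestamp -> running sum for that window
  private final TreeMap<Long, Long> openWindows = new TreeMap<>();
  private final List<Long> fired = new ArrayList<>();

  void add(long windowEnd, long value) {
    openWindows.merge(windowEnd, value, Long::sum);
  }

  void onWatermark(long watermark) {
    // headMap is a live view, so clearing it removes the completed windows.
    NavigableMap<Long, Long> complete = openWindows.headMap(watermark, true);
    fired.addAll(complete.values());
    complete.clear();
  }

  List<Long> firedResults() {
    return fired;
  }

  int openWindowCount() {
    return openWindows.size();
  }
}
```

A Global Window can be thought of here as a single window whose end is
Long.MAX_VALUE: it stays open under every ordinary watermark and fires
only when the final one arrives.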
David




On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,
For an input operator which is supposed to generate watermarks for
downstream operators, I can think about the following watermarks that
the operator can emit:
1. Time based watermarks (the high watermark / low watermark)
2. Number of tuple based watermarks (Every n tuples)
3. File based watermarks (Start file, end file)
4. Final watermark

File based watermarks seem to be applicable for batch (file based) as
well, and hence I thought of looking at these first. Does this seem to
be in line with the thought process?

~ Bhupesh






On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

I don't think this should be designed based on a simplistic file
input-output scenario. It would be good to include a stateful
transformation based on event time.
More complex pipelines contain stateful transformations that depend on
windowing and watermarks. I think we need a watermark concept that is
based on progress in event time (or other monotonically increasing
sequence) that other operators can generically work with.
Note that even file input in many cases can produce time based
watermarks, for example when you read part files that are bound by event
time.
Thanks,
Thomas


On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com

