Make your POJO class implement the WindowedOperator Tuple interface (its getValue() can simply return the object itself).
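Here is a minimal sketch of this suggestion. It assumes the windowed operator's Tuple interface comes down to a single `T getValue()` method. A local stand-in interface is declared so the snippet compiles on its own; the real Malhar interface may declare more. `PageView` is a hypothetical POJO. Because it implements `Tuple<PageView>` and returns itself, a port emitting `PageView` objects can in principle feed an input port typed `Tuple<PageView>` without a converter operator.

```java
// Stand-in for the windowed operator's Tuple interface (assumption: one getValue() method).
interface Tuple<T> {
  T getValue();
}

// Hypothetical POJO produced by a CSV parser; it serves as its own tuple payload.
class PageView implements Tuple<PageView> {
  private String url;
  private long timestamp;

  public PageView() {
    // no-arg constructor so a reflective CSV-to-POJO parser can instantiate it
  }

  public PageView(String url, long timestamp) {
    this.url = url;
    this.timestamp = timestamp;
  }

  public String getUrl() { return url; }
  public void setUrl(String url) { this.url = url; }
  public long getTimestamp() { return timestamp; }
  public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

  @Override
  public PageView getValue() {
    return this; // the POJO itself is the value carried by the tuple
  }
}
```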

Thank you,

Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:
Hi All,

I am creating an application that uses the Windowed Operator. In this
application, a CsvParser operator emits a POJO which is to be passed as
input to the WindowedOperator. The WindowedOperator requires an instance of
the Tuple class as input:

    public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Because of this, addStream cannot work: the type of CsvParser's output port
is not compatible with the input port type of WindowedOperator.
One way to solve this problem is to add a converter operator between the
two operators. I would like to know whether there is a more generic
approach that avoids writing a new operator for every application that
uses Windowed Operators.

Thanks,
Ajay



On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

I think we have some agreement on the way we should use control tuples for
File I/O operators to support batch.

In order to have more operators in Malhar support this paradigm, I think
we should also look at store operators: JDBC, Cassandra, HBase, etc.
The case with these operators is simpler, as most of them do not poll their
sources (except the JDBC poller operator) and just stop once they have read
a fixed amount of data. In other words, these are inherently batch sources.
The only change we need to make to these operators is to shut down the
DAG once the data has been read. For a windowed operator, this would
mean a Global window with a final watermark before the DAG is shut down.
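To make the shutdown step concrete, here is a sketch of a bounded store reader under stated assumptions. The result set is an in-memory list standing in for a JDBC, Cassandra or HBase query; `emitTuples()` is called repeatedly, as in an input operator's emit loop; and `shutdownRequested()` is a hypothetical flag standing in for whatever mechanism the engine uses to stop the DAG. After the last row, the source emits exactly one watermark at `Long.MAX_VALUE`, which a windowed operator using a Global window would treat as the final watermark.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical events placed on the source's output stream.
abstract class SourceEvent {}

class DataRow extends SourceEvent {
  final String value;
  DataRow(String value) { this.value = value; }
}

class WatermarkTuple extends SourceEvent {
  final long timestamp;
  WatermarkTuple(long timestamp) { this.timestamp = timestamp; }
  boolean isFinal() { return timestamp == Long.MAX_VALUE; }
}

// Hypothetical bounded store reader: emits a fixed result set, then the final watermark.
class BoundedStoreSource {
  private final Iterator<String> rows;
  private final int batchSize;
  private boolean finalWatermarkSent = false;
  final List<SourceEvent> output = new ArrayList<>(); // stand-in for an output port

  BoundedStoreSource(List<String> resultSet, int batchSize) {
    this.rows = resultSet.iterator();
    this.batchSize = batchSize;
  }

  // Emits at most batchSize rows per call, like one pass of an input operator's emit loop.
  void emitTuples() {
    int emitted = 0;
    while (rows.hasNext() && emitted < batchSize) {
      output.add(new DataRow(rows.next()));
      emitted++;
    }
    if (!rows.hasNext() && !finalWatermarkSent) {
      // A +infinity watermark tells downstream that every window, including the global one, is complete.
      output.add(new WatermarkTuple(Long.MAX_VALUE));
      finalWatermarkSent = true;
    }
  }

  // Once the final watermark is out, nothing else will be read, so the DAG may shut down.
  boolean shutdownRequested() {
    return finalWatermarkSent;
  }
}
```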

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

Even though the windowing operator is not just about "event time", it
seems to depend heavily on the "time" attribute of the incoming tuple. This
is why we had to model the file index as a timestamp to solve the batch
case for files.
Perhaps we should work on widening the scope of the windowed operator to
consider other types of windows as well. The Sequence option suggested by
David seems to be a step in that direction.

~ Bhupesh





On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org> wrote:

That's correct: we are looking at a generalized approach for state
management vs. a series of special cases.

And to be clear, windowing does not imply event time; otherwise it would be
called "EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

I went through the discussion, but it seems to be more about event-time
watermark handling than about batches. What we are trying to do is have
watermarks demarcate batches using control tuples. Since each batch is
separate from the others, we would like stateful processing within a
batch, but not across batches.
At the same time, we would like to do this in a manner consistent with the
windowing mechanism provided by the windowed operator. This will allow us
to treat a single batch as a (bounded) stream and apply all the event-time
windowing concepts within that time span.

For example, let's say I need to process data for a day (24 hours) as a
single batch. The application is still streaming in nature: it would end
the batch after a day and start a new batch the next day. At the same
time, I would be able to have early trigger firings every minute as well
as drop any data that is, say, 5 minutes late, all within a single day.
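The daily-batch example can be expressed as three small rules. This is a sketch, not the windowed operator's API: `DailyBatchPolicy` and its methods are hypothetical, timestamps are UTC epoch milliseconds, and lateness follows the common windowing convention of being measured from the end of the window. Under that convention, a tuple is dropped only once the watermark has passed the end of its day plus five minutes.

```java
import java.time.Duration;

// Hypothetical policy: one-day event-time windows, early firings every minute of
// processing time, and five minutes of allowed lateness after the window end.
class DailyBatchPolicy {
  static final long DAY_MS = Duration.ofDays(1).toMillis();
  static final long EARLY_FIRING_MS = Duration.ofMinutes(1).toMillis();
  static final long ALLOWED_LATENESS_MS = Duration.ofMinutes(5).toMillis();

  // Start of the day-long window containing the event (epoch millis, UTC days).
  static long windowStart(long eventTimeMs) {
    return eventTimeMs - Math.floorMod(eventTimeMs, DAY_MS);
  }

  static long windowEnd(long eventTimeMs) {
    return windowStart(eventTimeMs) + DAY_MS;
  }

  // Drop the tuple once the watermark has moved past its window end plus the lateness.
  static boolean isDroppable(long eventTimeMs, long watermarkMs) {
    return watermarkMs > windowEnd(eventTimeMs) + ALLOWED_LATENESS_MS;
  }

  // Early (speculative) firing: emit partial results once a minute has elapsed.
  static boolean shouldFireEarly(long lastFiringMs, long nowMs) {
    return nowMs - lastFiringMs >= EARLY_FIRING_MS;
  }
}
```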

~ Bhupesh






On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion on the Flink mailing list about key-based watermarks.
I think it's relevant to our use case here.
https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

If using a time window does not seem appropriate, we can have another
class that is better suited to such sequential and distinct windows.
Perhaps a CustomWindow option can be introduced which takes in a window id.
The purpose of this window option would be to translate the window id into
appropriate timestamps.

Another option would be to use a custom timestampExtractor for such
tuples, which translates each unique file name into a distinct timestamp
while using time windows in the windowed operator.
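Here is a sketch of the second option. It assumes tuples carry their source file name and that files are read strictly one after another, as in the per-file word count discussed in this thread. `FileRecord` and `FileSequenceExtractor` are hypothetical names, and the extractor is written as a plain `java.util.function.ToLongFunction` rather than any Malhar interface. Combined with 1 ms time windows, each file lands in a window of its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical record type: each line read carries the name of its source file.
class FileRecord {
  final String fileName;
  final String line;

  FileRecord(String fileName, String line) {
    this.fileName = fileName;
    this.line = line;
  }
}

// Hypothetical extractor: the first file seen maps to 1, the next new file to 2, and so on.
// Every line of the same file gets the same "timestamp".
class FileSequenceExtractor implements ToLongFunction<FileRecord> {
  private final Map<String, Long> sequenceByFile = new HashMap<>();
  private long next = 1;

  @Override
  public long applyAsLong(FileRecord record) {
    Long seq = sequenceByFile.get(record.fileName);
    if (seq == null) {
      seq = next++;
      sequenceByFile.put(record.fileName, seq);
    }
    return seq;
  }
}
```

The map of file names grows without bound; a real operator would need to checkpoint it and prune entries for finished files.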

~ Bhupesh





On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:

I now see your rationale for putting the filename in the window.
As far as I understand, the reasons why the filename is not part of the
key and the Global Window is not used are:

1) The files are processed in sequence, not in parallel
2) The windowed operator should not keep the state associated with a file
once the processing of that file is done
3) The trigger should be fired for a file when that file is done
processing.

However, if the file is just a sequence and has nothing to do with a
timestamp, assigning a timestamp to a file is not an intuitive thing to do
and would just confuse users, especially when it's used as an example for
new users.

How about having a separate class called SequenceWindow? And perhaps
TimeWindow can inherit from it?
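One possible shape for the proposed hierarchy is sketched below. It assumes a window is a half-open range `[begin, begin + length)` over some monotonically increasing position, and that a watermark `w` means no further tuples with a position below `w` will arrive. `SequenceWindow` and this `TimeWindow` are hypothetical classes, not the existing Malhar types.

```java
// Hypothetical base class: a window over any monotonically increasing sequence.
class SequenceWindow {
  protected final long begin;  // first position covered (inclusive)
  protected final long length; // number of positions covered

  SequenceWindow(long begin, long length) {
    if (length <= 0) {
      throw new IllegalArgumentException("length must be positive");
    }
    this.begin = begin;
    this.length = length;
  }

  long getBegin() { return begin; }

  long getEnd() { return begin + length; } // exclusive

  boolean contains(long position) {
    return position >= begin && position < getEnd();
  }

  // A watermark at or beyond the end means no more tuples can arrive for this window.
  boolean isCompleteAt(long watermark) {
    return watermark >= getEnd();
  }
}

// A time window is a sequence window whose positions happen to be epoch milliseconds.
class TimeWindow extends SequenceWindow {
  TimeWindow(long beginMillis, long durationMillis) {
    super(beginMillis, durationMillis);
  }

  long getBeginTimestamp() { return getBegin(); }

  long getDurationMillis() { return length; }
}
```

With this split, a "file as a batch" window is just `new SequenceWindow(fileIndex, 1)`, with no pretence that the index is a time.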

David

On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

I think my comments related to count-based windows might be causing
confusion. Let's not discuss count-based scenarios for now.

Just want to make sure we are on the same page wrt. the "each file is a
batch" use case. As mentioned by Thomas, each tuple from the same file has
the same timestamp (which is just a sequence number), and that helps keep
the tuples from each file in a separate window.

Yes, in this case it is a sequence number, but it could be a timestamp
too, depending on the file naming convention. And if it were event-time
processing, the watermark would be derived from records within the file.
Agreed, the source should have a mechanism to control the timestamp
extraction along with everything else pertaining to watermark generation.


We could also implement a "timestampExtractor" interface to identify the
timestamp (sequence number) for a file.

~ Bhupesh





On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:

I don't think this is a use case for a count-based window.

We have multiple files that are retrieved in sequence, and there is no
knowledge of the number of records per file. The requirement is to
aggregate each file separately and emit the aggregate when the file has
been read fully. There is no concept of "end of something" for an
individual key, and the global window isn't applicable.

However, as already explained and implemented by Bhupesh, this can be
solved using a watermark and a window (in this case the window timestamp
isn't a timestamp but a file sequence, but that doesn't matter).
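Here is a sketch of the approach Thomas describes, under these assumptions: the window is identified by the file's sequence number, the aggregate is a word count, and a watermark `w` means every file with a sequence number below `w` has been read fully. `PerFileWordCount` is hypothetical. Firing and purging in the same step is what keeps state from leaking from one file into the next.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical per-file word count where the "window" is the file sequence number.
class PerFileWordCount {
  // sequence number -> (word -> count); a TreeMap keeps windows ordered by sequence
  private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();
  final Map<Long, Map<String, Long>> emitted = new HashMap<>(); // stand-in for an output port

  void processTuple(long fileSequence, String word) {
    windows.computeIfAbsent(fileSequence, s -> new HashMap<>())
        .merge(word, 1L, Long::sum);
  }

  // Fire every completed file's aggregate, then drop its state.
  void processWatermark(long watermark) {
    Map<Long, Map<String, Long>> done = windows.headMap(watermark, false); // sequences < watermark
    emitted.putAll(done);
    done.clear(); // clearing the view removes those windows from the backing TreeMap
  }

  int openWindows() {
    return windows.size();
  }
}
```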

Thomas


On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:

I don't think this is the way to go. A Global Window only means the
timestamp does not matter (or that there is no timestamp). It does not
necessarily mean it's a large batch. Unless there is some notion of event
time for each file, you don't want to embed the file into the window
itself. If you want the result broken up by file name, and if the files
are to be processed in parallel, I think making the file name part of the
key is the way to go. I think it's very confusing if we somehow make the
file part of the window.

Count-based windows are not implemented yet, and you're welcome to add
that feature. With count-based windows, there would be no notion of time,
and you would probably only trigger at the end of each window. For
count-based windows, the watermark only matters for batch, since you need
a way to know when the batch has ended (if the window count is 10 and the
number of tuples in the batch is, let's say, 105, you need a way to end
the last window, which holds only 5 tuples).
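The end-of-batch problem for count-based windows can be illustrated with a small sketch. `CountWindows` is hypothetical (count-based windows were not implemented in the operator at the time of this thread). It closes a window on every n-th tuple and relies on a final watermark to flush the trailing partial window, reproducing the 105-tuple example: ten full windows of 10, then one window of 5.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based windowing for a bounded batch.
class CountWindows<T> {
  private final int size;
  private List<T> current = new ArrayList<>();
  final List<List<T>> closed = new ArrayList<>(); // stand-in for emitted windows

  CountWindows(int size) {
    this.size = size;
  }

  void processTuple(T tuple) {
    current.add(tuple);
    if (current.size() == size) {
      closed.add(current); // the window is complete at the n-th tuple; no watermark needed
      current = new ArrayList<>();
    }
  }

  // Only needed for batch: without it the trailing tuples would never be emitted.
  void processFinalWatermark() {
    if (!current.isEmpty()) {
      closed.add(current);
      current = new ArrayList<>();
    }
  }
}
```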

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

Thanks for your comments.

The wordcount example that I created based on the windowed operator
computes word counts per file (each file as a separate batch), i.e. it
processes counts for each file and dumps them into separate files.
As I understand it, the Global window is for one large batch, i.e. all
incoming data falls into the same batch. This could not be processed using
the GlobalWindow option, as we need more than one window. In this case, I
configured the windowed operator to have time windows of 1 ms each and
passed data for each file with increasing timestamps: (file1, 1),
(file2, 2), and so on. Is there a better way of handling this scenario?

Regarding (2 - count-based windows), I think there is a trigger option to
process count-based windows. If I want to process every 1000 tuples as a
batch, I could set the Trigger option to CountTrigger with the
accumulation set to Discarding. Is this correct?
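To illustrate the difference between the two accumulation modes, here is a sketch of a count trigger that fires every n tuples on a single window holding a running sum. `CountTriggeredSum` and its nested enum are hypothetical and only mimic the semantics; they are not Malhar's trigger API. A count trigger still fires within whatever window the tuple belongs to, so with Discarding it approximates a count-based window rather than implementing one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count trigger on one window whose state is a running sum.
class CountTriggeredSum {
  enum AccumulationMode { DISCARDING, ACCUMULATING }

  private final int triggerCount;
  private final AccumulationMode mode;
  private long sum = 0;
  private int sinceLastFiring = 0;
  final List<Long> firings = new ArrayList<>(); // values emitted at each firing

  CountTriggeredSum(int triggerCount, AccumulationMode mode) {
    this.triggerCount = triggerCount;
    this.mode = mode;
  }

  void processTuple(long value) {
    sum += value;
    if (++sinceLastFiring == triggerCount) {
      firings.add(sum);
      sinceLastFiring = 0;
      if (mode == AccumulationMode.DISCARDING) {
        sum = 0; // forget what was fired, so each firing covers only its own tuples
      }
    }
  }
}
```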

I agree that (4. Final Watermark) can be done using the Global window.

~ Bhupesh




On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:

I'm worried that we are making the watermark concept too complicated.
Watermarks should simply tell you which windows can be considered
complete.

Point 2 is basically a count-based window. Watermarks do not play a role
here because the window is always complete at the n-th tuple.

If I understand correctly, point 3 is for batch processing of files.
Unless the files contain timed events, it sounds to me like this can be
achieved with just a Global Window. For signaling EOF, a watermark with a
+infinity timestamp can be used so that triggers will be fired upon
receipt of that watermark.

For point 4, just like what I mentioned above, this can be achieved with a
watermark with a +infinity timestamp.
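Here is a sketch of the Global Window approach, assuming `Long.MAX_VALUE` stands in for the +infinity watermark timestamp. `GlobalWindowCount` is hypothetical: tuple timestamps are ignored, finite watermarks leave the single window open, and the trigger fires once when the final watermark arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical global-window aggregation that fires only on the final watermark.
class GlobalWindowCount {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // stands in for +infinity

  private final Map<String, Long> counts = new HashMap<>();
  private Map<String, Long> result = null; // set once the trigger has fired

  void processTuple(String key) {
    counts.merge(key, 1L, Long::sum); // every tuple goes to the one global window
  }

  void processWatermark(long timestamp) {
    // A finite watermark cannot complete a window that spans all time.
    if (timestamp == FINAL_WATERMARK && result == null) {
      result = new HashMap<>(counts);
    }
  }

  Map<String, Long> getResult() {
    return result;
  }
}
```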

David




On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

For an input operator that is supposed to generate watermarks for
downstream operators, I can think of the following watermarks that the
operator can emit:
1. Time-based watermarks (the high watermark / low watermark)
2. Tuple-count-based watermarks (every n tuples)
3. File-based watermarks (start file, end file)
4. Final watermark

File-based watermarks seem to be applicable for batch (file-based) as
well, and hence I thought of looking at these first. Does this seem to be
in line with the thought process?

~ Bhupesh



_______________________________________________________
Bhupesh Chawda

Software Engineer

E: bhup...@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

I don't think this should be designed based on a simplistic file
input-output scenario. It would be good to include a stateful
transformation based on event time.

More complex pipelines contain stateful transformations that depend on
windowing and watermarks. I think we need a watermark concept that is
based on progress in event time (or another monotonically increasing
sequence) that other operators can work with generically.

Note that even file input can in many cases produce time-based watermarks,
for example when you read part files that are bounded by event time.

Thanks,
Thomas


On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

To better understand the use case for control tuples in batch, I am
creating a prototype for a batch application using the File Input and File
Output operators.

To enable basic batch processing for File IO operators, I am proposing the
following changes to the File input and output operators:
1. The File Input operator emits a watermark each time it opens and closes
a file. These can be "start file" and "end file" watermarks, which include
the corresponding file names. The "start file" tuple should be sent before
any of the data from that file flows.
2. The File Input operator can be configured to end the application after
a single scan or n scans of the directory (a batch). This is where the
operator emits the final watermark (the end-of-application control tuple).
This will also shut down the application.
3. The File Output operator handles these control tuples. "Start file"
initializes the file name for the incoming tuples. The "end file"
watermark forces a finalize on that file.
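The proposed changes can be sketched from the output operator's side. All names here (`StartFile`, `EndFile`, `FinalWatermark`, `BatchAwareFileOutput`) are hypothetical placeholders, and "finalizing" a file just moves its buffered lines into a map; a real operator would close and rename the file. The sketch assumes a single, unpartitioned input, so data tuples can rely on the preceding start-file tuple for their file name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical control tuples from the proposal.
abstract class ControlTuple {}

class StartFile extends ControlTuple {
  final String fileName;
  StartFile(String fileName) { this.fileName = fileName; }
}

class EndFile extends ControlTuple {
  final String fileName;
  EndFile(String fileName) { this.fileName = fileName; }
}

class FinalWatermark extends ControlTuple {}

// Hypothetical batch-aware file output operator.
class BatchAwareFileOutput {
  private String currentFile;
  private List<String> pending = new ArrayList<>();
  final Map<String, List<String>> finalized = new LinkedHashMap<>();
  boolean shutdown = false;

  void processControl(ControlTuple control) {
    if (control instanceof StartFile) {
      currentFile = ((StartFile) control).fileName; // name for the data tuples that follow
    } else if (control instanceof EndFile) {
      finalizeFile(((EndFile) control).fileName);
    } else if (control instanceof FinalWatermark) {
      if (currentFile != null) {
        finalizeFile(currentFile); // flush a file that never saw its end-file tuple
      }
      shutdown = true;
    }
  }

  void processTuple(String line) {
    if (currentFile == null) {
      throw new IllegalStateException("data tuple received before a start-file tuple");
    }
    pending.add(line);
  }

  private void finalizeFile(String fileName) {
    finalized.put(fileName, pending);
    pending = new ArrayList<>();
    currentFile = null;
  }
}
```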

The user would be able to enable the operators to send only those
watermarks that are needed in the application. If none of the options are
configured, the operators behave as in a streaming application.

There are a few challenges in the implementation when the input operator
is partitioned. In this case, the correlation between the start/end of a
file and the data tuples for that file is lost. Hence we need to maintain
the filename as part of each tuple in the pipeline.

The "start file" and "end file" control tuples in this example are
temporary names for watermarks. We can have generic "start batch" /
"end batch" tuples which could be used for other use cases as well. The
final watermark is common and serves the same purpose in each case.

Please let me know your thoughts on this.

~ Bhupesh



On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Yes, this can be part of the operator configuration. Given this, defining
a batch application would mean configuring the connectors (mostly the
input operator) in the application for the desired behavior. Similarly,
use cases other than batch can be achieved this way.
We may also need to take care of the following:
1. Make sure that the watermarks or control tuples are consistent across
sources, meaning an HDFS sink should be able to interpret the watermark
tuple sent out by, say, a JDBC source.
2. In addition to I/O connectors, we should also look at the need for
processing operators to understand some of the control tuples /
watermarks. For example, we may want to reset the operator behavior on
the arrival of some watermark tuple.

~ Bhupesh

On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:

The HDFS source can operate in two modes, bounded or unbounded. If you
scan only once, then it should emit the final watermark after it is done.
Otherwise, it would emit watermarks based on a policy (file names, etc.).
The mechanism to generate the marks may depend on the type of source, and
the user needs to be able to influence/configure it.
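Here is a sketch of the two modes. A `Supplier` returning the current directory listing stands in for listing HDFS. `ScanningFileSource` is hypothetical, and the unbounded-mode watermark (the scan count) is only a placeholder for whatever policy the user configures, such as one derived from file names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical file source: bounded mode scans once and emits the final watermark,
// unbounded mode rescans indefinitely and emits a progress watermark after each scan.
class ScanningFileSource {
  enum Mode { BOUNDED, UNBOUNDED }

  static final long FINAL_WATERMARK = Long.MAX_VALUE; // stands in for +infinity

  private final Mode mode;
  private final Supplier<List<String>> directoryListing; // stand-in for listing an HDFS directory
  private final Set<String> seen = new HashSet<>();
  private long scans = 0;
  private boolean finished = false;
  final List<String> emittedFiles = new ArrayList<>();
  final List<Long> emittedWatermarks = new ArrayList<>();

  ScanningFileSource(Mode mode, Supplier<List<String>> directoryListing) {
    this.mode = mode;
    this.directoryListing = directoryListing;
  }

  void scan() {
    if (finished) {
      return; // a bounded source has already emitted its final watermark
    }
    for (String file : directoryListing.get()) {
      if (seen.add(file)) {
        emittedFiles.add(file); // only files not emitted by an earlier scan
      }
    }
    scans++;
    if (mode == Mode.BOUNDED) {
      emittedWatermarks.add(FINAL_WATERMARK);
      finished = true;
    } else {
      emittedWatermarks.add(scans); // placeholder policy: scan count as progress
    }
  }

  boolean isFinished() {
    return finished;
  }
}
```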
Thomas


On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

I am not sure that I completely understand your suggestion. Are you
suggesting broadening the scope of the proposal to cover both bounded and
unbounded sources?

In Apex, we treat all sources as unbounded. Even bounded sources like the
HDFS file source are treated as unbounded by scanning the input directory
repeatedly.

Let's consider the HDFS file source as an example:
In this case, if we treat it as a bounded source, we can define hooks
which allow us to detect the end of the file and send the "final
watermark". We could also consider the HDFS file source as a streaming
source and define hooks which send watermarks based on different kinds of
windows.

Please correct me if I misunderstand.

~ Bhupesh


On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:

Bhupesh,

Please see how this can be solved in a unified way using windows and
watermarks. It is bounded data vs. unbounded data. In Beam, for example,
you can use the "global window" and the final watermark to accomplish
what you are looking for. Batch is just a special case of streaming where
the source emits the final watermark.

Thanks,
Thomas


On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Yes, if the user needs to develop a batch application, then batch-aware
operators need to be used in the application.
The nature of the application is mostly controlled by the input and output
operators used in it.
For example, consider an application which needs to filter records in an
input file and store the filtered records in another file. The nature of
this app is to end once the entire file is processed. The following things
are expected of the application:

    1. Once the input data is over, finalize the output file from .tmp
    files. - Responsibility of the output operator
    2. End the application once the data is read and processed -
    Responsibility of the input operator
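The output operator's half of this contract can be sketched with the standard `java.nio.file` API. `TmpFileWriter` is a hypothetical helper: it appends to `<name>.tmp` while data is flowing and renames the file to its final name when the end-of-data signal arrives. It uses the local filesystem; an HDFS-backed operator would use the Hadoop filesystem's own rename instead, and `ATOMIC_MOVE` may not be supported on every filesystem.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical output helper: write to "<name>.tmp", rename to "<name>" at end of batch.
class TmpFileWriter {
  private final Path finalPath;
  private final Path tmpPath;

  TmpFileWriter(Path dir, String name) {
    this.finalPath = dir.resolve(name);
    this.tmpPath = dir.resolve(name + ".tmp");
  }

  // Appends lines to the temporary file, creating it on first use.
  void write(List<String> lines) throws IOException {
    Files.write(tmpPath, lines, StandardCharsets.UTF_8,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  // Called once the input is exhausted; an atomic rename means readers never see a partial file.
  Path finalizeFile() throws IOException {
    return Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE);
  }
}
```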

These functions are essential to allow the user to do higher-level
operations like scheduling or running a workflow of batch applications.
I am not sure about intermediate (processing) operators, as there is no
change in their functionality for batch use cases. Perhaps allowing
multiple batches to run in a single application may require similar
changes in processing operators as well.
~ Bhupesh

On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org> wrote:

Will it give users the impression that if they have a batch use case,
they have to use batch-aware operators only? If so, is that what we
expect? I am not aware of how we implement the batch scenario, so this
might be a basic question.

-Priyanka

On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

While the design / implementation for custom control tuples is ongoing, I
thought it would be a good idea to consider their usefulness in one of
the use cases: batch applications.

This is a proposal to adapt / extend existing operators in the Apache Apex
Malhar library so that they are easy to use in batch use cases. Naturally,
this would be applicable to only a subset of operators, like File, JDBC
and NoSQL databases.
For example, for a file-based store (say, an HDFS store), we could have
FileBatchInput and FileBatchOutput operators which allow easy integration
into a batch application. These operators would be extended from their
existing implementations and would be "Batch Aware", in that they may
understand the meaning of some specific control tuples that flow through
the DAG. Start batch and end batch seem to be the obvious candidates that
come to mind. On receipt of such control tuples, they may modify the
behavior of the operator, for example to reinitialize some metrics or to
finalize an output file.

We can discuss the potential control tuples and actions in detail, but
first I would like to understand the community's views on this proposal.

~ Bhupesh



