public static class Pojo implements Tuple<Pojo>
{
  @Override
  public Pojo getValue()
  {
    // The POJO is its own windowed Tuple, so no wrapper operator is needed.
    return this;
  }
}

@Override
public void populateDAG(DAG dag, Configuration conf)
{
  CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
  WindowedOperatorImpl<Pojo, Pojo, Pojo> windowedOperator =
      dag.addOperator("windowOperator", WindowedOperatorImpl.class);
  // The raw InputPort[] form of addStream bypasses the generic type check between
  // CsvParser's Object output port and the windowed operator's Tuple<Pojo> input port.
  dag.addStream("csvToWindowed", csvParser.out, new InputPort[]{windowedOperator.input});
}


Thank you,

Vlad

On 4/29/17 15:20, AJAY GUPTA wrote:
Even this will not work because the output port of CsvParser is of type
Object. Even if Customer implements Tuple<Object>, the stream still fails to
type-check, because the parser declares its output port as:

DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();

while the input port of the windowed operator, with InputT = Object, is
DefaultInputPort<Tuple<Object>>.


Ajay


On Sun, Apr 30, 2017 at 1:45 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:

Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to
the actual type of InputT at runtime. Introducing an operator just to do a
cast is not a good design decision, IMO.
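
(A minimal sketch of this approach, not from the original message, assuming a
hypothetical Customer POJO and the Malhar Accumulation interface; the counting
logic is only for illustration:)

// Sketch: declare the operator with Object as InputT and cast to the concrete POJO
// inside the accumulation.
WindowedOperatorImpl<Object, Long, Long> windowedOperator = new WindowedOperatorImpl<>();
windowedOperator.setAccumulation(new Accumulation<Object, Long, Long>()
{
  @Override
  public Long defaultAccumulatedValue()
  {
    return 0L;
  }

  @Override
  public Long accumulate(Long accumulatedValue, Object input)
  {
    Customer customer = (Customer)input; // cast Object to the actual InputT at runtime
    return accumulatedValue + 1;         // customer fields would be used here
  }

  @Override
  public Long merge(Long accumulatedValue1, Long accumulatedValue2)
  {
    return accumulatedValue1 + accumulatedValue2;
  }

  @Override
  public Long getOutput(Long accumulatedValue)
  {
    return accumulatedValue;
  }

  @Override
  public Long getRetraction(Long value)
  {
    return -value;
  }
});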

Thank you,
Vlad

Sent from iPhone

On Apr 29, 2017, at 02:50, AJAY GUPTA <ajaygit...@gmail.com> wrote:

I am using WindowedOperatorImpl and it is declared as follows.

WindowedOperatorImpl<InputT, AccumulationType, OutputType> windowedOperator = new WindowedOperatorImpl<>();

In my application scenario, the InputT is a Customer POJO, which CsvParser emits
as an Object.


Ajay

On Fri, Apr 28, 2017 at 11:53 PM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

How do you declare WindowedOperator?

Thank you,

Vlad


On 4/28/17 10:35, AJAY GUPTA wrote:

Vlad,

The approach you suggested doesn't work because the CSVParser outputs the Object
data type irrespective of the POJO class being emitted.


Ajay

On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov <v.ro...@datatorrent.com>
wrote:

Make your POJO class implement WindowedOperator Tuple interface (it may
return itself in getValue()).

Thank you,

Vlad

On 4/28/17 02:44, AJAY GUPTA wrote:

Hi All,

I am creating an application which uses the Windowed Operator. The application
involves a CsvParser operator emitting a POJO which is to be passed as input to
the WindowedOperator. The WindowedOperator requires an instance of the Tuple
class as input:

public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()

Due to this, addStream cannot work, as CsvParser's output port is not compatible
with the input port type of the WindowedOperator. One way to solve this problem
is to have a converter operator between the above two operators. I would like to
know if there is a more generic approach to solve this problem without writing a
new operator for every new application using Windowed Operators.
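
(For reference, such a converter could look roughly like the sketch below;
Tuple.PlainTuple is assumed to be the Malhar wrapper class, and the operator and
port names are illustrative, not from this thread.)

// Illustrative sketch of the converter operator mentioned above: it wraps each
// incoming POJO (emitted as Object by CsvParser) into a windowed Tuple.
public static class TupleWrapper extends BaseOperator
{
  public final transient DefaultOutputPort<Tuple<Object>> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
  {
    @Override
    public void process(Object pojo)
    {
      output.emit(new Tuple.PlainTuple<>(pojo));
    }
  };
}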

Thanks,
Ajay



On Thu, Mar 23, 2017 at 5:25 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi All,

I think we have some agreement on the way we should use control tuples for file
I/O operators to support batch.

In order to have more operators in Malhar support this paradigm, I think we
should also look at store operators: JDBC, Cassandra, HBase, etc. The case with
these operators is simpler, as most of them do not poll the sources (except the
JDBC poller operator) and just stop once they have read a fixed amount of data.
In other words, these are inherently batch sources. The only change we should
add to these operators is to shut down the DAG once the reading of data is done.
For a windowed operator this would mean a Global window with a final watermark
before the DAG is shut down.

~ Bhupesh





On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

Even though the windowing operator is not just "event time", it seems it depends
too heavily on the "time" attribute of the incoming tuple. This is the reason we
had to model the file index as a timestamp to solve the batch case for files.

Perhaps we should work on increasing the scope of the windowed operator to
consider other types of windows as well. The Sequence option suggested by David
seems to be something in that direction.

~ Bhupesh





On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <t...@apache.org>
wrote:

That's correct, we are looking at a generalized approach for state management
vs. a series of special cases.

And to be clear, windowing does not imply event time, otherwise it would be
"EventTimeOperator" :-)

Thomas

On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

I went through the discussion, but it seems like it is more on the event time
watermark handling as opposed to batches. What we are trying to do is have
watermarks serve the purpose of demarcating batches using control tuples. Since
each batch is separate from others, we would like to have stateful processing
within a batch, but not across batches.

At the same time, we would like to do this in a manner which is consistent with
the windowing mechanism provided by the windowed operator. This will allow us to
treat a single batch as a (bounded) stream and apply all the event time
windowing concepts in that time span.

For example, let's say I need to process data for a day (24 hours) as a single
batch. The application is still streaming in nature: it would end the batch
after a day and start a new batch the next day. At the same time, I would be
able to have early trigger firings every minute as well as drop any data that
is, say, 5 minutes late. All of this within a single day.
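
(A rough sketch of that configuration against the Malhar windowed operator API,
assuming joda-time Durations; not part of the original message:)

// Sketch: one-day windows, early trigger firings every minute, 5 minutes allowed lateness.
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardDays(1)));
windowedOperator.setTriggerOption(TriggerOption.AtWatermark()
    .withEarlyFiringsAtEvery(Duration.standardMinutes(1))
    .accumulatingFiredPanes());
windowedOperator.setAllowedLateness(Duration.standardMinutes(5));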
~ Bhupesh






On Tue, Feb 28, 2017 at 9:27 PM, David Yan <david...@gmail.com> wrote:

There is a discussion in the Flink mailing list about key-based watermarks. I
think it's relevant to our use case here.
https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E

David

On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

If using a time window does not seem appropriate, we can have another class
which is more suited for such sequential and distinct windows. Perhaps a
CustomWindow option can be introduced which takes in a window id. The purpose of
this window option could be to translate the window id into appropriate
timestamps.

Another option would be to go with a custom timestampExtractor for such tuples
which translates each unique file name to a distinct timestamp while using time
windows in the windowed operator.
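
(A hypothetical sketch of such an extractor: it maps each distinct file name to
a monotonically increasing sequence number used as the tuple timestamp. The
class and method names are illustrative only.)

// Hypothetical timestamp extractor: assigns each new file name the next sequence
// number, so all tuples from one file share a "timestamp" and land in the same window.
// Uses java.util.HashMap / java.util.Map.
public class FileNameTimestampExtractor
{
  private final Map<String, Long> fileSequences = new HashMap<>();
  private long nextSequence = 0;

  public long getTimestamp(String fileName)
  {
    Long sequence = fileSequences.get(fileName);
    if (sequence == null) {
      sequence = nextSequence++;
      fileSequences.put(fileName, sequence);
    }
    return sequence;
  }
}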

~ Bhupesh





On Tue, Feb 28, 2017 at 12:28 AM, David Yan <david...@gmail.com> wrote:
I now see your rationale on putting the filename in the window.

As far as I understand, the reasons why the filename is not part of the key and
the Global Window is not used are:
1) The files are processed in sequence, not in parallel.
2) The windowed operator should not keep the state associated with the file when
the processing of the file is done.
3) The trigger should be fired for the file when the file is done processing.

However, if the file is just a sequence that has nothing to do with a timestamp,
assigning a timestamp to a file is not an intuitive thing to do and would just
create confusion for the users, especially when it's used as an example for new
users.

How about having a separate class called SequenceWindow? And perhaps TimeWindow
could inherit from it?
David
On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <t...@apache.org> wrote:
On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

I think my comments related to count based windows might be causing confusion.
Let's not discuss count based scenarios for now.

Just want to make sure we are on the same page wrt. the "each file is a batch"
use case. As mentioned by Thomas, each tuple from the same file has the same
timestamp (which is just a sequence number) and that helps keep tuples from each
file in a separate window.
Yes, in this case it is a sequence number, but it could be a timestamp also,
depending on the file naming convention. And if it was event time processing,
the watermark would be derived from records within the file.

Agreed, the source should have a mechanism to control the timestamp extraction
along with everything else pertaining to the watermark generation.

We could also implement a "timestampExtractor" interface to identify the
timestamp (sequence number) for a file.
~ Bhupesh





On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <t...@apache.org> wrote:
I don't think this is a use case for a count based window.

We have multiple files that are retrieved in a sequence and there is no
knowledge of the number of records per file. The requirement is to aggregate
each file separately and emit the aggregate when the file is read fully. There
is no concept of "end of something" for an individual key, and a global window
isn't applicable.

However, as already explained and implemented by Bhupesh, this can be solved
using watermark and window (in this case the window timestamp isn't a timestamp
but a file sequence, but that doesn't matter).
Thomas


On Mon, Feb 27, 2017 at 8:05 AM, David Yan <david...@gmail.com> wrote:
I don't think this is the way to go. Global Window only means the timestamp does
not matter (or that there is no timestamp). It does not necessarily mean it's a
large batch. Unless there is some notion of event time for each file, you don't
want to embed the file into the window itself.

If you want the result broken up by file name, and if the files are to be
processed in parallel, I think making the file name part of the key is the way
to go. I think it's very confusing if we somehow make the file part of the
window.

For count-based windows, it's not implemented yet and you're welcome to add that
feature. In the case of count-based windows, there would be no notion of time
and you probably only trigger at the end of each window. In the case of
count-based windows, the watermark only matters for batch since you need a way
to know when the batch has ended (if the count is 10 and the number of tuples in
the batch is, let's say, 105, you need a way to end the last window with 5
tuples).
David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi David,

Thanks for your comments.

The wordcount example that I created based on the windowed operator does
processing of word counts per file (each file as a separate batch), i.e. it
processes counts for each file and dumps them into separate files.

As I understand, Global window is for one large batch; i.e. all incoming data
falls into the same batch. This could not be processed using the GlobalWindow
option as we need more than one window. In this case, I configured the windowed
operator to have time windows of 1 ms each and passed data for each file with
increasing timestamps: (file1, 1), (file2, 2) and so on. Is there a better way
of handling this scenario?
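
(That setup would look roughly like the following, assuming the Malhar
WindowOption API and joda-time; not from the original message:)

// Sketch: 1 ms time windows, so tuples stamped 1, 2, 3, ... (one value per file)
// fall into separate windows.
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1)));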

Regarding (2 - count based windows), I think there is a trigger option to
process count based windows. In case I want to process every 1000 tuples as a
batch, I could set the Trigger option to CountTrigger with the accumulation set
to Discarding. Is this correct?
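
(A sketch of that trigger configuration, assuming TriggerOption offers a
count-based early-firing overload and a discarding accumulation mode; not from
the original message:)

// Sketch: fire an early trigger every 1000 tuples and discard the fired pane.
windowedOperator.setTriggerOption(TriggerOption.AtWatermark()
    .withEarlyFiringsAtEvery(1000)
    .discardingFiredPanes());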
I agree that (4. Final Watermark) can be done using a Global window.

~ Bhupesh



On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
I'm worried that we are making the watermark concept too complicated.
Watermarks should simply just tell you what windows can be considered complete.

Point 2 is basically a count-based window. Watermarks do not play a role here
because the window is always complete at the n-th tuple.

If I understand correctly, point 3 is for batch processing of files. Unless the
files contain timed events, it sounds like this can be achieved with just a
Global Window. For signaling EOF, a watermark with a +infinity timestamp can be
used so that triggers will be fired upon receipt of that watermark.

For point 4, just like what I mentioned above, this can be achieved with a
watermark with a +infinity timestamp.
David



On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

Hi Thomas,

For an input operator which is supposed to generate watermarks for downstream
operators, I can think of the following watermarks that the operator can emit:
1. Time based watermarks (the high watermark / low watermark)
2. Number of tuples based watermarks (every n tuples)
3. File based watermarks (start file, end file)
4. Final watermark

File based watermarks seem to be applicable for batch (file based) as well, and
hence I thought of looking at these first. Does this seem to be in line with the
thought process?
~ Bhupesh






On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
I don't think this should be designed based on a simplistic file input-output
scenario. It would be good to include a stateful transformation based on event
time.

More complex pipelines contain stateful transformations that depend on windowing
and watermarks. I think we need a watermark concept that is based on progress in
event time (or another monotonically increasing sequence) that other operators
can generically work with.

Note that even file input in many cases can produce time based watermarks, for
example when you read part files that are bound by event time.
Thanks,
Thomas

On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote: