Re: RichMapFunction setup method

2017-06-14 Thread Mikhail Pryakhin
Hi Chesnay,
Done. https://issues.apache.org/jira/browse/FLINK-6912 

Kind Regards,
Mike Pryakhin


> On 13 Jun 2017, at 21:47, Chesnay Schepler wrote:
> 
> It is a remnant of the past since that method signature originates from the 
> Record API,
> the predecessor of the current DataSet API.
> 
> Even in the DataSet API you can just pass arguments through the constructor.
> Feel free to open a JIRA, just make sure it is a subtask of FLINK-3957.
> 
> On 13.06.2017 16:40, Mikhail Pryakhin wrote:
>> Thanks a lot, Chesnay,
>> 
>> In case it works properly in the Batch API, don’t you think that it should 
>> not be called a "remnant of the past"?
>> Should I create an issue so we don’t forget about it and maybe fix it in 
>> the future? I think I’m not the only one who deals with this method.
>> 
>> Kind Regards,
>> Mike Pryakhin
>> 
>> 
>>> On 13 Jun 2017, at 17:20, Chesnay Schepler wrote:
>>> 
>>> I'm not aware of any plans to replace it.
>>> 
>>> For the Batch API it also works properly, so deprecating it would be 
>>> misleading.
>>> 
>>> On 13.06.2017 16:04, Mikhail Pryakhin wrote:
 Hi Chesnay,
 Thanks for the reply,
 
> The existing signature for open() is a remnant of the past.
 
 Should the method be deprecated then so that it doesn’t confuse users?
 
 Kind Regards,
 Mike Pryakhin
 
 
> On 13 Jun 2017, at 16:54, Chesnay Schepler wrote:
> 
> The existing signature for open() is a remnant of the past.
> 
> We currently recommend to pass all arguments through the constructor and 
> store them in fields.
> You can of course also pass a Configuration containing all parameters.
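
For illustration, a minimal sketch of the recommended constructor pattern
(class and parameter names are hypothetical):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Parameters travel with the (serializable) function object itself;
    // open() is used only for runtime initialization.
    public class ThresholdMapper extends RichMapFunction<Integer, Integer> {

        private final int threshold; // hypothetical parameter

        public ThresholdMapper(int threshold) {
            this.threshold = threshold;
        }

        @Override
        public void open(Configuration parameters) {
            // 'parameters' is an empty Configuration in the streaming API,
            // so nothing is read from it here.
        }

        @Override
        public Integer map(Integer value) {
            return Math.min(value, threshold);
        }
    }

    // usage: stream.map(new ThresholdMapper(42));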
> 
> On 13.06.2017 15:46, Mikhail Pryakhin wrote:
>> Hi all!
>> 
>> A RichMapFunction [1] provides a very handy setup method 
>> RichFunction#open(org.apache.flink.configuration.Configuration) which 
>> consumes a Configuration instance as an argument, but this argument 
>> doesn't bear any configuration parameters because it is always passed to 
>> the method as a new instance. [2] depicts the problem.
>> 
>> Is there any way to pass configuration parameters to the
>> RichFunction#open method via the Configuration parameter? Or is it a bug?
>> P.S. I'm using flink 1.3
>> 
>> Thanks in advance!
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
>>  
>> 
>> [2] 
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java#L111
>>  
>> 
>> Kind Regards,
>> Mike Pryakhin





Re: User self resource file.

2017-06-14 Thread yunfan123
It seems this is not supported by Flink?
I think there should be a function like getBlobStore in the RuntimeContext
interface.
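
For what it's worth, the closest existing mechanism I know of is the
distributed cache, which is reachable from the RuntimeContext. A sketch (the
file path and registration name are hypothetical; registerCachedFile is from
the DataSet API's ExecutionEnvironment):

    import java.io.File;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Job setup (hypothetical path and name):
    //   env.registerCachedFile("hdfs:///path/to/resource.txt", "myResource");

    public class ResourceUsingMapper extends RichMapFunction<String, String> {

        private transient File resource;

        @Override
        public void open(Configuration parameters) {
            // Flink ships every registered file to each TaskManager and
            // exposes it through the distributed cache.
            resource = getRuntimeContext().getDistributedCache().getFile("myResource");
        }

        @Override
        public String map(String value) {
            return value + " @ " + resource.getAbsolutePath();
        }
    }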





Question about the custom partitioner

2017-06-14 Thread Xingcan Cui
Hi all,

I want to duplicate records to multiple downstream tasks (not all of them,
thus broadcasting would not work) in the stream environment.
However, it seems that the current custom partitioner can return only one
partition index.
Why does this restriction exist, or do I miss something?

Thanks,
Xingcan
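
One workaround I can think of, given that a Partitioner must return a single
index: emit one tagged copy of the record per desired target, then partition
on the tag. A sketch (the element type and the choice of targets are
hypothetical):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    public class FanOut {
        public static DataStream<Tuple2<Integer, String>> fanOut(DataStream<String> input) {
            // 1) Duplicate each record, tagging every copy with an explicit target index.
            DataStream<Tuple2<Integer, String>> tagged = input.flatMap(
                new FlatMapFunction<String, Tuple2<Integer, String>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<Integer, String>> out) {
                        for (int target : new int[] {0, 1}) { // hypothetical: subtasks 0 and 1
                            out.collect(Tuple2.of(target, value));
                        }
                    }
                });

            // 2) Route each copy to the channel named by its tag.
            return tagged.partitionCustom(
                new Partitioner<Integer>() {
                    @Override
                    public int partition(Integer key, int numPartitions) {
                        return key % numPartitions;
                    }
                },
                new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> t) {
                        return t.f0;
                    }
                });
        }
    }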


Re: ReduceFunction mechanism

2017-06-14 Thread Fabian Hueske
You can use a MapFunction (however, it will touch each element and not only
the first).
An alternative could be the AggregateFunction if you are using a
ReduceFunction on a WindowedStream. The interface is a bit more complex,
though.

Best, Fabian



2017-06-13 10:55 GMT+02:00 nragon :

> So, if my reduce function applies some transformation, I must migrate that
> transformation to a map before the reduce to ensure it transforms, even if
> there is only one element?
> I can chain them together and it will be "almost" as if they were in the same
> function (ensuring same-thread processing)?
>
>
>
>


Re: ReduceFunction mechanism

2017-06-14 Thread nragon
My goal is to touch each element before the aggregation, but I could do it in
the reduce function and avoid adding another function, which creates more
overhead. The reduce method receives the reduced value and a new element, which
I would change before applying my aggregation.
I'm doing keyBy -> reduce.
Using a map before all this is a solution.
I've never tried AggregateFunction, any examples?

Thanks,
Nuno
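
Since there is no example in this thread yet, here is a minimal sketch of an
AggregateFunction computing an average (note: in Flink 1.3, add() mutates the
accumulator and returns void; later versions changed that signature):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Accumulator is (sum, count); the result is the average.
    public class AverageAggregate
            implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return Tuple2.of(0L, 0L);
        }

        @Override
        public void add(Long value, Tuple2<Long, Long> acc) {
            acc.f0 += value; // running sum
            acc.f1 += 1;     // running count
        }

        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    // usage (a window is required):
    //   keyedStream.timeWindow(Time.seconds(10)).aggregate(new AverageAggregate());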





Re: Cannot write record to fresh sort buffer. Record too large.

2017-06-14 Thread Sebastian Neef
Hi,

I removed the .distinct() and ran another test.

Without filtering duplicate entries, the Job processes more data and
runs much longer, but eventually fails with the following error:

> java.lang.OutOfMemoryError: Requested array size exceeds VM limit

Even then playing around with the aforementioned Flink settings does not
resolve the problem.

I guess I need to debug this some more.

Best,
Sebastian


Re: Can't get keyed messages from Kafka

2017-06-14 Thread AndreaKinn
Thanks, that works!





Re: At what point do watermarks get injected into the stream?

2017-06-14 Thread Ray Ruvinskiy
One more question: continuing with the same example, but supposing that the Map 
operators in the diagram are followed by a keyBy, which is then followed by 
another Map, let’s say with operators named MapB(1) and MapB(2). Let’s further 
say that, by luck of the draw, the keys of the records processed by Map(2) are 
such that all those records end up going to MapB(2) and none of them end up 
shuffled to MapB(1). Would MapB(1) still continue to get watermarks generated 
from Map(2), even though it’s not getting the records that caused those 
watermarks to be generated?

Thanks,

Ray

From: Fabian Hueske 
Date: Monday, June 12, 2017 at 4:06 PM
To: Ray Ruvinskiy 
Cc: "user@flink.apache.org" 
Subject: Re: At what point do watermarks get injected into the stream?

Hi,
each operator keeps track of the latest (and therefore maximum) watermark 
received from each of its inputs and sets its own internal time to the minimum 
watermark of each input.
In the example, window(1) has two inputs (Map(1) and Map(2)). If Window(1) 
first receives a WM 33 from Map(1), it won't emit a watermark of 33 until it 
has received a watermark >= 33 from Map(2).
All operators use this logic for watermark propagation. So it does not matter 
whether this is a window operator or a CEP operator.
Let me know if you have further questions,
Fabian
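
To make the combination rule concrete, here is a simplified model of that
bookkeeping (not Flink's actual classes, just the logic described above):

    import java.util.Arrays;

    // Per-operator watermark bookkeeping: track the max watermark seen per
    // input channel, and forward the min across all channels once it advances.
    class WatermarkTracker {

        private final long[] perInput; // max watermark seen per input channel
        private long lastEmitted = Long.MIN_VALUE;

        WatermarkTracker(int numInputs) {
            perInput = new long[numInputs];
            Arrays.fill(perInput, Long.MIN_VALUE);
        }

        /** Returns the watermark to broadcast downstream, or null if unchanged. */
        Long onWatermark(int input, long watermark) {
            perInput[input] = Math.max(perInput[input], watermark);
            long combined = Arrays.stream(perInput).min().getAsLong();
            if (combined > lastEmitted) {
                lastEmitted = combined;
                return combined; // sent to all outgoing channels
            }
            return null;
        }
    }

    // Window(1) with inputs Map(1) and Map(2): a WM of 33 from Map(1) alone
    // does not advance the combined watermark until Map(2) also reaches >= 33.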

2017-06-12 21:55 GMT+02:00 Ray Ruvinskiy:
Thanks!

I had a couple of follow-up questions about the example in the documentation. 
Suppose Source 1 sends a watermark of 33, and Source 2 sends a watermark of 17. 
If I understand correctly, map(1) will forward the watermark of 33 to window(1) 
and window(2), and map(2) will forward the watermark of 17 to the same window 
operators. I’m assuming there is nothing to prevent window(1) and window(2) 
from getting the watermark of 33 before the watermark of 17, right? In that 
case, how do window(1) and window(2) compute the minimum watermark to forward 
to the next operator downstream? Will it be a per-window watermark?

What would happen if, instead of a window operator in that position, we had 
something like the CEP operator, which in effect maintains state and does 
aggregations without windowing (or another similar such operator)? How does it 
determine what the minimum watermark is at any given time, in light of the fact 
that, in principle, it might receive a watermark value smaller than anything 
it’s seen before from a parallel source?

Ray

From: Fabian Hueske
Date: Sunday, June 11, 2017 at 5:54 PM

To: Ray Ruvinskiy
Cc: "user@flink.apache.org"
Subject: Re: At what point do watermarks get injected into the stream?

Each parallel instance of a TimestampAssigner independently assigns timestamps.
After a shuffle, operators forward the minimum watermark across all input 
connections. For details have a look at the watermarks documentation [1].
Best, Fabian

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#watermarks-in-parallel-streams

2017-06-11 17:22 GMT+02:00 Ray Ruvinskiy:
Thanks for the explanation, Fabian.

Suppose I have a parallel source that does not inject watermarks, and the first 
operation on the DataStream is assignTimestampsAndWatermarks. Does each 
parallel task that makes up the source independently inject watermarks for the 
records that it has read? Suppose I then call keyBy and a shuffle ensues. Will 
the resulting partitions after the shuffle have interleaved watermarks from the 
various source tasks?

More concretely, suppose a source has a degree of parallelism of two. One of 
the source tasks injects the watermarks 2 and 5, while the other injects 3 and 
10. There is then a shuffle, creating two different partitions. Will all the 
watermarks be broadcast to all the partitions? Or is it possible for, say, one 
partition to end up with watermarks 2 and 10 and another with 3 and 5? And 
after the shuffle, how do we ensure that the watermarks are processed in order 
by the operators receiving them?

Thanks,

Ray

From: Fabian Hueske
Date: Saturday, June 10, 2017 at 3:56 PM
To: Ray Ruvinskiy
Cc: "user@flink.apache.org"
Subject: Re: At what point do watermarks get injected into the stream?

Hi Ray,
in principle, watermarks can be injected anywhere in a stream by calling 
DataStream.assignTimestampsAndWatermarks().
However, timestamps are usually injected as soon as possible after a stream is 
ingested (before the first shuffle). The reason is that watermarks depend on 
the order of events (and their timestamps) in the stream. While Flink 
guarantees the order of events within a partition, a shuffle interleaves events 
of different partitions in an unpredictable way, such that the original 
per-partition order is lost.

Re: At what point do watermarks get injected into the stream?

2017-06-14 Thread Fabian Hueske
Yes, I think so. @Aljoscha, please correct me if I describe something wrong
here.

The first map tasks (Map(1) and Map(2)) broadcast their watermarks to all
connected subsequent tasks (window(1) and window(2)).
The window tasks update their respective watermarks based on the watermarks
received from their input tasks (map(1) and map(2)) and forward their
watermarks to all connected subsequent tasks.
Since map does not require a shuffle, mapB(1) will receive only watermarks
from window(1) and mapB(2) from window(2). However, since the watermarks of
the windows depend on their inputs (map(1) and map(2)), the mapB tasks will
get watermarks from all inputs.

Note that watermarks are not bound to specific records but define the
"logical time" of an operator.
Because watermarks are always broadcast to all subsequent tasks, the
whole application is "synced" to the same time after the first keyBy() (or
any other kind of full shuffle).
So the progress of time is determined by the "slowest" input.

Best, Fabian


Re: At what point do watermarks get injected into the stream?

2017-06-14 Thread Ray Ruvinskiy
Thanks again for the detailed explanation. There is one point I want to make 
sure I fully understand:

> Since map does not require a shuffle, mapB(1) will receive only watermarks 
> from window(1) and mapB(2) from window(2). However, since the watermarks of 
> the windows depend on their inputs (map(1) and map(2)), the mapB tasks will 
> get watermarks from all inputs.

As long as operator chaining is happening, watermarks will not be broadcast 
and will effectively be local to the chain, but as soon as a shuffle occurs 
(e.g., on a keyBy), all watermarks will be broadcast to all downstream tasks?

Ray


Re: At what point do watermarks get injected into the stream?

2017-06-14 Thread Fabian Hueske
Yes, that is correct. Watermarks are propagated to all outgoing channels of
an operator even if no records flow over one or more channels (for example
if data is already partitioned).


Re: Cannot write record to fresh sort buffer. Record too large.

2017-06-14 Thread Ted Yu
For the 'Requested array size exceeds VM limit' error, can you pastebin the
full stack trace?

Thanks



Re: union followed by timestamp assignment / watermark generation

2017-06-14 Thread Aljoscha Krettek
Hi Petr,

I just stumbled across this (slightly older) mail. Your example on pastebin is 
not available anymore but I’m guessing you have roughly these two topologies:

1.

Source1 -> Map1 -> ExtractTimestamps -|
                                       | -> Map3 …
Source2 -> Map2 -> ExtractTimestamps -|

The union is not visible at the graph level, it’s implicit in the combination 
of the two input streams.

2.

Source1 -> Map1 -|
                 | -> ExtractTimestamps -> Map3 …
Source2 -> Map2 -|

The union is not visible at the graph level, it’s implicit in the combination 
of the two input streams.

I’m also guessing that you have a timestamp/watermark assigner where the 
watermark is the highest-seen timestamp minus some lateness bound. I think the 
behaviour is not necessarily an artefact of the Flink implementation (with maps 
and extractors being fused together) but results from the graph itself and how 
watermarks are defined and how the extractor works: in the first case, each 
stream (before the union) has its own watermark and the watermark at Map3 is 
the minimum over those watermarks. This explains why a lower watermark on the 
one stream holds back the watermark in total at Map3. In the second case, the 
two streams are unioned together before extracting a timestamp/watermark and 
the choice of timestamp extractor (which takes the highest-seen timestamp) 
means that the watermark now advances “faster” because there is logically not a 
slower, separate stream anymore.

Is that analysis correct? Does my description roughly make sense?

Best,
Aljoscha
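
For concreteness, the kind of extractor described above ("highest-seen
timestamp minus some lateness bound") corresponds to Flink's
BoundedOutOfOrdernessTimestampExtractor. A sketch, using a Tuple2 whose second
field carries the event timestamp (the field layout is an assumption):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Watermark = max timestamp seen so far minus a 5-second lateness bound.
    public class MyExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

        public MyExtractor() {
            super(Time.seconds(5));
        }

        @Override
        public long extractTimestamp(Tuple2<String, Long> element) {
            return element.f1; // timestamp carried in the second tuple field
        }
    }

    // Topology 1 assigns this per input, before the union:
    //   source1.assignTimestampsAndWatermarks(new MyExtractor())
    //       .union(source2.assignTimestampsAndWatermarks(new MyExtractor()))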

> On 6. May 2017, at 15:00, Petr Novotnik  wrote:
> 
> Hello Flinkers,
> 
> Given this small example program:
> 
>> https://pastebin.com/30JbbgpH
> 
> I'd expect the output:
> 
>> one|three
>> two|four
> 
> However, I consistently receive ...
> 
>> one
>> two|four
> 
> ... due to "three" being considered a late-comer which then gets
> discarded. When I remove `assignTimestampsAndWatermarks` after the
> `union` and place it separately on each of the union's inputs, i.e.
> before the `union`, I get what I expect.
> 
> Now, after digging through Flink's source code, this behavior actually
> seems logical to me (since the `assignTimestampsAndWatermarks` and `map`
> operators form one task). Though, from a user/api perspective, it is at
> least surprising.
> 
> I wanted to ask whether this kind of behavior is known, intended, or maybe
> something to be improved to avoid the gotcha?
> 
> Many thanks in advance,
> Pete.
> 



Re: Queryable State

2017-06-14 Thread Nico Kruber
Hi Chet,
you should not see a 
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation when querying an 
existing(!) key.
However, if you query a key the non-registered TaskManager is responsible for, 
I suppose this is the exception you will get. Unfortunately, the queryable 
state API still seems to be rough around the edges.

I suspect that the TaskManagers register their queryable state only after 
receiving data(?) and this causes the UnknownKvStateKeyGroupLocation instead 
of a UnknownKeyOrNamespace.


Nico
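
For reference, the job-side setup under discussion is roughly this (a sketch
against the Flink 1.3 API; the stream type and state name are hypothetical):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public final class QueryableSetup {
        // Expose the stream's latest value per key under a registered name.
        // The JobManager only learns the key-group locations once the
        // responsible operator instances have registered their state.
        public static void makeQueryable(DataStream<Tuple2<String, Long>> stream) {
            stream
                .keyBy(0)                            // key by the first tuple field
                .asQueryableState("subject-counts"); // hypothetical state name
        }
    }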

On Thursday, 4 May 2017 20:05:29 CEST Chet Masterson wrote:
> I found the issue. When parallelism = 3, my test data set was skewed such
> that data was only going to two of the three task managers (kafka partition
> = 3, number of flink nodes = 3, parallelism = 3). As soon as I created a
> test data set with enough keys that spread across all three task managers,
> queryable state started working as expected. That is why only two KVStates
> were registered with the job manager, instead of three. 
> my FINAL :-) question... should I be getting
> org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event
> only N-1 task managers have data in a parallelism of N situation? 
> Thanks for all the help!
>  
>  
> 04.05.2017, 11:24, "Ufuk Celebi" :
> Could you try KvStateRegistry#registerKvState please?
> 
> In the JM logs you should see something about the number of connected
> task managers and in the task manager logs that each one connects to a
> JM.
> 
> – Ufuk
> 
> 
> On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
>  wrote:
> 
>  Can do. Any advice on where the trace prints should go in the task manager
>  source code?
> 
>  BTW - How do I know I have a correctly configured cluster? Is there a set
> of messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
> 
>  Thanks!
> 
> 
>  02.05.2017, 06:03, "Ufuk Celebi" :
> 
>  Hey Chet! I'm wondering why you are only seeing 2 registration
>  messages for 3 task managers. Unfortunately, there is no log message
>  at the task managers when they send out the notification. Is it
>  possible for you to run a remote debugger with the task managers or
>  build a custom Flink version with the appropriate log messages on the
>  task manager side?
>  – Ufuk
> 
> 
>  On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
>   wrote:
> 
> 
> 
>   Any insight here? I've got a situation where a key value state on a task
>   manager is being registered with the job manager, but when I try to query
>   it, the job manager responds it doesn't know the location of the key value
> state...
> 
> 
>   26.04.2017, 12:11, "Chet Masterson" :
> 
>   After setting the logging to DEBUG on the job manager, I learned four
>   things:
> 
>   (On the message formatting below, I have the Flink logs formatted into
> JSON so I can import them into Kibana)
> 
>   1. The appropriate key value state is registered in both parallelism = 1
>  and
>   parallelism = 3 environments. In parallelism = 1, I saw one registration
>   message in the log, in the parallelism = 3, I saw two registration
>  messages:
>   {"level":"DEBUG","time":"2017-04-26
> 
>  15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc
> ":"", "msg":"Key value state registered for job  under name
> "}
> 
>   2. When I issued the query in both parallelism = 1 and parallelism = 3
>   environments, I saw "Lookup key-value state for job  with
>   registration name ". In parallelism = 1, I saw 1 log message,
> in parallelism = 3, I saw two identical messages.
> 
>   3. I saw no other messages in the job manager log that seemed relevant.
> 
>   4. When issuing the query in parallelism = 3, I continued to get the
> error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
> message
>   of null.
> 
>   Thanks!
> 
> 
> 
> 
> 
>   26.04.2017, 09:52, "Ufuk Celebi" :
> 
>   Thanks! Your config looks good to me.
> 
>   Could you please set the log level org.apache.flink.runtime.jobmanager to
>   DEBUG?
> 
>   log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
> 
>   Then we can check whether the JobManager logs the registration of the
>   state instance with the respective name in the case of parallelism >
>   1?
> 
>   Expected output is something like this: "Key value state registered
>   for job ${msg.getJobId} under name ${msg.getRegistrationName}."
> 
>   – Ufuk
> 
>   On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>wrote:
> 
>Ok...more information.
> 
>1. Built a fresh cluster from the ground up. Started testing queryable
>   state
>at each step.
>2. When running under any configuration of task managers and job managers
> where parallelism = 1, the queries execute as expected.

Re: Can't kill a job which contains a while loop in the main method before it be submitted

2017-06-14 Thread Aljoscha Krettek
Hi,

Could you post a short snippet so that we can see how your code works? If the call 
to Jedis is in the main() method before executing Flink, then the problem is 
actually out of Flink’s power.

Best,
Aljoscha

> On 14. Jun 2017, at 06:09, XiangWei Huang  wrote:
> 
> Hi,
> I met a problem when using Jedis in Flink. When using Jedis to get a connection 
> to Redis: if the Redis server is not available, then Jedis will keep trying and 
> never end. The problem is that the job’s status is not set to RUNNING by 
> Flink, which means it can’t be killed by Flink. The only way to break this loop 
> is to restart the JobManager.
> Is there another way to solve this without restarting the JobManager?
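
Until that is solved on the Flink side, one client-side mitigation is to bound
the connection attempt so an unreachable Redis fails fast instead of blocking
forever. A sketch (host, port, and timeout are placeholders; the timeout
constructor is from Jedis's standard API):

    import redis.clients.jedis.Jedis;

    public class RedisProbe {
        public static void main(String[] args) {
            // Fails after ~2s instead of retrying indefinitely, so main() can
            // surface the error and exit.
            Jedis jedis = new Jedis("redis-host", 6379, 2000); // timeout in ms
            System.out.println(jedis.ping());
        }
    }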



Re: Java parallel streams vs IterativeStream

2017-06-14 Thread Aljoscha Krettek
I see, I’m afraid that is not easily possible except by doing a custom stateful 
function that waits for all elements to arrive and combines them again.

Another thing you could look at is the async I/O operator: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
You would not necessarily use it for I/O but spawn threads for processing your 
data in parallel.

Best,
Aljoscha
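
A sketch of what that could look like with the Flink 1.3 async API (the
transformation itself is a placeholder):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

    // Offload the per-element work to another thread; the operator emits the
    // result once the future completes, keeping the pipeline non-blocking.
    public class ParallelCompute extends RichAsyncFunction<String, String> {

        @Override
        public void asyncInvoke(String input, AsyncCollector<String> collector) {
            CompletableFuture
                .supplyAsync(() -> expensiveTransform(input))
                .thenAccept(result -> collector.collect(Collections.singletonList(result)));
        }

        private static String expensiveTransform(String s) {
            return s.toUpperCase(); // placeholder for the real CPU-heavy work
        }
    }

    // usage:
    //   AsyncDataStream.unorderedWait(in, new ParallelCompute(), 10, TimeUnit.SECONDS, 100);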

> On 13. Jun 2017, at 11:20, nragon  wrote:
> 
> That would work, but after FlatMap<T, T> I would have to downstream
> all elements into one.
> 
> 
> 
> 



Re: Java parallel streams vs IterativeStream

2017-06-14 Thread nragon
I've mentioned Java 8 streams because it avoids leaving the map, thus decreasing
network IO if not chained, and takes advantage of multiple CPUs. Guess I will
have to test it.





Re: Java parallel streams vs IterativeStream

2017-06-14 Thread Aljoscha Krettek
Ah yes, I forgot Java 8 streams. That could probably be your best option. Yes!




Re: Java parallel streams vs IterativeStream

2017-06-14 Thread nragon
I'll test with Java 8 streams.

Thanks,
Nuno





Re: RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null

2017-06-14 Thread Aljoscha Krettek
Hi Marc,

Does the problem still persist? Could you try running it with the custom Kryo 
serialiser that Flavio posted here: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/com-esotericsoftware-kryo-KryoException-and-java-lang-IndexOutOfBoundsException-tp7459p7461.html

Or, better yet: try implementing your custom type in such a way that it doesn’t 
need the Kryo serialiser but can work with the PojoSerializer. For this you need 
to ensure that the type has a zero-argument constructor and that all fields are 
either public or have public getters/setters.

Best,
Aljoscha
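
For example, a type shaped like this should be handled by the PojoSerializer
(a minimal sketch; the field names are hypothetical):

    // A Flink POJO: public zero-argument constructor plus public fields
    // (or private fields with public getters/setters) -- no Kryo needed.
    public class SensorReading {

        public String sensorId;
        public long timestamp;
        public double value;

        public SensorReading() {} // required zero-argument constructor

        public SensorReading(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }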

> On 8. May 2017, at 19:39, Kaepke, Marc  wrote:
> 
> Hi,
> 
> did some had an answer or solution?
> 
> Best
> Marc
> 
>> Am 05.05.2017 um 20:05 schrieb Kaepke, Marc > >:
>> 
>> Hi everyone,
>> 
>> what does mean that following exception, if I run my gelly program?
>> 
>> Exception in thread "main" 
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>> at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
>> at 
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> at 
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at 
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>> at 
>> org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>> at 
>> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>> at 
>> org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' 
>> terminated due to an exception: null
>> at 
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.lang.NullPointerException
>> at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
>> at java.util.ArrayList$SubList.size(ArrayList.java:1040)
>> at java.util.AbstractList.add(AbstractList.java:108)
>> at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>> at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>> at 
>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>> at 
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>> at 
>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>> at 
>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>> at 
>> org.apache.flink.runtime.operators.util.ReaderIter

Re: Cannot write record to fresh sort buffer. Record too large.

2017-06-14 Thread Sebastian Neef
Hi Ted,

sure.

Here's the stack trace with .distinct(), with the Exception in the
'SortMerger Reading Thread': [1]

Here's the stack trace without .distinct(), with the 'Requested array
size exceeds VM limit' error: [2]

If you need anything else, I can more or less reliably reproduce the issue.

The best,
Sebastian

[1]
http://paste.gehaxelt.in/?2757c33ed3a3733b#jHQPPQNKKrE2wq4o9KCR48m+/V91S55kWH3dwEuyAkc=
[2]
http://paste.gehaxelt.in/?b106990deccecf1a#y22HgySqCYEOaP2wN6xxApGk/r4YICRkLCH2HBNN9yQ=


Re: ReduceFunction mechanism

2017-06-14 Thread nragon
Just an FYI: currently I'm doing map -> keyBy -> reduce, which in fact could
be only keyBy -> reduce, since reduce can have the map logic.





Re: Cannot write record to fresh sort buffer. Record too large.

2017-06-14 Thread Ted Yu
For #2, XmlInputFormat was involved.

Is it possible to prune (unneeded) field(s) so that the heap requirement is
lower?



How to divide streams on key basis and deliver them

2017-06-14 Thread AndreaKinn
Hi, this is my project's purpose using Kafka and Flink:

[image: schema of the planned topology]

In Kafka topics there are streams representing sensor readings of different
subjects. Each topic is reserved for a different sensor.
Every message is attached with a key using Kafka keyed messages. The key
represents a subject id, and the attached sensor data belongs to the
highlighted subject.

In Flink I want to:
- Get these streams
- Separate streams on a key (subject) basis in order to build a node chain
which always evaluates the same sensor values of the same subjects.

Thanks to you, I have correctly implemented a custom deserializer in order
to get data and key from Kafka. So now I need to separate streams on a key
basis.
As you can see in the schema image, in my mind each circle represents a
different physical machine in a cluster. The deserializer runs on the
bigger circles, which separate streams and deliver them to different smaller
circles on a key basis.

I read the doc and I think I have to use the keyBy() operator on DataStream in
order to obtain a KeyedStream.
This brings me to my first question:
- I tried to print the DataStream and the KeyedStream.
The former gives me this:

[image: printed DataStream output]

while the latter gives me this:

[image: printed KeyedStream output]

What do the numbers before the record string mean (the '3' in the latter
case)?


Then:
- How can I 'deliver' the streams to the following nodes (smaller circles) on
a key basis?

Now I'm developing on a single machine just to try and learn, but I'm also a
bit confused about how to develop it on a cluster.






Re: Clarification on state backend parameters

2017-06-14 Thread bowen.li
Hi guys, 
This is great clarification! 

An extended question from me is: what's the difference between
`state.checkpoints.dir` and the param you pass to the RocksDBStateBackend
constructor in `public RocksDBStateBackend(URI checkpointDataUri) throws
IOException`? They are really confusing.

I specified checkpointDataUri but got the error `CheckpointConfig says to
persist periodic checkpoints, but no checkpoint directory has been
configured. You can configure configure one via key
'state.checkpoints.dir'.`.

Thanks, 
Bowen
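
If it helps, my understanding (please verify against your version) is that the
constructor URI tells the RocksDB backend where to write its checkpoint data,
while `state.checkpoints.dir` is the cluster setting the JobManager needs for
externalized checkpoint metadata, so both may have to be set. A sketch (the
HDFS path is a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Where the RocksDB backend writes its checkpoint data:
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // 'state.checkpoints.dir' is set in flink-conf.yaml, not here; it is
            // additionally required when externalized checkpoints are enabled.
        }
    }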





Re: Clarification on state backend parameters

2017-06-14 Thread Bowen Li
FYI,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-on-state-backend-parameters-td11419.html
here's the context that discussed differences among:

state.backend.fs.checkpointdir
state.backend.rocksdb.checkpointdir
state.checkpoints.dir

On Wed, Jun 14, 2017 at 12:20 PM, bowen.li  wrote:

> Hi guys,
> This is great clarification!
>
> An extended question from me is, what's the difference between
> `state.checkpoints.dir` and the param you pass in to RocksDBStateBackend
> constructor in`public RocksDBStateBackend(URI checkpointDataUri) throws
> IOException`? They are really confusing.
>
> I specified checkpointDataUri but got error of `CheckpointConfig says
> to
> persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key
> 'state.checkpoints.dir'.`.
>
> Thanks,
> Bowen
>
>
>


Re: How to divide streams on key basis and deliver them

2017-06-14 Thread Carst Tankink
Hi,

Let me try to explain this from another user’s perspective ☺

When you run your application, Flink will map your logical/application topology 
onto a number of task slots (documented in more detail here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/job_scheduling.html).
 
Basically, if it is possible/unless told otherwise, Flink will create a number 
of copies of your functions that is 

Re: How to divide streams on key basis and deliver them

2017-06-14 Thread Carst Tankink
Ugh, accidentally pressed send already…. 

When you run your application, Flink will map your logical/application 
topology onto a number of task slots (documented in more detail here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/job_scheduling.html).
 
Basically, if it is possible/unless told otherwise, Flink will create a 
number of copies of your functions that is… 

… equal to the number of task slots: each copy of the function runs in a 
separate task slot.
 
KeyBy partitions your data for further processing, so applying a function to 
the KeyedStream makes that function apply to all elements of the stream that 
have the same key. In addition, the KeyedStream gets distributed to different 
task managers. 

This is an answer for your question 1: the number before the record string is 
the id/sequence number of the copy of the print sink function that is 
processing that record. 
In the first case, there is no key, so the records go to arbitrary printer 
instances. 
In the second case, all records have the same key (subject), so they are routed 
to the same copy of the print function, in this case with id=3. If you had 
records with a different subject, chances are pretty good they would all be 
printed by a different print function.

Regarding your second question, I already answered this a bit, but you might 
want to look at 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html: 
after keying your stream, you define the window of elements you want to process 
at a time, and then apply a function to the elements in each window, for each 
separate key. These functions would be your smaller circles, I suppose.

As to local/cluster: since running Flink locally already gives you some 
parallelism (it defaults to the number of CPU cores on your machine, I 
believe), you already see a distributed version of your application. When you 
run on a cluster, the only thing that really changes is how you start the 
application  (See, e.g. 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/cluster_setup.html
 for a cluster setup, but it depends on what cluster you have available). Flink 
abstracts away the specifics of per-node communication in its API already.



Hope that helps,
Carst
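
Putting the two answers together, the pattern looks roughly like this (a
sketch; SensorEvent and its accessors are hypothetical stand-ins for the
deserialized Kafka record):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public final class SubjectPipeline {

        // Hypothetical stand-in for the deserialized Kafka record:
        public static class SensorEvent {
            public String subjectId;
            public double value;
            public String getSubjectId() { return subjectId; }
            public double getValue() { return value; }
        }

        // All events of one subject go to the same parallel task copy (the
        // "smaller circle"), which then evaluates them per 30-second window.
        public static void process(DataStream<SensorEvent> events) {
            events
                .keyBy(e -> e.getSubjectId())
                .timeWindow(Time.seconds(30))
                .reduce((a, b) -> a.getValue() > b.getValue() ? a : b); // e.g. keep the max reading
        }
    }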






Re: Cannot write record to fresh sort buffer. Record too large.

2017-06-14 Thread Stephan Ewen
Here are some pointers

  - You would rather need MORE managed memory, not less, because the sorter
uses that.

  - We added the "large record handler" to the sorter for exactly these use
cases. Can you check in the code whether it is enabled? You'll have to go
through a bit of the code to see that. It is an older Flink version, I am
not quite sure any more how exactly it was there.

Stephan
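
If memory serves, the handler is gated by a boolean configuration key in
flink-conf.yaml (treat the exact name as an assumption and verify it against
ConfigConstants in your release):

    # flink-conf.yaml -- assumed key, disabled by default
    taskmanager.runtime.large-record-handler: true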

