Re: Issue with back pressure and AsyncFunction

2017-11-10 Thread Ufuk Celebi
Hey Ken,

thanks for your message. Both your comments are correct (see inline).

On Fri, Nov 10, 2017 at 10:31 PM, Ken Krugler wrote:
> 1. A downstream function in the iteration was (significantly) increasing the
> number of tuples - it would get one in, and sometimes emit 100+.
>
> The output would loop back as input via the iteration.
>
> This eventually caused the network buffers to fill up, and that’s why the
> job got stuck.
>
> I had to add my own tracking/throttling in one of my custom functions, to
> avoid having too many “active” tuples.
>
> So maybe something to note in documentation on iterations, if it’s not there
> already.

Yes, iterations are prone to deadlock due to the way that data is
exchanged between the sink and head nodes. There have been multiple
attempts to fix these shortcomings, but I don't know what the latest
state is. Maybe Aljoscha (CC'd) has some input...

> 2. The back pressure calculation doesn’t take into account AsyncIO

Correct, the back pressure monitoring only takes the main task thread
into account. Every operator that uses a separate thread to emit
records (like Async I/O or the Kafka source) is therefore not covered by
the back pressure monitoring.

– Ufuk


Issue with back pressure and AsyncFunction

2017-11-10 Thread Ken Krugler
Hi all,

I was debugging a curious problem with a streaming job that contained an 
iteration and several AsyncFunctions.

The entire job would stall out, with no progress being made.

But when I checked back pressure, only one function showed it as being high - 
everything else was OK.

And when I dumped threads, the only bit of my code that was running was indeed 
that one function w/high back pressure, stuck while making a collect() call.

There were two issues here….

1. A downstream function in the iteration was (significantly) increasing the 
number of tuples - it would get one in, and sometimes emit 100+.

The output would loop back as input via the iteration.

This eventually caused the network buffers to fill up, and that’s why the job 
got stuck.

I had to add my own tracking/throttling in one of my custom functions, to avoid 
having too many “active” tuples.

So maybe something to note in documentation on iterations, if it’s not there 
already.
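
(For anyone hitting the same problem, a rough sketch of that kind of throttling
might look like the following - not the actual job code; the cap, the shared
static counter (which assumes both functions run in the same task manager JVM)
and the types are all illustrative.)

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

/** Expands one tuple into many, but backs off while too many tuples are "active". */
public class ThrottledExpander extends RichFlatMapFunction<Integer, Integer> {

    static final long MAX_ACTIVE = 10_000;              // arbitrary cap on in-flight tuples
    static final AtomicLong ACTIVE = new AtomicLong();  // shared counter (same-JVM assumption)

    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        // Wait until enough tuples have been "retired" further down the iteration.
        while (ACTIVE.get() >= MAX_ACTIVE) {
            Thread.sleep(10);
        }
        // The 1 -> 100+ fan-out that otherwise fills up the network buffers.
        for (int i = 0; i < 100; i++) {
            ACTIVE.incrementAndGet();
            out.collect(value + i);
        }
    }
}

/** Downstream, tuples leaving the loop are retired so the expander can continue. */
class TupleRetirer extends RichFlatMapFunction<Integer, Integer> {
    @Override
    public void flatMap(Integer value, Collector<Integer> out) {
        ThrottledExpander.ACTIVE.decrementAndGet();
        out.collect(value);
    }
}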

2. The back pressure calculation doesn’t take into account AsyncIO

When I double-checked the thread dump, there were actually a number of threads 
(one for each of my AsyncFunctions) that were stuck calling collect().

These were all named "AsyncIO-Emitter-Thread (…)". For example:

> "AsyncIO-Emitter-Thread (MyAsyncFunction -> ()) (1/1))" #125 daemon 
> prio=5 os_prio=31 tid=0x7fb191025800 nid=0xac0b in Object.wait() 
> [0x7000123f]
>java.lang.Thread.State: TIMED_WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
>   - locked <0x000773cb3ec0> (a java.util.ArrayDeque)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>   - locked <0x000773b98020> (a 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>   at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:140)
>   at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:42)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at 
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:132)
>   - locked <0x000773b1bb70> (a java.lang.Object)
>   at 
> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
>   at java.lang.Thread.run(Thread.java:748)



I’m assuming that when my AsyncFunction calls collect(), this hands off the 
tuple to the AsyncIO-Emitter-Thread, which is why none of my code (either my 
AsyncFunctions or the threads in my pool doing the async work) shows up in the 
thread dump.
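
(To make that hand-off concrete, here is a rough sketch of such a function,
assuming the Flink 1.3-era AsyncCollector API; the class name, thread pool and
lookup call are placeholders. The user pool thread only completes the async
request; pushing the result to the downstream operators is done later by the
AsyncIO-Emitter-Thread, which is what shows up blocked in the stack trace above.)

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(4); // illustrative pool size
    }

    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        // Called on the main task thread: it only schedules the work and returns.
        pool.submit(() -> {
            String result = lookup(input);                     // placeholder blocking call
            collector.collect(Collections.singleton(result));  // completes the async request
            // The completed result is emitted downstream by the AsyncIO-Emitter-Thread,
            // not by this pool thread and not by the main task thread.
        });
    }

    private String lookup(String input) {
        return input; // stand-in for the real external call
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}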

And I’m assuming that the back pressure calculation isn’t associating these 
threads with the source function, which is why they don’t show up in the GUI.

I’m hoping someone can confirm the above. If so, I’ll file an issue.

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-10 Thread Vergilio, Thalita
Hi Till,


Thank you very much for that. And thanks for your help. I have finally managed 
to get the multi-cloud setup on Docker Swarm working by tweaking the Flink 
image slightly to set these configuration options to known values. I have also 
used the Weave Net Docker plugin to create my cross-cloud network.


I am in the process of documenting my experience in a blog article, which I 
will share in this list so others can hopefully benefit from it.


Thank you and the rest of the Flink team once again for all your help and 
support.


Best wishes,


Thalita


From: Till Rohrmann 
Sent: 10 November 2017 12:15:00
To: Vergilio, Thalita
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes


Hi Thalita, yes you can use the mentioned configuration parameters to set the 
ports for the TaskManager and the BlobServer. However, you must make sure that 
there is at most one TM running on a host, otherwise you run into port 
collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.
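
For example, in flink-conf.yaml (port values below chosen arbitrarily):

taskmanager.rpc.port: 50100-50200
blob.server.port: 50201-50300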

Cheers,
Till


On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita wrote:

Hi All,


I just wanted to let you know that I have finally managed to get the 
multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker 
plugin called Weave to create the Swarm network, a public external IP address 
for each node and opened a range of ports, and I can now get my Google Cloud 
machine to connect to the Azure machines.


There are still some minor issues, i.e. I don't know which exact ports to open 
for TaskManager communication in Flink. They seem to be getting assigned 
randomly at runtime, so I had to open a wide range of ports to allow the 
communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a 
constant value? Looking at the documentation, the suspects are:


  *   taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets 
the OS choose a free port).

  *   taskmanager.data.port: The task manager’s port used for data exchange 
operations (DEFAULT: 0, which lets the OS choose a free port).

  *   blob.server.port: Port definition for the blob server (serving user JARs) 
on the TaskManagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.

Many thanks,


Thalita



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann

Cc: Piotr Nowojski; user@flink.apache.org; 
Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes


Hi Till,


I have made some progress with the name resolution for machines that are not in 
the same subnet. The problem I am facing now is Flink-specific, so I wonder if 
you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and 
the taskmanager in the Google cloud. However, when I scale the taskmanager up 
and it starts running on Azure nodes as well, I get an Akka error which I 
presume means the taskmanagers can't talk to each other when parallelising the 
task.


Do you know what the IP address and port below are? Are they assigned by Flink?


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> 
Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> 
ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) 
(b9f31626fb7d83d39e24e570e034f03e) - TaskManager 
(3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding 
after a timeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
at akka.dispatch.Recover.internal(Future.scala:268)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at 

Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Derek VerLee

  
  
I was about to ask this question myself. I find myself re-keying by the same
keys repeatedly. I think in principle you could always just roll more work into
one window operation with a more complex series of maps/folds/windowfunctions
or a processfunction. However this doesn't always feel the most clean,
convenient, or composable. It would be great if there was a way to just express
that you want to keep the same partitions as the last window, or that the new
key is 1-to-1 with the previous one. Even more generally, if the new key is
"based" off the old key in a way that is one-to-one or one-to-many, in either
case it may not be necessary to send data over the wire, although in the latter
case there is a risk of hot-spotting, I suppose.

On 11/10/17 12:01 PM, Gwenhael Pasquiers wrote:

I think I finally found a way to "simulate" a Timer thanks to the
processWatermark function of the AbstractStreamOperator.

Sorry for the monologue.

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: vendredi 10 novembre 2017 16:02
To: 'user@flink.apache.org'
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Hello,

Finally, even after creating my operator, I still get the error : "Timers can
only be used on keyed operators".

Isn't there any way around this ? A way to "key" my stream without shuffling
the data ?

From: Gwenhael Pasquiers
Sent: vendredi 10 novembre 2017 11:42
To: Gwenhael Pasquiers; 'user@flink.apache.org'
Subject: RE: Streaming : a way to "key by partition id" without redispatching data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 9 novembre 2017 18:00
To: 'user@flink.apache.org'
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream as
soon as possible by windowing/folding it for 15 minutes before continuing to
the rest of the chain that contains keyBys and windows that will transfer data
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between
partitions as much as possible (not like a naïve KeyBy does). I wanted to
create a custom ProcessFunction (using timer and state to fold data for X
minutes) in order to fold my data over itself before keying the stream but even
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure me that my data won't be
moved to another taskmanager (that its hashcode will match the partition it is
already in) ? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that would not
move data between nodes, for windowing operations that can be parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0)
=> second folding => ...

· Finally, aren't all streams keyed ? Even if they're keyed by a totally
arbitrary partition id until the user chooses its own key, shouldn't we be able
to do a window (not windowAll) or process over any normal Stream's partition ?

B.R.

Gwenhaël PASQUIERS


RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
I think I finally found a way to "simulate" a Timer thanks to the 
processWatermark function of the AbstractStreamOperator.

Sorry for the monologue.
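
A rough sketch of what such an operator could look like (not the actual code;
Flink 1.2/1.3-era operator API, and the fold, the types and the 15-minute
bucket are purely illustrative; checkpointing of the partial state is left out):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class PreFoldOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    private static final long BUCKET_MS = 15 * 60 * 1000L; // 15-minute pre-aggregation bucket

    private long bucketEnd = Long.MIN_VALUE;
    private long partialSum = 0L; // illustrative fold: a running sum per parallel instance

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        if (bucketEnd == Long.MIN_VALUE) {
            bucketEnd = element.getTimestamp() - (element.getTimestamp() % BUCKET_MS) + BUCKET_MS;
        }
        partialSum += element.getValue();
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // The watermark plays the role of the timer: once it passes the bucket end,
        // emit the locally folded value, without any shuffle having taken place.
        if (bucketEnd != Long.MIN_VALUE && mark.getTimestamp() >= bucketEnd) {
            output.collect(new StreamRecord<>(partialSum, bucketEnd - 1));
            partialSum = 0L;
            bucketEnd += BUCKET_MS;
        }
        super.processWatermark(mark); // forward the watermark downstream
    }
}

It would be attached with something like
stream.transform("pre-fold", BasicTypeInfo.LONG_TYPE_INFO, new PreFoldOperator())
before the "real" keyBy/window part of the chain.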

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: vendredi 10 novembre 2017 16:02
To: 'user@flink.apache.org' 
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Hello,

Finally, even after creating my operator, I still get the error : "Timers can 
only be used on keyed operators".

Isn't there any way around this ? A way to "key" my stream without shuffling 
the data ?

From: Gwenhael Pasquiers
Sent: vendredi 10 novembre 2017 11:42
To: Gwenhael Pasquiers; 'user@flink.apache.org'
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 9 novembre 2017 18:00
To: 'user@flink.apache.org'
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure me that my data won't be 
moved to another taskmanager (that its hashcode will match the partition it is 
already in) ? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
I have a couple of concerns.

1. Your logs seem to be incomplete. For example, the configuration output at the 
beginning is missing (see attached example log). The output file also looks 
strange to me (like a duplicated log file). Please submit full logs.

2. If your heap size is 1.5GB, how is it possible that your screenshot shows 
~40GB of memory usage, with some process using 10GB?

Please analyse what is actually consuming all of that memory and ensure that 
your machine does not use swap. If the memory consumption comes from Flink, 
please check the JVM's memory pools using jconsole; maybe something off-heap is 
using the memory. Pay particular attention to your PermGen/Metaspace/Code 
pools, since they can cause class-loading issues.

3. 1.5GB for the heap is a very low value. From your screenshots I assumed that 
you had set the heap to some enormous value (the htop screenshot showing ~40GB 
of memory usage on the machine). It might just be too small a value and you 
should increase it, especially since you are trying to run multiple jobs at the 
same time with 300 task slots. But increase it only after you solve the issue 
of other things eating up your memory.
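
For reference, the heap is set via taskmanager.heap.mb in flink-conf.yaml, for
example (value chosen arbitrarily):

taskmanager.heap.mb: 8192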

Piotrek


flink-pnowojski-taskmanager-9-piotr-mbp.log
Description: Binary data



> On 10 Nov 2017, at 16:05, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> 
> On 2017-11-10 18:01, ÇETİNKAYA EBRU ÇETİNKAYA EBRU wrote:
>> On 2017-11-10 17:50, Piotr Nowojski wrote:
>>> I do not see anything abnormal in the logs before this error :(
>>> What are your JVM settings and which java version are you running?
>>> What happens if you limit the heap size so that the swap is never
>>> used?
>>> Piotrek
 On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
  wrote:
 On 2017-11-10 13:14, Piotr Nowojski wrote:
> jobmanager1.log and taskmanager2.log are the same. Can you also submit
> files containing std output?
> Piotrek
>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>  wrote:
>> On 2017-11-10 11:04, Piotr Nowojski wrote:
>>> Hi,
>>> Thanks for the logs, however I do not see before mentioned exceptions
>>> in it. It ends with java.lang.InterruptedException
>>> Is it the correct log file? Also, could you attach the std output file
>>> of the failing TaskManager?
>>> Piotrek
 On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
  wrote:
 On 2017-11-09 20:08, Piotr Nowojski wrote:
> Hi,
> Could you attach full logs from those task managers? At first glance I
> don’t see a connection between those exceptions and any memory issue
> that you might had. It looks like a dependency issue in one (some?
> All?) of your jobs.
> Did you build your jars with -Pbuild-jar profile as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
> ?
> If that doesn’t help. Can you binary search which job is causing the
> problem? There might be some Flink incompatibility between different
> versions and rebuilding a job’s jar with a version matching to the
> cluster version might help.
> Piotrek
>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>  wrote:
>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>> Btw, Ebru:
>> I don’t agree that the main suspect is NetworkBufferPool. On your
>> screenshots it’s memory consumption was reasonable and stable:
>> 596MB
>> -> 602MB -> 597MB.
>> PoolThreadCache memory usage ~120MB is also reasonable.
>> Do you experience any problems, like Out Of Memory
>> errors/crashes/long
>> GC pauses? Or just JVM process is using more memory over time? You
>> are
>> aware that JVM doesn’t like to release memory back to OS once it
>> was
>> used? So increasing memory usage until hitting some limit (for
>> example
>> JVM max heap size) is expected behaviour.
>> Piotrek
>> On 8 Nov 2017, at 15:48, Piotr Nowojski 
>> wrote:
>> I don’t know if this is relevant to this issue, but I was
>> constantly getting failures trying to reproduce this leak using your
>> Job, because you were using non deterministic getKey function:
>> @Override
>> public Integer getKey(Integer event) {
>> Random randomGen = new Random((new Date()).getTime());
>> return randomGen.nextInt() % 8;
>> }
>> And quoting Java doc of KeySelector:
>> "If invoked multiple times on the same object, the returned key must
>> be the same.”
>> I’m trying to reproduce this issue with following job:

Re: Queryable State Python

2017-11-10 Thread Kostas Kloudas
Hi Martin,

I will try to reply to your questions inline:

> On Nov 10, 2017, at 1:59 PM, Martin Eden  wrote:
> 
> Hi,
> 
> Our team is looking at replacing Redis with Flink's own queryable state 
> mechanism. However our clients are using python.
> 
> 1. Is there a python integration with the Flink queryable state mechanism?
> Cannot seem to be able to find one.

There is no Python API for queryable state. Currently only Java is supported.

> 2. If not, is it on the roadmap?

I am not aware of any efforts towards that direction, although there are 
discussions about porting queryable state to REST, so 
that more clients (in any language) can be written.
> 
> 3. Our current solution is to write a Java RPC proxy and query that from 
> python. The whole point of using queryable state was to get rid of an extra 
> service (Redis) but now it seems we need to add another one. Is there an 
> easier way to call queryable state from Python without requiring an extra 
> service?

Unfortunately for now I am not aware of any easier way to do so.
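
As a very rough illustration of what the Java side of such a proxy could do
(assuming the reworked client that will ship with 1.4; the host, port, job id,
key and state name below are all placeholders):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class StateProxy {

    public static void main(String[] args) throws Exception {
        // Talks to the queryable state proxy of one TaskManager (placeholder host/port).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Types.LONG);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                JobID.fromHexString("0123456789abcdef0123456789abcdef"), // placeholder job id
                "my-queryable-state",  // name given to .asQueryableState(...) in the job
                "some-key",            // key to look up
                Types.STRING,
                descriptor);

        // A thin HTTP/REST layer around this call is what the Python clients would use.
        System.out.println("value = " + future.get().value());
    }
}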

> 
> 4. Is queryable state used in production by anyone? Can anyone share numbers, 
> experiences, case studies?

Queryable state is currently under heavy development. So APIs may change and 
features may be added. 
For queryable state users, I would recommend checking out the talks in previous 
Flink Forward editions. They are all online.

> 
> 5. What is the direction that queryable state is going in for the next Flink 
> release? Features, api?

The next release is going to come soon (it is currently under testing), so you 
can already have a look at how Queryable State
is going to look if you check out the current release-1.4 branch of Flink 
on GitHub.

> 
> 6. Is the Flink queryable state going to be supported/developed going forward 
> with the advent of Pravega which has caching like capabilities as well?

For this I am cc’ing Stephan. He is probably better informed.

Hope this helps,
Kostas

> Thanks,
> M



RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Hello,

Finally, even after creating my operator, I still get the error : "Timers can 
only be used on keyed operators".

Isn't there any way around this ? A way to "key" my stream without shuffling 
the data ?

From: Gwenhael Pasquiers
Sent: vendredi 10 novembre 2017 11:42
To: Gwenhael Pasquiers ; 
'user@flink.apache.org' 
Subject: RE: Streaming : a way to "key by partition id" without redispatching 
data

Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 9 novembre 2017 18:00
To: 'user@flink.apache.org'
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure me that my data won't be 
moved to another taskmanager (that its hashcode will match the partition it is 
already in) ? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


Re: Flink memory leak

2017-11-10 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU

On 2017-11-10 17:50, Piotr Nowojski wrote:

I do not see anything abnormal in the logs before this error :(

What are your JVM settings and which java version are you running?
What happens if you limit the heap size so that the swap is never
used?

Piotrek

On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
 wrote:


On 2017-11-10 13:14, Piotr Nowojski wrote:
jobmanager1.log and taskmanager2.log are the same. Can you also 
submit

files containing std output?
Piotrek
On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
 wrote:

On 2017-11-10 11:04, Piotr Nowojski wrote:

Hi,
Thanks for the logs, however I do not see before mentioned 
exceptions

in it. It ends with java.lang.InterruptedException
Is it the correct log file? Also, could you attach the std output 
file

of the failing TaskManager?
Piotrek
On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
 wrote:

On 2017-11-09 20:08, Piotr Nowojski wrote:

Hi,
Could you attach full logs from those task managers? At first 
glance I
don’t see a connection between those exceptions and any memory 
issue
that you might had. It looks like a dependency issue in one 
(some?

All?) of your jobs.
Did you build your jars with -Pbuild-jar profile as described 
here:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
?
If that doesn’t help. Can you binary search which job is causing 
the
problem? There might be some Flink incompatibility between 
different
versions and rebuilding a job’s jar with a version matching to 
the

cluster version might help.
Piotrek

On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
 wrote:
On 2017-11-08 18:30, Piotr Nowojski wrote:
Btw, Ebru:
I don’t agree that the main suspect is NetworkBufferPool. On 
your

screenshots it’s memory consumption was reasonable and stable:
596MB
-> 602MB -> 597MB.
PoolThreadCache memory usage ~120MB is also reasonable.
Do you experience any problems, like Out Of Memory
errors/crashes/long
GC pauses? Or just JVM process is using more memory over time? 
You

are
aware that JVM doesn’t like to release memory back to OS once it
was
used? So increasing memory usage until hitting some limit (for
example
JVM max heap size) is expected behaviour.
Piotrek
On 8 Nov 2017, at 15:48, Piotr Nowojski 


wrote:
I don’t know if this is relevant to this issue, but I was
constantly getting failures trying to reproduce this leak using 
your

Job, because you were using non deterministic getKey function:
@Override
public Integer getKey(Integer event) {
Random randomGen = new Random((new Date()).getTime());
return randomGen.nextInt() % 8;
}
And quoting Java doc of KeySelector:
"If invoked multiple times on the same object, the returned key 
must

be the same.”
I’m trying to reproduce this issue with following job:
https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
Where IntegerSource is just an infinite source, DisardingSink is
well just discarding incoming data. I’m cancelling the job every 
5

seconds and so far (after ~15 minutes) my memory consumption is
stable, well below maximum java heap size.
Piotrek
On 8 Nov 2017, at 15:28, Javier Lopez 
wrote:
Yes, I tested with just printing the stream. But it could take a
lot of time to fail.
On Wednesday, 8 November 2017, Piotr Nowojski
 wrote:
Thanks for quick answer.
So it will also fail after some time with `fromElements` source
instead of Kafka, right?
Did you try it also without a Kafka producer?
Piotrek
On 8 Nov 2017, at 14:57, Javier Lopez 
wrote:
Hi,
You don't need data. With data it will die faster. I tested as
well with a small data set, using the fromElements source, but 
it

will take some time to die. It's better with some data.
On 8 November 2017 at 14:54, Piotr Nowojski
 wrote:
Hi,
Thanks for sharing this job.
Do I need to feed some data to the Kafka to reproduce this

issue with your script?

Does this OOM issue also happen when you are not using the

Kafka source/sink?

Piotrek
On 8 Nov 2017, at 14:08, Javier Lopez 

wrote:

Hi,
This is the test flink job we created to trigger this leak

https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6

And this is the python script we are using to execute the job

thousands of times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107

The cluster we used for this has this configuration:
Instance type: t2.large
Number of workers: 2
HeapMemory: 5500
Number of task slots per node: 4
TaskMangMemFraction: 0.5
NumberOfNetworkBuffers: 2000
We have tried several things, increasing the heap, reducing the

heap, more memory fraction, changes this value in the
taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
I do not see anything abnormal in the logs before this error :(

What are your JVM settings and which java version are you running? What happens 
if you limit the heap size so that the swap is never used? 

Piotrek

> On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> 
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>> jobmanager1.log and taskmanager2.log are the same. Can you also submit
>> files containing std output?
>> Piotrek
>>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>  wrote:
>>> On 2017-11-10 11:04, Piotr Nowojski wrote:
 Hi,
 Thanks for the logs, however I do not see before mentioned exceptions
 in it. It ends with java.lang.InterruptedException
 Is it the correct log file? Also, could you attach the std output file
 of the failing TaskManager?
 Piotrek
> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>> Hi,
>> Could you attach full logs from those task managers? At first glance I
>> don’t see a connection between those exceptions and any memory issue
>> that you might had. It looks like a dependency issue in one (some?
>> All?) of your jobs.
>> Did you build your jars with -Pbuild-jar profile as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>> ?
>> If that doesn’t help. Can you binary search which job is causing the
>> problem? There might be some Flink incompatibility between different
>> versions and rebuilding a job’s jar with a version matching to the
>> cluster version might help.
>> Piotrek
>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>  wrote:
>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>> Btw, Ebru:
>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>> screenshots it’s memory consumption was reasonable and stable:
>>> 596MB
>>> -> 602MB -> 597MB.
>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>> Do you experience any problems, like Out Of Memory
>>> errors/crashes/long
>>> GC pauses? Or just JVM process is using more memory over time? You
>>> are
>>> aware that JVM doesn’t like to release memory back to OS once it
>>> was
>>> used? So increasing memory usage until hitting some limit (for
>>> example
>>> JVM max heap size) is expected behaviour.
>>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski 
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> Job, because you were using non deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
>>> On 8 Nov 2017, at 15:28, Javier Lopez 
>>> wrote:
>>> Yes, I tested with just printing the stream. But it could take a
>>> lot of time to fail.
>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>  wrote:
>>> Thanks for quick answer.
>>> So it will also fail after some time with `fromElements` source
>>> instead of Kafka, right?
>>> Did you try it also without a Kafka producer?
>>> Piotrek
>>> On 8 Nov 2017, at 14:57, Javier Lopez 
>>> wrote:
>>> Hi,
>>> You don't need data. With data it will die faster. I tested as
>>> well with a small data set, using the fromElements source, but it
>>> will take some time to die. It's better with some data.
>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>  wrote:
>>> Hi,
>>> Thanks for sharing this job.
>>> Do I need to feed some data to the Kafka to reproduce this
>> issue with your script?
 Does this OOM issue also happen when you are not using the
>> Kafka source/sink?
 Piotrek
 On 8 Nov 2017, at 14:08, Javier Lopez 
>> wrote:
 Hi,
 This is the test flink job we created to trigger 

Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-10 Thread Till Rohrmann
Hi Thalita, yes you can use the mentioned configuration parameters to set
the ports for the TaskManager and the BlobServer. However, you must make
sure that there is at most one TM running on a host, otherwise you run into
port collisions.

For taskmanager.rpc.port and blob.server.port you can define a range.

Cheers,
Till

On Fri, Nov 10, 2017 at 11:47 AM, Vergilio, Thalita <
t.vergilio4...@student.leedsbeckett.ac.uk> wrote:

> Hi All,
>
>
> I just wanted to let you know that I have finally managed to get the
> multi-cloud setup working!! I honestly can't believe my eyes. I used a
> Docker plugin called Weave to create the Swarm network, a public external
> IP address for each node and opened a range of ports, and I can now get my
> Google Cloud machine to connect to the Azure machines.
>
>
> There are still some minor issues, i.e. I don't know which exact ports to
> open for TaskManager communication in Flink. They seem to be getting
> assigned randomly at runtime, so I had to open a wide range of ports to
> allow the communication to happen, which is far from ideal.
>
>
> Is there a way of finding out what these ports are and setting them to a
> constant value? Looking at the documentation, the suspects are:
>
>
>
>-
>
>taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which
>lets the OS choose a free port).
>-
>
>taskmanager.data.port: The task manager’s port used for data exchange
>operations (DEFAULT: 0, which lets the OS choose a free port).
>-
>
>blob.server.port: Port definition for the blob server (serving user
>JARs) on the TaskManagers. By default the port is set to 0, which means
>that the operating system is picking an ephemeral port. Flink also accepts
>a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of
>both. It is recommended to set a range of ports to avoid collisions when
>multiple JobManagers are running on the same machine.
>
> Many thanks,
>
>
> Thalita
>
> --
> *From:* Vergilio, Thalita
> *Sent:* 09 November 2017 22:04:24
> *To:* Till Rohrmann
>
> *Cc:* Piotr Nowojski; user@flink.apache.org; Patrick Lucas
> *Subject:* Re: Docker-Flink Project: TaskManagers can't talk to
> JobManager if they are on different nodes
>
>
> Hi Till,
>
>
> I have made some progress with the name resolution for machines that are
> not in the same subnet. The problem I am facing now is Flink-specific, so I
> wonder if you could help me.
>
>
> It is all running fine in a multi-cloud setup with the jobmanager in Azure
> and the taskmanager in the Google cloud. However, when I scale the
> taskmanager up and it starts running on Azure nodes as well, I get an Akka
> error which I presume means the taskmanagers can't talk to each other when
> parallelising the task.
>
>
> Do you know what the IP address and port below are? Are they assigned by
> Flink?
>
>
> Thank you very much.
>
>
> Thalita
>
>
> java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> 
> Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> 
> ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) 
> (b9f31626fb7d83d39e24e570e034f03e) - TaskManager 
> (3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not 
> responding after a timeout of 1 ms
>   at 
> org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
>   at akka.dispatch.Recover.internal(Future.scala:268)
>   at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
>   at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
>   at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
>   at scala.util.Try$.apply(Try.scala:161)
>   at scala.util.Failure.recover(Try.scala:185)
>   at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>   at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] 

readFile, DataStream

2017-11-10 Thread Juan Miguel Cejuela
Hi there,

I’m trying to watch a directory for new incoming files (with
StreamExecutionEnvironment#readFile) with a subsecond latency (a watch
interval of ~100ms, using the flag FileProcessingMode.PROCESS_CONTINUOUSLY).
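
For reference, the setup looks roughly like this (paths are placeholders, and
TextInputFormat stands in for my custom FileInputFormat):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class WatchDirectoryJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TextInputFormat stands in here for the custom format that logs in open().
        TextInputFormat format = new TextInputFormat(new Path("/data/incoming"));

        DataStream<String> lines = env.readFile(
                format,
                "/data/incoming",                         // directory being watched
                FileProcessingMode.PROCESS_CONTINUOUSLY,  // keep scanning for new files
                100L);                                    // watch interval in milliseconds

        lines.print();
        env.execute("watch directory");
    }
}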

If many files come in within (under) the watch interval, Flink doesn’t seem to
notice the files, and as a result they do not get processed. The behavior also
seems non-deterministic, as it likely depends on timeouts and so on. For
example, 10 new files come in at once (that is, essentially in parallel) and
perhaps 3 of them get processed, but the remaining 7 don’t.

I’ve extended FileInputFormat to create my own input format, which doesn’t do
much more than log, in its open function, when a new file comes in. That’s how
I know that many files get lost.

On the other hand, when I restart flink, *all* the files in the directory
are immediately processed. This is the expected behavior and works fine.

The situation of unprocessed files is a bummer.

Am I doing something wrong? Do I need to set something in the
configuration? Is it a bug in Flink?

Hopefully I described my problem clearly.

Thank you.
​


Re: Testing / Configuring event windows with Table API and SQL

2017-11-10 Thread Fabian Hueske
Hi Colin,

Flink's SQL runner does not support handling of late data yet. At the
moment, late events are simply dropped.
We plan to add support for late data in a future release.

The "withIdleStateRetentionTime" parameter only applies to non-windowed
aggregation functions and controls when they can evict state for inactive
keys.

Best, Fabian


2017-11-10 4:17 GMT+01:00 Colin Williams :

> Hello,
>
> I've been given some flink application code and asked to implement and
> ensure that our query is updated for late arriving entries. We're currently
> creating a table using a Tumbling SQL query similar to the first example in
>
>  https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/table/sql.html#group-windows
>
> We then turn the result table back into a datastream using toAppendStream,
> and eventually add a derivative stream to a sink. We've configured
> TimeCharacteristic to event-time processing.
>
> From reading the documentation I was trying to configure using
> withIdleStateRetentionTime, with the expectation that this setting would
> allow me to deal with late arrivals past a given watermark time, but within
> the retention time.
>
> Then to test this I created a simple source which triggers the watermark,
> so that I'd have next a late arrival. However so far the watermark seems to
> cause something to discriminate the late arrival. Then in my test sink
> where I'm trying to capture all emitted outputs, and hopefully the updated
> value I don't find one.
>
> So it seems that my understanding of how to deal with late events, or my
> test platform is wrong. Can anyone recognize what I'm doing wrong?
>
>
> Best,
>
> Colin Williams
>
>
>
>


Re: Docker-Flink Project: TaskManagers can't talk to JobManager if they are on different nodes

2017-11-10 Thread Vergilio, Thalita
Hi All,


I just wanted to let you know that I have finally managed to get the 
multi-cloud setup working!! I honestly can't believe my eyes. I used a Docker 
plugin called Weave to create the Swarm network, a public external IP address 
for each node and opened a range of ports, and I can now get my Google Cloud 
machine to connect to the Azure machines.


There are still some minor issues, i.e. I don't know which exact ports to open 
for TaskManager communication in Flink. They seem to be getting assigned 
randomly at runtime, so I had to open a wide range of ports to allow the 
communication to happen, which is far from ideal.


Is there a way of finding out what these ports are and setting them to a 
constant value? Looking at the documentation, the suspects are:


  *   taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets 
the OS choose a free port).

  *   taskmanager.data.port: The task manager’s port used for data exchange 
operations (DEFAULT: 0, which lets the OS choose a free port).

  *   blob.server.port: Port definition for the blob server (serving user JARs) 
on the TaskManagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.

Many thanks,


Thalita



From: Vergilio, Thalita
Sent: 09 November 2017 22:04:24
To: Till Rohrmann
Cc: Piotr Nowojski; user@flink.apache.org; Patrick Lucas
Subject: Re: Docker-Flink Project: TaskManagers can't talk to JobManager if 
they are on different nodes


Hi Till,


I have made some progress with the name resolution for machines that are not in 
the same subnet. The problem I am facing now is Flink-specific, so I wonder if 
you could help me.


It is all running fine in a multi-cloud setup with the jobmanager in Azure and 
the taskmanager in the Google cloud. However, when I scale the taskmanager up 
and it starts running on Azure nodes as well, I get an Akka error which I 
presume means the taskmanagers can't talk to each other when parallelising the 
task.


Do you know what the IP address and port below are? Are they assigned by Flink?


Thank you very much.


Thalita


java.lang.Exception: Cannot deploy task Source: Read(UnboundedKafkaSource) -> 
Flat Map -> KafkaPuePipelineProcessor/Window.Into()/Window.Assign.out -> 
ParMultiDo(Anonymous) -> ToKeyedWorkItem (2/3) 
(b9f31626fb7d83d39e24e570e034f03e) - TaskManager 
(3a9c37463c88510a44097df0c99b5f90 @ 172.18.0.3 (dataPort=38963)) not responding 
after a timeout of 1 ms
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:437)
at 
org.apache.flink.runtime.executiongraph.Execution$2.apply(Execution.java:429)
at 
org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
at akka.dispatch.Recover.internal(Future.scala:268)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://flink@172.18.0.3:37959/user/taskmanager#364916492]] after 
[1 ms]
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
at 

RE: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Gwenhael Pasquiers
Maybe you don't need to bother with that question.

I'm currently discovering AbstractStreamOperator, OneInputStreamOperator and 
Triggerable.

That should do it :-)

From: Gwenhael Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]
Sent: jeudi 9 novembre 2017 18:00
To: 'user@flink.apache.org' 
Subject: Streaming : a way to "key by partition id" without redispatching data

Hello,

(Flink 1.2.1)

For performance reasons I'm trying to reduce the volume of data of my stream 
as soon as possible by windowing/folding it for 15 minutes before continuing to 
the rest of the chain that contains keyBys and windows that will transfer data 
everywhere.

Because of the huge volume of data, I want to avoid "moving" the data between 
partitions as much as possible (not like a naïve KeyBy does). I wanted to 
create a custom ProcessFunction (using timer and state to fold data for X 
minutes) in order to fold my data over itself before keying the stream but even 
ProcessFunction needs a keyed stream...

Is there a specific "key" value that would ensure me that my data won't be 
moved to another taskmanager (that its hashcode will match the partition it is 
already in) ? I thought about the subtask id but I doubt I'd be that lucky :-)

Suggestions

· Wouldn't it be useful to be able to do a "partitionnedKeyBy" that 
would not move data between nodes, for windowing operations that can be 
parallelized.

o   Something like kafka => partitionnedKeyBy(0) => first folding => keyBy(0) 
=> second folding => 

· Finally, aren't all streams keyed ? Even if they're keyed by a 
totally arbitrary partition id until the user chooses its own key, shouldn't we 
be able to do a window (not windowAll) or process over any normal Stream's 
partition ?

B.R.

Gwenhaël PASQUIERS


Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
jobmanager1.log and taskmanager2.log are the same. Can you also submit files 
containing std output?

Piotrek

> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> 
> On 2017-11-10 11:04, Piotr Nowojski wrote:
>> Hi,
>> Thanks for the logs, however I do not see before mentioned exceptions
>> in it. It ends with java.lang.InterruptedException
>> Is it the correct log file? Also, could you attach the std output file
>> of the failing TaskManager?
>> Piotrek
>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>>  wrote:
>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
 Hi,
 Could you attach full logs from those task managers? At first glance I
 don’t see a connection between those exceptions and any memory issue
 that you might had. It looks like a dependency issue in one (some?
 All?) of your jobs.
 Did you build your jars with -Pbuild-jar profile as described here:
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
 ?
 If that doesn’t help. Can you binary search which job is causing the
 problem? There might be some Flink incompatibility between different
 versions and rebuilding a job’s jar with a version matching to the
 cluster version might help.
 Piotrek
> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>  wrote:
> On 2017-11-08 18:30, Piotr Nowojski wrote:
> Btw, Ebru:
> I don’t agree that the main suspect is NetworkBufferPool. On your
> screenshots it’s memory consumption was reasonable and stable:
> 596MB
> -> 602MB -> 597MB.
> PoolThreadCache memory usage ~120MB is also reasonable.
> Do you experience any problems, like Out Of Memory
> errors/crashes/long
> GC pauses? Or just JVM process is using more memory over time? You
> are
> aware that JVM doesn’t like to release memory back to OS once it
> was
> used? So increasing memory usage until hitting some limit (for
> example
> JVM max heap size) is expected behaviour.
> Piotrek
> On 8 Nov 2017, at 15:48, Piotr Nowojski 
> wrote:
> I don’t know if this is relevant to this issue, but I was
> constantly getting failures trying to reproduce this leak using your
> Job, because you were using non deterministic getKey function:
> @Override
> public Integer getKey(Integer event) {
> Random randomGen = new Random((new Date()).getTime());
> return randomGen.nextInt() % 8;
> }
> And quoting Java doc of KeySelector:
> "If invoked multiple times on the same object, the returned key must
> be the same.”
> I’m trying to reproduce this issue with following job:
> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
> Where IntegerSource is just an infinite source, DisardingSink is
> well just discarding incoming data. I’m cancelling the job every 5
> seconds and so far (after ~15 minutes) my memory consumption is
> stable, well below maximum java heap size.
> Piotrek
> On 8 Nov 2017, at 15:28, Javier Lopez 
> wrote:
> Yes, I tested with just printing the stream. But it could take a
> lot of time to fail.
> On Wednesday, 8 November 2017, Piotr Nowojski
>  wrote:
> Thanks for quick answer.
> So it will also fail after some time with `fromElements` source
> instead of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
> On 8 Nov 2017, at 14:57, Javier Lopez 
> wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as
> well with a small data set, using the fromElements source, but it
> will take some time to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski
>  wrote:
> Hi,
> Thanks for sharing this job.
> Do I need to feed some data to the Kafka to reproduce this
 issue with your script?
>> Does this OOM issue also happen when you are not using the
 Kafka source/sink?
>> Piotrek
>> On 8 Nov 2017, at 14:08, Javier Lopez 
 wrote:
>> Hi,
>> This is the test flink job we created to trigger this leak
 https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>> And this is the python script we are using to execute the job
 thousands of times to get the OOM problem
 https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>> The cluster we used for this has this configuration:
>> Instance type: t2.large
>> Number of workers: 2
>> HeapMemory: 5500
>> Number of task slots per node: 4
>> TaskMangMemFraction: 0.5
>> NumberOfNetworkBuffers: 2000
>> We have tried several things, 

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
Hi,

Thanks for the logs, however I do not see before mentioned exceptions in it. It 
ends with java.lang.InterruptedException

Is it the correct log file? Also, could you attach the std output file of the 
failing TaskManager?

Piotrek

> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>  wrote:
> 
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>> Hi,
>> Could you attach full logs from those task managers? At first glance I
>> don’t see a connection between those exceptions and any memory issue
>> that you might had. It looks like a dependency issue in one (some?
>> All?) of your jobs.
>> Did you build your jars with -Pbuild-jar profile as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>> ?
>> If that doesn’t help. Can you binary search which job is causing the
>> problem? There might be some Flink incompatibility between different
>> versions and rebuilding a job’s jar with a version matching to the
>> cluster version might help.
>> Piotrek
>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>  wrote:
>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>> Btw, Ebru:
>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>> screenshots it’s memory consumption was reasonable and stable:
>>> 596MB
>>> -> 602MB -> 597MB.
>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>> Do you experience any problems, like Out Of Memory
>>> errors/crashes/long
>>> GC pauses? Or just JVM process is using more memory over time? You
>>> are
>>> aware that JVM doesn’t like to release memory back to OS once it
>>> was
>>> used? So increasing memory usage until hitting some limit (for
>>> example
>>> JVM max heap size) is expected behaviour.
>>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski 
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> Job, because you were using non deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
>>> On 8 Nov 2017, at 15:28, Javier Lopez 
>>> wrote:
>>> Yes, I tested with just printing the stream. But it could take a
>>> lot of time to fail.
>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>>  wrote:
>>> Thanks for quick answer.
>>> So it will also fail after some time with `fromElements` source
>>> instead of Kafka, right?
>>> Did you try it also without a Kafka producer?
>>> Piotrek
>>> On 8 Nov 2017, at 14:57, Javier Lopez 
>>> wrote:
>>> Hi,
>>> You don't need data. With data it will die faster. I tested as
>>> well with a small data set, using the fromElements source, but it
>>> will take some time to die. It's better with some data.
>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>  wrote:
>>> Hi,
>>> Thanks for sharing this job.
>>> Do I need to feed some data to the Kafka to reproduce this
>> issue with your script?
 Does this OOM issue also happen when you are not using the
>> Kafka source/sink?
 Piotrek
 On 8 Nov 2017, at 14:08, Javier Lopez 
>> wrote:
 Hi,
 This is the test flink job we created to trigger this leak
>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
 And this is the python script we are using to execute the job
>> thousands of times to get the OOM problem
>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
 The cluster we used for this has this configuration:
 Instance type: t2.large
 Number of workers: 2
 HeapMemory: 5500
 Number of task slots per node: 4
 TaskMangMemFraction: 0.5
 NumberOfNetworkBuffers: 2000
 We have tried several things, increasing the heap, reducing the
>> heap, more memory fraction, changes this value in the
>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>> work.
 Thanks for your help.
 On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>  wrote:
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>> Hi Ebru and Javier,
>>> Yes, if you could share this example job it would be helpful.
>>> Ebru: could you