Re: Multiple consumers and custom triggers

2016-12-15 Thread Jamie Grier
To be more clear...

A single source in a Flink program is a logical concept.  Flink jobs are
run with some level of parallelism, meaning that multiple copies of your
source (and all other) functions run distributed across a cluster.  So
if you have a streaming program with two sources and you run with a
parallelism of 8, there are actually a total of 16 source functions
executing on the cluster -- 8 instances of each of the two source operators
you've defined in your Flink job.

For more info on this you may want to read through the following:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/concepts/concepts.html
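
For illustration, a minimal sketch of that situation (Java DataStream API; the
CountingSource below is a made-up placeholder, not from this thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class TwoSourcesJob {

    // A trivial parallel source; each parallel instance emits numbers forever.
    public static class CountingSource implements ParallelSourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long i = 0;
            while (running) {
                ctx.collect(i++);
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // every operator below runs with 8 parallel subtasks

        // two logical sources -> 2 x 8 = 16 running source subtasks on the cluster
        DataStream<Long> source1 = env.addSource(new CountingSource());
        DataStream<Long> source2 = env.addSource(new CountingSource());

        source1.union(source2).print();

        env.execute("two-sources-parallelism-example");
    }
}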

On Thu, Dec 15, 2016 at 3:21 PM, Jamie Grier 
wrote:

> All streams can be parallelized in Flink even with only one source.  You
> can have multiple sinks as well.
>
> On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
> vr1meghash...@gmail.com> wrote:
>
>> 1. If we have multiple sources, can the streams be parallelized ?
>> 2. Can we have multiple sinks as well?
>>
>> On Dec 14, 2016 10:46 PM,  wrote:
>>
>>> Got it. Thanks!
>>>
>>> On Dec 15, 2016, at 02:58, Jamie Grier  wrote:
>>>
>>> Ahh, sorry, for #2: A single Flink job can have as many sources as you
>>> like. They can be combined in multiple ways, via things like joins, or
>>> connect(), etc. They can also be completely independent — in other words
>>> the data flow graph can be completely disjoint. You never need to call
>>> execute() more than once. Just define your program, with as many sources as
>>> you want, and then call execute().
>>>
>>> val stream1 = env.addSource(...)
>>> val stream2 = env.addSource(...)
>>>
>>> stream1
>>>   .map(...)
>>>   .addSink(...)
>>>
>>> stream2
>>>   .map(...)
>>>   .addSink(...)
>>>
>>> env.execute() // this is all you need
>>>
>>>
>>> On Wed, Dec 14, 2016 at 4:02 PM, Matt  wrote:
>>>
 Hey Jamie,

 Ok with #1. I guess #2 is just not possible.

 I got it about #3. I just checked the code for the tumbling window
 assigner and I noticed it's just its default trigger that gets overwritten
 when using a custom trigger, not the way it assigns windows, so it makes
 sense now.

 Regarding #4, after doing some more tests I think it's more complex
 than I first thought. I'll probably create another thread explaining that
 specific question in more detail.

 Thanks,
 Matt

 On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier 
 wrote:

> For #1 there are a couple of ways to do this.  The easiest is probably
> stream1.connect(stream2).map(...) where the MapFunction maps the two
> input types to a common type that you can then process uniformly.
>
> For #3: There must always be a WindowAssigner specified.  There are
> some convenient ways to do this in the API such as timeWindow(), or
> window(TumblingProcessingTimeWindows.of(...)), etc, however you
> always must do this whether you provide your own trigger implementation or
> not.  The way to use window(...) with a custom trigger is just:
>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
> similar.  Not sure if I answered your question though...
>
> For #4: If I understand you correctly that is exactly what
> CountWindow(10, 1) does already.  For example if your input data was a
> sequence of integers starting with 0 the output would be:
>
> (0)
> (0, 1)
> (0, 1, 2)
> (0, 1, 2, 3)
> (0, 1, 2, 3, 4)
> (0, 1, 2, 3, 4, 5)
> (0, 1, 2, 3, 4, 5, 6)
> (0, 1, 2, 3, 4, 5, 6, 7)
> (0, 1, 2, 3, 4, 5, 6, 7, 8)
> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
> ...
> etc
>
> -Jamie
>
>
> On Wed, Dec 14, 2016 at 9:17 AM, Matt  wrote:
>
>> Hello people,
>>
>> I've written down some quick questions for which I couldn't find much
>> or anything in the documentation. I hope you can answer some of them!
>>
>> *# Multiple consumers*
>>
>> *1.* Is it possible to .union() streams of different classes? It is
>> useful to create a consumer that counts elements on different topics for
>> example, using a key such as the class name of the element, and a 
>> tumbling
>> window of 5 mins let's say.
>>
>> *2.* In case #1 is not possible, I need to launch multiple consumers
>> to achieve the same effect. However, I'm getting a "Factory already
>> initialized" error if I run environment.execute() for two consumers on
>> different threads. How do you .execute() more than one consumer on the 
>> same
>> application?
>>
>> *# Custom triggers*
>>
>> *3.* If a custom .trigger() overwrites the trigger of the
>> WindowAssigner used previously, why do we have to specify a 
>> WindowAssigner
>> (such as TumblingProcessing

Re: Multiple consumers and custom triggers

2016-12-15 Thread Jamie Grier
All streams can be parallelized in Flink even with only one source.  You
can have multiple sinks as well.
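
As a rough sketch of that (Java DataStream API; the path and parallelism
values are purely illustrative, not from this thread):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OneSourceTwoSinks {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // one logical source (a finite parallel sequence, just for illustration)
        DataStream<Long> numbers = env.generateSequence(0, 1_000_000);

        // branch 1: a parallel map feeding the first sink
        numbers.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long n) { return n * 2; }
        }).setParallelism(8).print();

        // branch 2: the same stream feeding a second, independent sink
        numbers.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long n) { return n % 2 == 0; }
        }).setParallelism(4).writeAsText("/tmp/evens");

        env.execute("one-source-two-sinks");
    }
}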

On Thu, Dec 15, 2016 at 7:56 AM, Meghashyam Sandeep V <
vr1meghash...@gmail.com> wrote:

> 1. If we have multiple sources, can the streams be parallelized ?
> 2. Can we have multiple sinks as well?
>
> On Dec 14, 2016 10:46 PM,  wrote:
>
>> Got it. Thanks!
>>
>> On Dec 15, 2016, at 02:58, Jamie Grier  wrote:
>>
>> Ahh, sorry, for #2: A single Flink job can have as many sources as you
>> like. They can be combined in multiple ways, via things like joins, or
>> connect(), etc. They can also be completely independent — in other words
> the data flow graph can be completely disjoint. You never need to call
> execute() more than once. Just define your program, with as many sources as
>> you want, and then call execute().
>>
> val stream1 = env.addSource(...)
> val stream2 = env.addSource(...)
>>
>> stream1
>>   .map(...)
>>   .addSink(...)
>>
>> stream2
>>   .map(...)
>>   .addSink(...)
>>
>> env.execute() // this is all you need
>>
>>
>> On Wed, Dec 14, 2016 at 4:02 PM, Matt  wrote:
>>
>>> Hey Jamie,
>>>
>>> Ok with #1. I guess #2 is just not possible.
>>>
>>> I got it about #3. I just checked the code for the tumbling window
>>> assigner and I noticed it's just its default trigger that gets overwritten
>>> when using a custom trigger, not the way it assigns windows, so it makes
>>> sense now.
>>>
>>> Regarding #4, after doing some more tests I think it's more complex than
>>> I first thought. I'll probably create another thread explaining that
>>> specific question in more detail.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier 
>>> wrote:
>>>
 For #1 there are a couple of ways to do this.  The easiest is probably
 stream1.connect(stream2).map(...) where the MapFunction maps the two
 input types to a common type that you can then process uniformly.

 For #3: There must always be a WindowAssigner specified.  There are some
 convenient ways to do this in the API such as timeWindow(), or
 window(TumblingProcessingTimeWindows.of(...)), etc, however you always
 must do this whether you provide your own trigger implementation or not.
 The way to use window(...) with a custom trigger is just:
  stream.keyBy(...).window(...).trigger(...).apply(...) or something
 similar.  Not sure if I answered your question though...

 For #4: If I understand you correctly that is exactly what
 CountWindow(10, 1) does already.  For example if your input data was a
 sequence of integers starting with 0 the output would be:

 (0)
 (0, 1)
 (0, 1, 2)
 (0, 1, 2, 3)
 (0, 1, 2, 3, 4)
 (0, 1, 2, 3, 4, 5)
 (0, 1, 2, 3, 4, 5, 6)
 (0, 1, 2, 3, 4, 5, 6, 7)
 (0, 1, 2, 3, 4, 5, 6, 7, 8)
 (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
 (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
 (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
 (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
 ...
 etc

 -Jamie


 On Wed, Dec 14, 2016 at 9:17 AM, Matt  wrote:

> Hello people,
>
> I've written down some quick questions for which I couldn't find much
> or anything in the documentation. I hope you can answer some of them!
>
> *# Multiple consumers*
>
> *1.* Is it possible to .union() streams of different classes? It is
> useful to create a consumer that counts elements on different topics for
> example, using a key such as the class name of the element, and a tumbling
> window of 5 mins let's say.
>
> *2.* In case #1 is not possible, I need to launch multiple consumers
> to achieve the same effect. However, I'm getting a "Factory already
> initialized" error if I run environment.execute() for two consumers on
> different threads. How do you .execute() more than one consumer on the 
> same
> application?
>
> *# Custom triggers*
>
> *3.* If a custom .trigger() overwrites the trigger of the
> WindowAssigner used previously, why do we have to specify a WindowAssigner
> (such as TumblingProcessingTimeWindows) in order to be able to specify a
> custom trigger? Shouldn't it be possible to send a trigger to .window()?
>
> *4.* I need a stream with a CountWindow (size 10, slide 1 let's say)
> that may take more than 10 hours to fill for the first time, but in the
> meanwhile I want to process whatever elements have already been generated. I
> guess the way to do this is to create a custom trigger that fires on every
> new element, with up to 10 elements at a time. The result would be windows
> of sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10, ... Is there a way to
> achieve this with predefined triggers, or is a custom trigger the only way
> to go here?
>
> Best regards,
> Matt
>



 --

 Jamie Grier
 data Artisans, Director of Applications Engineering

Flink 1.1.3 web UI is loading very slowly

2016-12-15 Thread Yury Ruchin
Hi,

I'm seeing an issue with the load speed of Flink Web UI when running in
YARN session. Initial load takes several minutes or even more, although
according to the browser console there are only a couple of MBs to
download. When the loading is complete, the UI itself is quite responsive.

I don't see such an issue with the YARN UI loading from the same cluster over
the same network. I suspected that the YARN proxy might cause the slowdown, so
I hit the Flink web UI URL directly - same story. CPU, load average and
memory consumption on the node hosting the job manager are low. I increased
the jobmanager heap from 4 GB to 10 GB - no effect.

Any ideas on how to approach the problem are appreciated.

Thanks,
Yury


Re: more complex patterns for CEP (was: CEP two transitions to the same state)

2016-12-15 Thread Dima Arbuzin
Hey there,

I was investigating the CEP functionality and realized that I'm missing some
features, which are discussed here - especially accessing fields from previous
events.

Any progress regarding this question?

I'm working with streaming car location data trying to analyze different
traffic patterns.
Consider the following use-case:
1) detect a traffic jam (cluster) at one spot (event) and then
2) detect another traffic jam (another cluster) later (another event). The
question would be: did they move in space?

So is there a way to compare these two clusters using CEP? I guess
customizing the equals() function would help to some extent, but then what if
I want to compare some extra fields like location?

How hard would it be to overload the FilterFunction to store information
about previously triggered events?

P.S.  Performance is not that critical in this case.
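
As a rough sketch of that workaround (not from this thread - the event type
and threshold are made up, and the non-checkpointed field means the remembered
event is lost on failure and is local to each parallel instance):

import org.apache.flink.api.common.functions.FilterFunction;

// Hypothetical event type: a detected traffic-jam cluster with a location.
class JamCluster {
    double lat;
    double lon;
    long timestamp;
}

// Filter that accepts a cluster only if it moved away from the previously accepted one.
class MovedJamFilter implements FilterFunction<JamCluster> {
    private transient JamCluster previous; // not checkpointed - illustration only

    @Override
    public boolean filter(JamCluster current) {
        if (previous == null) {
            previous = current;
            return false; // nothing to compare against yet
        }
        double movedKm = distanceKm(previous, current);
        previous = current;
        return movedKm > 1.0; // "moved in space" threshold, arbitrary
    }

    private static double distanceKm(JamCluster a, JamCluster b) {
        // crude equirectangular approximation, good enough for a sketch
        double dLat = Math.toRadians(b.lat - a.lat);
        double dLon = Math.toRadians(b.lon - a.lon) * Math.cos(Math.toRadians((a.lat + b.lat) / 2));
        return 6371.0 * Math.sqrt(dLat * dLat + dLon * dLon);
    }
}

Note that this only behaves as expected on a non-parallel or keyed stream,
since each parallel filter instance keeps its own "previous" event.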

On Tue, Oct 11, 2016 at 5:39 PM,  wrote:

> Thanks, Till. I will wait for your response.
> - LF
>
>
>
>
> --
> *From:* Till Rohrmann 
> *To:* user@flink.apache.org; lg...@yahoo.com
> *Sent:* Tuesday, October 11, 2016 2:49 AM
>
> *Subject:* Re: more complex patterns for CEP (was: CEP two transitions to
> the same state)
>
> The timeline is hard to predict to be honest. It depends a little bit on
> how fast the community can proceed with these things. At the moment I'm
> personally involved in other issues and, thus, cannot work on the CEP
> library. I hope to get back to it soon.
>
> Cheers,
> Till
>
> On Sat, Oct 8, 2016 at 12:42 AM,  wrote:
>
> hi Till,
>
> Thanks for the detailed response.
>
> I'm looking forward to seeing these features implemented in Flink. Can
> anyone provide timelines for the 3 tickets that you mentioned in your
> response?
>
>
> - LF
>
>
>
>
> --
> *From:* Till Rohrmann 
> *To:* user@flink.apache.org
> *Sent:* Tuesday, September 20, 2016 7:13 AM
> *Subject:* Re: more complex patterns for CEP (was: CEP two transitions to
> the same state)
>
> Hi Frank,
>
> thanks for sharing your analysis. It indeed pinpoints some of the current
> CEP library's shortcomings.
>
> Let me address your points:
>
> 1. Lack of not operator
>
> The functionality to express events which must not occur in a pattern is
> missing. We currently have a JIRA [1] which addresses exactly this. For the
> notFollowedBy operator, we should discard all patterns where we've seen a
> matching event for the not state. I think it could be implemented like a
> special terminal state where we prune the partial pattern.
>
> For the notNext operator, we could think about keeping the event which has
> not matched the notNext state and return it as part of the fully matched
> pattern. Alternatively, we could simply forget about it once we've assured
> that it does not match.
>
> 2. Allow functions to access fields of previous events
>
> This hasn't been implemented yet because it is a quite expensive
> operation. Before calling the filter function you always have to
> reconstruct the current partial pattern and then give it to the filter
> function. But I agree that the user should be allowed to use such a
> functionality (and then pay the price for it in terms of efficiency).
> Giving access to the partially matched fields via a Map would be a way to
> solve the problem on the API level.
>
> I think that almost all functionality for this feature is already in
> place. We would simply have to check whether the filter conditions
> require access to previous events and then compute the partial pattern.
>
> 3. Support for recursive patterns
>
> The underlying SharedBuffer implementation should allow recursive event
> patterns. Once we have support for branching CEP patterns [2], which allow
> connecting different states, this should also be possible with some minor
> changes.
>
> However, a more interesting way to specify recursive CEP patterns is to
> use regular expression syntax (Kleene star, bounded occurrences) to express
> recursive parts of a pattern. I think this makes specifying such a pattern
> easier and more intuitive for the user. We also have a JIRA issue to track
> the progress there [3], and Ivan is already working on this.
>
> If you want to get involved in Flink's CEP development, then feel free to
> take over any free JIRA issue or create one yourself :-)
>
> [1] https://issues.apache.org/jira/browse/FLINK-3320
> [2] https://issues.apache.org/jira/browse/FLINK-4641
> [3] https://issues.apache.org/jira/browse/FLINK-3318
>
> Cheers,
> Till
>
> On Fri, Sep 16, 2016 at 10:04 PM, Frank Dekervel  wrote:
>
> Hello,
>
> I did some more analysis w.r.t. the problem I'm facing and the Flink CEP API.
>
> In order to solve the problem I'm facing using Flink CEP I would need 3
> additions to the API (I think). I tried to understand the NFA logic, and I
> think 2 of them should be do

benchmarking flink streaming

2016-12-15 Thread Meghashyam Sandeep V
Hi There,

We are evaluating Flink streaming for real-time data analysis. I have my
Flink job running in EMR with YARN. What are the possible benchmarking
tools that work best with Flink? I couldn't find this information on the
Apache website.

Thanks,
Sandeep


Re: Multiple consumers and custom triggers

2016-12-15 Thread Meghashyam Sandeep V
1. If we have multiple sources, can the streams be parallelized ?
2. Can we have multiple sinks as well?

On Dec 14, 2016 10:46 PM,  wrote:

> Got it. Thanks!
>
> On Dec 15, 2016, at 02:58, Jamie Grier  wrote:
>
> Ahh, sorry, for #2: A single Flink job can have as many sources as you
> like. They can be combined in multiple ways, via things like joins, or
> connect(), etc. They can also be completely independent — in other words
> the data flow graph can be completely disjoint. You never need to call
> execute() more than once. Just define your program, with as many sources as
> you want, and then call execute().
>
> val stream1 = env.addSource(...)
> val stream2 = env.addSource(...)
>
> stream1
>   .map(...)
>   .addSink(...)
>
> stream2
>   .map(...)
>   .addSink(...)
>
> env.execute() // this is all you need
>
>
> On Wed, Dec 14, 2016 at 4:02 PM, Matt  wrote:
>
>> Hey Jamie,
>>
>> Ok with #1. I guess #2 is just not possible.
>>
>> I got it about #3. I just checked the code for the tumbling window
>> assigner and I noticed it's just its default trigger that gets overwritten
>> when using a custom trigger, not the way it assigns windows, so it makes
>> sense now.
>>
>> Regarding #4, after doing some more tests I think it's more complex than
>> I first thought. I'll probably create another thread explaining that
>> specific question in more detail.
>>
>> Thanks,
>> Matt
>>
>> On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier 
>> wrote:
>>
>>> For #1 there are a couple of ways to do this.  The easiest is probably
>>> stream1.connect(stream2).map(...) where the MapFunction maps the two
>>> input types to a common type that you can then process uniformly.
>>>
>>> For #3: There must always be a WindowAssigner specified.  There are some
>>> convenient ways to do this in the API such as timeWindow(), or
>>> window(TumblingProcessingTimeWindows.of(...)), etc, however you always
>>> must do this whether you provide your own trigger implementation or not.
>>> The way to use window(...) with a custom trigger is just:
>>>  stream.keyBy(...).window(...).trigger(...).apply(...) or something
>>> similar.  Not sure if I answered your question though...
>>>
>>> For #4: If I understand you correctly that is exactly what
>>> CountWindow(10, 1) does already.  For example if your input data was a
>>> sequence of integers starting with 0 the output would be:
>>>
>>> (0)
>>> (0, 1)
>>> (0, 1, 2)
>>> (0, 1, 2, 3)
>>> (0, 1, 2, 3, 4)
>>> (0, 1, 2, 3, 4, 5)
>>> (0, 1, 2, 3, 4, 5, 6)
>>> (0, 1, 2, 3, 4, 5, 6, 7)
>>> (0, 1, 2, 3, 4, 5, 6, 7, 8)
>>> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
>>> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
>>> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
>>> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
>>> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
>>> ...
>>> etc
>>>
>>> -Jamie
>>>
>>>
>>> On Wed, Dec 14, 2016 at 9:17 AM, Matt  wrote:
>>>
 Hello people,

 I've written down some quick questions for which I couldn't find much
 or anything in the documentation. I hope you can answer some of them!

 *# Multiple consumers*

 *1.* Is it possible to .union() streams of different classes? It is
 useful to create a consumer that counts elements on different topics for
 example, using a key such as the class name of the element, and a tumbling
 window of 5 mins let's say.

 *2.* In case #1 is not possible, I need to launch multiple consumers
 to achieve the same effect. However, I'm getting a "Factory already
 initialized" error if I run environment.execute() for two consumers on
 different threads. How do you .execute() more than one consumer on the same
 application?

 *# Custom triggers*

 *3.* If a custom .trigger() overwrites the trigger of the
 WindowAssigner used previously, why do we have to specify a WindowAssigner
 (such as TumblingProcessingTimeWindows) in order to be able to specify a
 custom trigger? Shouldn't it be possible to send a trigger to .window()?

 *4.* I need a stream with a CountWindow (size 10, slide 1 let's say)
 that may take more than 10 hours to fill for the first time, but in the
 meanwhile I want to process whatever elements have already been generated. I
 guess the way to do this is to create a custom trigger that fires on every
 new element, with up to 10 elements at a time. The result would be windows
 of sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10, ... Is there a way to
 achieve this with predefined triggers, or is a custom trigger the only way
 to go here?

 Best regards,
 Matt

>>>
>>>
>>>
>>> --
>>>
>>> Jamie Grier
>>> data Artisans, Director of Applications Engineering
>>> @jamiegrier 
>>> ja...@data-artisans.com
>>>
>>>
>>
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier 
> ja...@data-artisans.com
>
>


Re: Jar hell when running streaming job in YARN session

2016-12-15 Thread Yury Ruchin
Hi Kidong, Stephan,

First of all, you've saved me days of investigation - thanks a lot! The
problem is solved now. More details follow.

I use the official Flink 1.1.3 + Hadoop 2.7 distribution. My problem was
indeed caused by a clash of classes under "com.google" in my fat jar and in
the YARN library directories. The shaded Guava classes in the Flink
distribution didn't hurt. Initially I took the wrong approach - I tried to
change the class loading order. Instead, I should have used the same shading
approach that Flink uses and that Kidong described above - simply relocate
the problematic classes to another package in the fat jar.

Thanks again,
Yury

2016-12-15 14:21 GMT+03:00 Stephan Ewen :

> Hi Yuri!
>
> Flink should hide Hadoop's Guava, to avoid this issue.
>
> Did you build Flink yourself from source? Maybe you are affected by this
> issue: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading
>
> Stephan
>
>
> On Thu, Dec 15, 2016 at 11:18 AM, Kidong Lee  wrote:
>
>> To avoid guava conflict, I use maven shade plugin to package my fat jar.
>> If you use maven, the shade plugin looks like this:
>> ...
>>
>> <plugin>
>>    <groupId>org.apache.maven.plugins</groupId>
>>    <artifactId>maven-shade-plugin</artifactId>
>>    <version>2.4.2</version>
>>    <configuration>
>>       <createDependencyReducedPom>false</createDependencyReducedPom>
>>       <shadedArtifactAttached>true</shadedArtifactAttached>
>>       <shadedClassifierName>flink-job</shadedClassifierName>
>>       <relocations>
>>          <relocation>
>>             <pattern>com.google</pattern>
>>             <shadedPattern>yourpackage.shaded.google</shadedPattern>
>>          </relocation>
>>       </relocations>
>>       <transformers>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>>             <resource>META-INF/spring.handlers</resource>
>>          </transformer>
>>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>>             <resource>META-INF/spring.schemas</resource>
>>          </transformer>
>>       </transformers>
>>       <filters>
>>          <filter>
>>             <artifact>*:*</artifact>
>>             <excludes>
>>                <exclude>org/datanucleus/**</exclude>
>>                <exclude>META-INF/*.SF</exclude>
>>                <exclude>META-INF/*.DSA</exclude>
>>                <exclude>META-INF/*.RSA</exclude>
>>             </excludes>
>>          </filter>
>>       </filters>
>>    </configuration>
>> </plugin>
>> ...
>>
>>
>> To package fat jar:
>>
>> mvn -e -DskipTests=true clean install shade:shade;
>>
>>
>> I hope, it helps.
>>
>> - Kidong Lee.
>>
>>
>>
>>
>>
>> 2016-12-15 19:04 GMT+09:00 Yury Ruchin :
>>
>>> Hi,
>>>
>>> I have run into a classpath issue when running Flink streaming job in
>>> YARN session. I package my app into a fat jar with all the dependencies
>>> needed. One of them is Google Guava. I then submit the jar to the session.
>>> The task managers pre-created by the session build their classpath from the
>>> FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
>>> dated Guava version pulled along with Hadoop dependencies which conflicts
>>> with the version my app needs. Even worse, the Flink lib dir and Hadoop
>>> libraries take precedence over my jar.
>>>
>>> If I remember correctly, in Spark there is an option meaning "user
>>> classpath goes first when looking for classes". I couldn't find anything
>>> similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
>>> the hope to extend the classpath but the old Guava version takes precedence
>>> anyway.
>>>
>>> How else can I bump "priority" of my app jar during the load process?
>>>
>>> Thanks,
>>> Yury
>>>
>>
>>
>


Re: Question about Memory Manage in the Streaming mode

2016-12-15 Thread Stephan Ewen
Hi!

I slightly disagree with Timo. Custom memory management is always
useful, in the Streaming API as well. It makes execution more robust.

If you use RocksDB as a state backend, you get memory management from
RocksDB - effectively all your program key/value state is off-heap.
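
For reference, a minimal sketch of enabling the RocksDB backend (the checkpoint
URI is a placeholder, and the flink-statebackend-rocksdb dependency is assumed
to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state is kept off-heap in RocksDB; checkpoints go to the given file system URI.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        // trivial placeholder pipeline so the sketch actually runs
        env.fromElements("a", "b", "c").print();

        env.execute("rocksdb-backend-example");
    }
}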

Flink's own state backends have not yet implemented custom memory
management (it is quite a bit more complex in a true streaming environment
than in batch), but it will come as a feature (though not officially
tracked as a jira).

Stephan



On Thu, Dec 15, 2016 at 10:43 AM, Tao Meng  wrote:

> Thanks a lot.
>
> On Dec 15 2016, at 5:39 PM, Timo Walther  wrote:
>
>> Hi Tao,
>>
>> no, streaming jobs do not use managed memory yet. Managed memory is
>> useful for sorting, joining and grouping bounded data. Unbounded streams do
>> not need that.
>>
>> It could be used in the future e.g. to store state or for new operators,
>> but this is not on the roadmap so far.
>>
>> Regards,
>> Timo
>>
>>
>> Am 15/12/16 um 10:30 schrieb Tao Meng:
>>
>> Hi all,
>>
>>   I have some questions about memory management in the Streaming mode.
>>
>>   Do streaming jobs use the memory management module?
>> If they don't, what are the reasons? Is it because data exchange
>> is too frequent?
>> Is there a plan to let streaming jobs use it?
>>
>> Thanks a lot.
>>
>>
>>


Re: Jar hell when running streaming job in YARN session

2016-12-15 Thread Stephan Ewen
Hi Yuri!

Flink should hide Hadoop's Guava, to avoid this issue.

Did you build Flink yourself from source? Maybe you are affected by this
issue:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/building.html#dependency-shading

Stephan


On Thu, Dec 15, 2016 at 11:18 AM, Kidong Lee  wrote:

> To avoid guava conflict, I use maven shade plugin to package my fat jar.
> If you use maven, the shade plugin looks like this:
> ...
>
> <plugin>
>    <groupId>org.apache.maven.plugins</groupId>
>    <artifactId>maven-shade-plugin</artifactId>
>    <version>2.4.2</version>
>    <configuration>
>       <createDependencyReducedPom>false</createDependencyReducedPom>
>       <shadedArtifactAttached>true</shadedArtifactAttached>
>       <shadedClassifierName>flink-job</shadedClassifierName>
>       <relocations>
>          <relocation>
>             <pattern>com.google</pattern>
>             <shadedPattern>yourpackage.shaded.google</shadedPattern>
>          </relocation>
>       </relocations>
>       <transformers>
>          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>             <resource>META-INF/spring.handlers</resource>
>          </transformer>
>          <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>             <resource>META-INF/spring.schemas</resource>
>          </transformer>
>       </transformers>
>       <filters>
>          <filter>
>             <artifact>*:*</artifact>
>             <excludes>
>                <exclude>org/datanucleus/**</exclude>
>                <exclude>META-INF/*.SF</exclude>
>                <exclude>META-INF/*.DSA</exclude>
>                <exclude>META-INF/*.RSA</exclude>
>             </excludes>
>          </filter>
>       </filters>
>    </configuration>
> </plugin>
> ...
>
>
> To package fat jar:
>
> mvn -e -DskipTests=true clean install shade:shade;
>
>
> I hope, it helps.
>
> - Kidong Lee.
>
>
>
>
>
> 2016-12-15 19:04 GMT+09:00 Yury Ruchin :
>
>> Hi,
>>
>> I have run into a classpath issue when running Flink streaming job in
>> YARN session. I package my app into a fat jar with all the dependencies
>> needed. One of them is Google Guava. I then submit the jar to the session.
>> The task managers pre-created by the session build their classpath from the
>> FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
>> dated Guava version pulled along with Hadoop dependencies which conflicts
>> with the version my app needs. Even worse, the Flink lib dir and Hadoop
>> libraries take precedence over my jar.
>>
>> If I remember correctly, in Spark there is an option meaning "user
>> classpath goes first when looking for classes". I couldn't find anything
>> similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
>> the hope to extend the classpath but the old Guava version takes precedence
>> anyway.
>>
>> How else can I bump "priority" of my app jar during the load process?
>>
>> Thanks,
>> Yury
>>
>
>


Re: Flink 1.1.3 RollingSink - understanding output blocks/parallelism

2016-12-15 Thread Aljoscha Krettek
Hi Dominik,
I think having a single output file is only possible if you set the
parallelism of the sink to 1. AFAIK it is not possible to concurrently
write to a single HDFS file from multiple clients.
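
A minimal sketch of what that looks like (the host, port and HDFS path are
placeholders; RollingSink comes from the flink-connector-filesystem module):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.RollingSink;

public class SingleWriterRollingSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> results = env.socketTextStream("localhost", 9999);

        // With sink parallelism 1, a single subtask writes the part files,
        // so the output is never written concurrently by multiple clients.
        results.addSink(new RollingSink<String>("hdfs:///output/results"))
               .setParallelism(1);

        env.execute("single-writer-rolling-sink");
    }
}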

Cheers,
Aljoscha

On Wed, 14 Dec 2016 at 20:57 Dominik Safaric 
wrote:

> Hi everyone,
>
> although this question might sound trivial, I’ve been curious about the
> following. Given a Flink topology with the parallelism level set to 6 for
> example, and outputting the data stream to HDFS using an instance of
> RollingSink, how is the output file structured? By structured, I refer to
> the fact that this will result in 6 distinct block files, whereas I would
> like to have a single file containing all of the output values from the
> DataStream.
>
> Regards,
> Dominik


Re: In 1.2-SNAPSHOT, EventTimeSessionWindows are not firing untill the whole stream is processed

2016-12-15 Thread Aljoscha Krettek
Hi,
right now, the only way of shutting down a running pipeline is to cancel
it. You can do that in the JobManager dashboard or using the bin/flink
command. And the watermark extraction period does not depend on the watch
interval. It can be configured using
env.getConfig().setAutoWatermarkInterval(long).
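
Putting both pieces together, a rough sketch (the path and the intervals are
placeholders, not from this thread):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousFileReadExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Emit watermarks every 200 ms, independent of the file watch interval.
        env.getConfig().setAutoWatermarkInterval(200);

        String path = "file:///data/input.csv";
        TextInputFormat format = new TextInputFormat(new Path(path));

        // Keep monitoring the path every 1000 ms instead of reading it once and finishing.
        DataStream<String> lines =
                env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);

        lines.print();
        env.execute("continuous-file-read");
    }
}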

Cheers,
Aljoscha

On Thu, 15 Dec 2016 at 00:00 Yassine MARZOUGUI 
wrote:

> Hi Aljoscha,
>
> Thanks a lot for the explanation. Using readFile with PROCESS_CONTINUOUSLY
> solves it. Two more questions though:
>
> 1. Is it possible to gracefully stop the job once it has read the input
> once?
> 2. Does the watermark extraction period depend on the watch interval, or
> should any watch interval (except -1L) work the same way?
>
> In my case the input is indeed finite and static, but contains hundreds of
> GBs, which made the window state grow quickly beyond the memory capacity,
> and the fact that the window contents were fired periodically helped
> keeping it small.
>
> Best,
> Yassine
>
> 2016-12-14 10:38 GMT+01:00 Aljoscha Krettek :
>
> Hi Yassine,
> for a bit more detailed explanation: We internally changed how the timer
> system works, this timer system is also used to periodically extract
> watermarks. Due to this change, in your case we don't extract watermarks
> anymore.
>
> Internally, your call resolves to something like this:
>
> Env.readFile(FileInputFormat inputFormat, String
> filePath, FileProcessingMode watchType, long interval)
>
> with the FileProcessingMode being set to PROCESS_ONCE.
>
> To get back the old behaviour you can call this method directly with
> PROCESS_CONTINUOUSLY. This will keep the pipeline running and will also
> ensure that watermarks keep being extracted.
>
> In your case, it is not strictly wrong to emit only one large watermark in
> the end because your processing is finite. I admit that the change from
> Flink 1.1 seems a bit strange but this should only occur in toy examples
> where the data is finite.
>
> Does that help?
>
> Cheers,
> Aljoscha
>
> On Tue, 13 Dec 2016 at 18:17 Aljoscha Krettek  wrote:
>
> Hi Yassine,
> I managed to reproduce the problem. The cause is that we recently changed
> how the timer service is being cleaned up and now the watermark timers are
> not firing anymore.
>
> I'll keep you posted and hope to find a solution fast.
>
> Cheers,
> Aljoscha
>
> On Sun, 11 Dec 2016 at 22:10 Yassine MARZOUGUI 
> wrote:
>
> Hi Aljoscha,
>
> Please excuse me for the late response; I've been busy for the whole
> previous week.
> I used the custom watermark debugger (with 1.1, I changed 
> super.processWatermark(mark)
> to super.output.emitWatermark(mark)), surprisingly with 1.2, only one
> watremark is printed at the end of the stream with the value WM: Watermark
> @ 9223372036854775807 (Long.MAX_VALUE), whereas with 1.1, watermarks are
> printed periodically. I am  using the following revision of 1.2-SNAPSHOT :
> https://github.com/apache/flink/tree/4e336c692b74f218ba09844a46f49534e3a210e9
> .
>
> I uploaded the dataset I'm using as an input here :
> https://drive.google.com/file/d/0BzERCAJnxXocNGpMTGMzX09id1U/view?usp=sharing
>  ,the first column corresponds to the timestamp.
>
> You can find the code below. Thanks you for your help.
>
> import com.opencsv.CSVParser;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
> import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import
> org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.util.Collector;
> import java.util.*;
>
> /**
>  * Created by ymarzougui on 11/1/2016.
>  */
> public class SortedSessionsAssigner {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream> waterMarked =
> env.readTextFile("file:///E:\\data\\anonymized.csv")
> .flatMap(new RichFlatMapFunction Tuple

Re: Jar hell when running streaming job in YARN session

2016-12-15 Thread Kidong Lee
To avoid the Guava conflict, I use the Maven Shade Plugin to package my fat jar.
If you use Maven, the shade plugin configuration looks like this:
...


<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>2.4.2</version>
   <configuration>
      <createDependencyReducedPom>false</createDependencyReducedPom>
      <shadedArtifactAttached>true</shadedArtifactAttached>
      <shadedClassifierName>flink-job</shadedClassifierName>
      <relocations>
         <relocation>
            <pattern>com.google</pattern>
            <shadedPattern>yourpackage.shaded.google</shadedPattern>
         </relocation>
      </relocations>
      <transformers>
         <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
         <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/spring.handlers</resource>
         </transformer>
         <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
            <resource>META-INF/spring.schemas</resource>
         </transformer>
      </transformers>
      <filters>
         <filter>
            <artifact>*:*</artifact>
            <excludes>
               <exclude>org/datanucleus/**</exclude>
               <exclude>META-INF/*.SF</exclude>
               <exclude>META-INF/*.DSA</exclude>
               <exclude>META-INF/*.RSA</exclude>
            </excludes>
         </filter>
      </filters>
   </configuration>
</plugin>

...


To package fat jar:

mvn -e -DskipTests=true clean install shade:shade;


I hope, it helps.

- Kidong Lee.





2016-12-15 19:04 GMT+09:00 Yury Ruchin :

> Hi,
>
> I have run into a classpath issue when running Flink streaming job in YARN
> session. I package my app into a fat jar with all the dependencies needed.
> One of them is Google Guava. I then submit the jar to the session. The task
> managers pre-created by the session build their classpath from the
> FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
> dated Guava version pulled along with Hadoop dependencies which conflicts
> with the version my app needs. Even worse, the Flink lib dir and Hadoop
> libraries take precedence over my jar.
>
> If I remember correctly, in Spark there is an option meaning "user
> classpath goes first when looking for classes". I couldn't find anything
> similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
> the hope to extend the classpath but the old Guava version takes precedence
> anyway.
>
> How else can I bump "priority" of my app jar during the load process?
>
> Thanks,
> Yury
>


Jar hell when running streaming job in YARN session

2016-12-15 Thread Yury Ruchin
Hi,

I have run into a classpath issue when running Flink streaming job in YARN
session. I package my app into a fat jar with all the dependencies needed.
One of them is Google Guava. I then submit the jar to the session. The task
managers pre-created by the session build their classpath from the
FLINK_LIB_DIR and Hadoop / YARN lib directories. Unfortunately, there is a
dated Guava version pulled along with Hadoop dependencies which conflicts
with the version my app needs. Even worse, the Flink lib dir and Hadoop
libraries take precedence over my jar.

If I remember correctly, in Spark there is an option meaning "user
classpath goes first when looking for classes". I couldn't find anything
similar for Flink. I tried "flink run -C file:///path/to/my/libraries" in
the hope to extend the classpath but the old Guava version takes precedence
anyway.

How else can I bump "priority" of my app jar during the load process?

Thanks,
Yury


Re: Question about Memory Manage in the Streaming mode

2016-12-15 Thread Tao Meng
Thanks a lot.

On Dec 15 2016, at 5:39 PM, Timo Walther  wrote:

> Hi Tao,
>
> no, streaming jobs do not use managed memory yet. Managed memory is useful for
> sorting, joining and grouping bounded data. Unbounded streams do not need that.
>
> It could be used in the future e.g. to store state or for new operators, but
> this is not on the roadmap so far.
>
> Regards,
> Timo
>
> On 15/12/16 at 10:30, Tao Meng wrote:
>
>> Hi all,
>>
>> I have some questions about memory management in the Streaming mode.
>>
>> Do streaming jobs use the memory management module?
>> If they don't, what are the reasons? Is it because data exchange is too
>> frequent?
>> Is there a plan to let streaming jobs use it?
>>
>> Thanks a lot.


Re: Question about Memory Manage in the Streaming mode

2016-12-15 Thread Timo Walther

Hi Tao,

no, streaming jobs do not use managed memory yet. Managed memory is 
useful for sorting, joining and grouping bounded data. Unbounded streams
do not need that.


It could be used in the future e.g. to store state or for new operators,
but this is not on the roadmap so far.


Regards,
Timo


On 15/12/16 at 10:30, Tao Meng wrote:

Hi all,

  I have some questions about memory management in the Streaming mode.

  Do streaming jobs use the memory management module?
If they don't, what are the reasons? Is it because data
exchange is too frequent?

Is there a plan to let streaming jobs use it?

Thanks a lot.





Question about Memory Manage in the Streaming mode

2016-12-15 Thread Tao Meng
Hi all,

  

  I have some questions about memory management in the Streaming mode.

  

  Do streaming jobs use the memory management module?

If they don't, what are the reasons? Is it because data exchange is too
frequent?

Is there a plan to let streaming jobs use it?

  

Thanks a lot.

  




RE: Standalone cluster layout

2016-12-15 Thread Avihai Berkovitz
Thank you for the answers. The cluster will run in Azure, so I will be using 
HDFS over Azure Blob Store, as outlined in 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Azure-Blob-Storage-Connector-td8536.html
I got pretty good performance in my tests, and it should handle scaling well. 
We will see how it performs under real production loads.
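
For reference, the setup behind that approach boils down to roughly the
following (a sketch only - the storage account name, key, container and paths
are placeholders, and it assumes the hadoop-azure and azure-storage JARs are
available on the classpath):

core-site.xml (on each Flink node):
<property>
   <name>fs.azure.account.key.youraccount.blob.core.windows.net</name>
   <value>YOUR_STORAGE_ACCOUNT_KEY</value>
</property>

flink-conf.yaml:
fs.hdfs.hadoopconf: /etc/hadoop/conf
state.backend: filesystem
state.backend.fs.checkpointdir: wasb://container@youraccount.blob.core.windows.net/checkpoints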

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Wednesday, December 14, 2016 4:59 PM
To: user@flink.apache.org
Subject: Re: Standalone cluster layout

Hi Avihai,

  1.  As much as possible (I would leave the operating system at least 1 GB of
memory). It also depends on the workload you have. For a streaming workload with
very small state, you can use Flink with 1-2 GB of heap space and still get
very good performance.
  2.  Yes, I would recommend running one large TaskManager per machine, because you
save on "management overhead" and you benefit from faster data transfers
locally.
  3.  If you give your TaskManagers say 10 GB of heap, it's likely that the process
in the OS is using ~12 GB of memory in total (our network stack is also using
some off-heap memory). You can fine-tune the (memory) behavior of RocksDB, but by
default it's not using a lot of memory.

  4.  I would give it at least 2 GB; if you run multiple jobs or larger jobs (high
parallelism, many machines, many tasks), then maybe even more.


The layout of the standalone cluster looks good.
Where are you planning to write the state checkpoints to? Given that you have
500 GB of state, you should consider how you want to store that state somewhere
reliably. For larger states, it's recommended to have a good network connection
between the workers (machines running TMs) and the distributed file system (say
S3, HDFS, ...).
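
As a concrete (illustrative, not prescriptive) starting point, the sizing
above maps to a few flink-conf.yaml entries like:

# flink-conf.yaml sketch - values are examples, tune for your machines
jobmanager.heap.mb: 2048           # JobManager heap (point 4 above)
taskmanager.heap.mb: 40960         # one large TaskManager per machine, leaving room for the OS and RocksDB
taskmanager.numberOfTaskSlots: 16  # typically about the number of cores per machine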



On Tue, Dec 13, 2016 at 5:41 PM, Avihai Berkovitz
<avihai.berkov...@microsoft.com> wrote:
Hi folks,

I am setting up a Flink cluster for testing, and I have a few questions 
regarding memory allocations:

  1.  Is there a recommended limit to the size of a TaskManager heap? I saw 
that Flink uses G1GC, so we can use dozens of GB.
  2.  Following the above question, should I use only one TaskManager process 
per machine, and give it all the available memory (minus a couple of GB for the 
OS)?
  3.  Should I reserve some memory for RocksDB? The partitioned state will be 
around 500GB in size, and to my understanding RocksDB runs in native code and 
so uses off-heap memory.
  4.  What is the recommended heap size of a JobManager? I expect that the 
cluster will run only 2 jobs at the same time.

The planned layout of the standalone cluster is:

  *   3 small JobManager machines, running:

 *   1 process of Zookeeper peer
 *   1 JobManager process

  *   N large TaskManager machines, each running 1 TM process

Thanks!
Avihai