RE: flink-storm FlinkLocalCluster issue

2016-02-26 Thread #ZHANG SHUHAO#
Thanks for the confirmation.
When will 1.0 be ready in maven repo?

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Friday, February 26, 2016 9:07 PM
To: user@flink.apache.org
Subject: Re: flink-storm FlinkLocalCluster issue

Hi!

On 0.10.x, the Storm compatibility layer does not properly configure the Local 
Flink Executor to have the right parallelism.

In 1.0 that is fixed. If you try the latest snapshot, or the 
1.0-Release-Candidate-1, it should work.

Greetings,
Stephan


On Fri, Feb 26, 2016 at 12:16 PM, #ZHANG SHUHAO# 
> wrote:
Hi till,

Thanks for your reply.
But it appears that it only starts with a slot count of 1.
I have traced through the Flink source code step by step, and I have 
confirmed it there.

I'm using Flink 0.10.2, with the source code downloaded from the Flink website. Nothing has 
been changed. I simply tried to run the flink-storm word count local example.

It just failed to work.


Sent from my iPhone

On 26 Feb 2016, at 6:16 PM, Till Rohrmann 
> wrote:

Hi Shuhao,

the configuration you’re providing is only used for the storm compatibility 
layer and not for Flink itself. When you run your job locally, the 
LocalFlinkMiniCluster should be started with as many slots as the maximum 
degree of parallelism in your topology. You can check this in 
FlinkLocalCluster.java:96. When you submit your job to a remote cluster, 
you have to define the number of slots in the flink-conf.yaml file.
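For the remote-cluster case, a minimal flink-conf.yaml sketch (the values are only examples):

taskmanager.numberOfTaskSlots: 4
parallelism.default: 4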

Cheers,
Till
​

On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# 
> wrote:
Hi everyone,

I’m a student researcher working on Flink recently.

I’m trying out the flink-storm example project, version 0.10.2, 
flink-storm-examples, word-count-local.

But, I got the following error:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not 
enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @ (unassigned) 
- [SCHEDULED] > with groupID < b86aa7eb76c417c63afb577ea46ddd72 > in sharing 
group < SlotSharingGroup [0ea70b6a3927c02f29baf9de8f59575b, 
b86aa7eb76c417c63afb577ea46ddd72, cbc10505364629b45b28e79599c2f6b8] >. 
Resources available to scheduler: Number of instances=1, total number of 
slots=1, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I notice that by default the task manager has only one slot; changing the setting 
in flink-conf does not help, as I want to debug locally through 
FlinkLocalCluster (not submit the job to a local cluster).

I have tried the following:

import backtype.storm.Config;

Config config = new Config();
config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
cluster.submitTopology(topologyId, config, ft);

Re: Job Manager HA manual setup

2016-02-26 Thread Welly Tambunan
typos


We have tried this: the job manager can fail over, but the task manager
CAN'T be relocated to the new job manager. Are there some settings for this?
Or can the task manager also be relocated to the new job manager?

Cheers

On Sat, Feb 27, 2016 at 7:27 AM, Welly Tambunan  wrote:

> Hi All,
>
> We have already try to setup the Job Manager HA based on the documentation
> and using script and provided zookeeper. It works.
>
> However currently everything is done using start-cluster script that I
> believe will require passwordlress ssh between node. We are restricted with
> our environment so this one is not possible.
>
> Is it possible to setup the Job Manager HA manually ? By starting each job
> manager with in each node and task manager. We have our zookeeper and hdfs
> cluster already.
>
> We have tried this one the job manager can failover, but the task manager
> can be relocated to the new task manager. Is there some settings for this
> one ? Or is the task manager also can be relocate to the new job manager ?
>
> Any more details on the mechanism used on Job Manager HA and interaction
> with Zookeeper ?
>
> Is task manager also registered on Zookeeper ? How they find the right job
> manager master ?
>
>
> Thanks for your help.
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>



-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Job Manager HA manual setup

2016-02-26 Thread Welly Tambunan
Hi All,

We have already tried to set up Job Manager HA based on the documentation,
using the provided scripts and ZooKeeper. It works.

However, currently everything is done using the start-cluster script, which I
believe requires passwordless SSH between nodes. We are restricted in
our environment, so this is not possible.

Is it possible to set up Job Manager HA manually, by starting each job
manager and task manager on each node ourselves? We have our ZooKeeper and HDFS
cluster already.

We have tried this: the job manager can fail over, but the task manager
can be relocated to the new task manager. Are there some settings for this?
Or can the task manager also be relocated to the new job manager?

Any more details on the mechanism used for Job Manager HA and its interaction
with ZooKeeper?

Is the task manager also registered in ZooKeeper? How do they find the right job
manager master?


Thanks for your help.

Cheers
-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Watermarks with repartition

2016-02-26 Thread Aljoscha Krettek
Yes, that would be perfect. Thanks!

--
Aljoscha
> On 26 Feb 2016, at 20:53, Zach Cox  wrote:
> 
> Sure, want me to open a jira issue and then PR a new page into 
> https://github.com/apache/flink/tree/master/docs/internals, following these 
> instructions? http://flink.apache.org/contribute-documentation.html
> 
> -Zach
> 
> 
> On Fri, Feb 26, 2016 at 1:13 PM Aljoscha Krettek  wrote:
> Cool, that’s a nice write up. Would you maybe be interested in integrating 
> this as some sort of internal documentation in Flink? So that prospective 
> contributors can get to know this stuff.
> 
> Cheers,
> Aljoscha
> > On 26 Feb 2016, at 18:32, Zach Cox  wrote:
> >
> > Thanks for the confirmation Aljoscha! I wrote up results from my little 
> > experiment: https://github.com/zcox/flink-repartition-watermark-example
> >
> > -Zach
> >
> >
> > On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek  
> > wrote:
> > Hi,
> > yes, your description is spot on!
> >
> > Cheers,
> > Aljoscha
> > > On 26 Feb 2016, at 00:19, Zach Cox  wrote:
> > >
> > > I think I found the information I was looking for:
> > >
> > > RecordWriter broadcasts each emitted watermark to all outgoing channels 
> > > [1].
> > >
> > > StreamInputProcessor tracks the max watermark received on each incoming 
> > > channel separately, and computes the task's watermark as the min of all 
> > > incoming watermarks [2].
> > >
> > > Is this an accurate summary of Flink's watermark propagation?
> > >
> > > So in my previous example, each window count task is building up a count 
> > > for each window based on incoming event's timestamp, and when all 
> > > incoming watermarks have progressed beyond the end of the window, the 
> > > count is emitted. So if one partition's watermark lags behind the other, 
> > > it just means the window output is triggered based on this lagging 
> > > watermark.
> > >
> > > -Zach
> > >
> > > [1] 
> > > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > > [2] 
> > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> > >
> > >
> > > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox  wrote:
> > > Hi - how are watermarks passed along parallel tasks where there is a 
> > > repartition? For example, say I have a simple streaming job computing 
> > > hourly counts per key, something like this:
> > >
> > > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > > environment.setParallelism(2)
> > > environment.setStreamTimeCharacteristic(EventTime)
> > > environment.getConfig.enableTimestamps()
> > > environment
> > >   .addSource(...)
> > >   .assignAscendingTimestamps(_.timestamp)
> > >   .keyBy("someField")
> > >   .timeWindow(Time.hours(1))
> > >   .fold(0, (count, element) => count + 1)
> > >   .addSink(...)
> > > environment.execute("example")
> > >
> > > Say the source has 2 parallel partitions (e.g. Kafka topic) and the 
> > > events from the source contain timestamps, but over time the 2 source 
> > > tasks diverge in event time (maybe 1 Kafka topic partition has many more 
> > > events than the other).
> > >
> > > The job graph looks like this: http://imgur.com/hxEpF6b
> > >
> > > From what I can tell, the execution graph, with parallelism=2, would look 
> > > like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to 
> > > be used, so that events with the same key end up at the same window 
> > > subtask, regardless of which source partition they came from.
> > >
> > > Since the watermarks are skewed between the parallel pipelines, what 
> > > happens when differing watermarks are sent to the window count operators? 
> > > Is something tracking the min incoming watermark there? Could anyone 
> > > point me to Flink code that implements this? I'd really like to learn 
> > > more about how this works.
> > >
> > > Thanks,
> > > Zach
> > >
> > >
> >
> 



Re: Watermarks with repartition

2016-02-26 Thread Aljoscha Krettek
Cool, that’s a nice write up. Would you maybe be interested in integrating this 
as some sort of internal documentation in Flink? So that prospective 
contributors can get to know this stuff.

Cheers,
Aljoscha
> On 26 Feb 2016, at 18:32, Zach Cox  wrote:
> 
> Thanks for the confirmation Aljoscha! I wrote up results from my little 
> experiment: https://github.com/zcox/flink-repartition-watermark-example
> 
> -Zach
> 
> 
> On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek  wrote:
> Hi,
> yes, your description is spot on!
> 
> Cheers,
> Aljoscha
> > On 26 Feb 2016, at 00:19, Zach Cox  wrote:
> >
> > I think I found the information I was looking for:
> >
> > RecordWriter broadcasts each emitted watermark to all outgoing channels [1].
> >
> > StreamInputProcessor tracks the max watermark received on each incoming 
> > channel separately, and computes the task's watermark as the min of all 
> > incoming watermarks [2].
> >
> > Is this an accurate summary of Flink's watermark propagation?
> >
> > So in my previous example, each window count task is building up a count 
> > for each window based on incoming event's timestamp, and when all incoming 
> > watermarks have progressed beyond the end of the window, the count is 
> > emitted. So if one partition's watermark lags behind the other, it just 
> > means the window output is triggered based on this lagging watermark.
> >
> > -Zach
> >
> > [1] 
> > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > [2] 
> > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> >
> >
> > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox  wrote:
> > Hi - how are watermarks passed along parallel tasks where there is a 
> > repartition? For example, say I have a simple streaming job computing 
> > hourly counts per key, something like this:
> >
> > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > environment.setParallelism(2)
> > environment.setStreamTimeCharacteristic(EventTime)
> > environment.getConfig.enableTimestamps()
> > environment
> >   .addSource(...)
> >   .assignAscendingTimestamps(_.timestamp)
> >   .keyBy("someField")
> >   .timeWindow(Time.hours(1))
> >   .fold(0, (count, element) => count + 1)
> >   .addSink(...)
> > environment.execute("example")
> >
> > Say the source has 2 parallel partitions (e.g. Kafka topic) and the events 
> > from the source contain timestamps, but over time the 2 source tasks 
> > diverge in event time (maybe 1 Kafka topic partition has many more events 
> > than the other).
> >
> > The job graph looks like this: http://imgur.com/hxEpF6b
> >
> > From what I can tell, the execution graph, with parallelism=2, would look 
> > like this: http://imgur.com/pSX8ov5. The keyBy causes a hash partition to 
> > be used, so that events with the same key end up at the same window 
> > subtask, regardless of which source partition they came from.
> >
> > Since the watermarks are skewed between the parallel pipelines, what 
> > happens when differing watermarks are sent to the window count operators? 
> > Is something tracking the min incoming watermark there? Could anyone point 
> > me to Flink code that implements this? I'd really like to learn more about 
> > how this works.
> >
> > Thanks,
> > Zach
> >
> >
> 



Re: Counting tuples within a window in Flink Stream

2016-02-26 Thread Stephan Ewen
Yes, Gyula, that should work. I would make the random key across a range of
10 * parallelism.
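
A rough Java sketch of that two-stage count (class and method names are placeholders, and a DataStream<String> called "input" is assumed; whether the two 10-second windows line up exactly depends on the time semantics discussed in the quoted mail below):

import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class PreAggregatedCount {

    public static DataStream<Tuple2<Integer, Integer>> count(DataStream<String> input, int parallelism) {
        final int keyRange = 10 * parallelism;

        return input
            .map(new MapFunction<String, Tuple2<Integer, Integer>>() {
                private final Random random = new Random();

                @Override
                public Tuple2<Integer, Integer> map(String value) {
                    // attach a random key so the partial counts are spread over all subtasks
                    return new Tuple2<>(random.nextInt(keyRange), 1);
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)                              // parallel pre-aggregation per random key
            .timeWindowAll(Time.seconds(10))
            .sum(1);                             // combine the partial counts globally
    }
}

The global count for each 10-second window ends up in field f1 of the emitted Tuple2.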




On Fri, Feb 26, 2016 at 7:16 PM, Gyula Fóra  wrote:

> Hey,
>
> I am wondering if the following code will result in identical but more
> efficient (parallel):
>
> input.keyBy(assignRandomKey).window(Time.seconds(10)
> ).reduce(count).timeWindowAll(Time.seconds(10)).reduce(count)
>
> Effectively just assigning random keys to do the preaggregation and then
> do a window on the pre-aggregated values. I wonder if this actually leads
> to correct results or how does it interplay with the time semantics.
>
> Cheers,
> Gyula
>
> Stephan Ewen  ezt írta (időpont: 2016. febr. 26., P,
> 19:10):
>
>> True, at this point it does not pre-aggregate in parallel, that is
>> actually a feature on the list but not yet added...
>>
>> On Fri, Feb 26, 2016 at 7:08 PM, Saiph Kappa 
>> wrote:
>>
>>> That code will not run in parallel right? So, a map-reduce task would
>>> yield better performance no?
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 6:06 PM, Stephan Ewen  wrote:
>>>
 Then go for:

 input.timeWindowAll(Time.seconds(10)).fold(0, new
 FoldFunction<Tuple2<String, Integer>, Integer>() { @Override public
 Integer fold(Integer integer, Tuple2<String, Integer> o) throws Exception
 { return integer + 1; } });

 Try to explore the API a bit, most things should be quite intuitive.
 There are also some docs:
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams

 On Fri, Feb 26, 2016 at 4:07 PM, Saiph Kappa 
 wrote:

> Why the ".keyBy"? I don't want to count tuples by Key. I simply want
> to count all tuples that are contained in a window.
>
> On Fri, Feb 26, 2016 at 9:14 AM, Till Rohrmann 
> wrote:
>
>> Hi Saiph,
>>
>> you can do it the following way:
>>
>> input.keyBy(0).timeWindow(Time.seconds(10)).fold(0, new 
>> FoldFunction<Tuple2<String, Integer>, Integer>() {
>> @Override
>> public Integer fold(Integer integer, Tuple2<String, Integer> o) 
>> throws Exception {
>> return integer + 1;
>> }
>> });
>>
>> Cheers,
>> Till
>> ​
>>
>> On Thu, Feb 25, 2016 at 7:58 PM, Saiph Kappa 
>> wrote:
>>
>>> Hi,
>>>
>>> In Flink Stream what's the best way of counting the number of tuples
>>> within a window of 10 seconds? Using a map-reduce task? Asking because 
>>> in
>>> spark there is the method rawStream.countByWindow(Seconds(x)).
>>>
>>> Thanks.
>>>
>>
>>
>

>>>
>>


Re: Counting tuples within a window in Flink Stream

2016-02-26 Thread Stephan Ewen
Then go for:

input.timeWindowAll(Time.seconds(10)).fold(0, new
FoldFunction<Tuple2<String, Integer>, Integer>() { @Override public
Integer fold(Integer integer, Tuple2<String, Integer> o) throws Exception
{ return integer + 1; } });

Try to explore the API a bit, most things should be quite intuitive.
There are also some docs:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams

On Fri, Feb 26, 2016 at 4:07 PM, Saiph Kappa  wrote:

> Why the ".keyBy"? I don't want to count tuples by Key. I simply want to
> count all tuples that are contained in a window.
>
> On Fri, Feb 26, 2016 at 9:14 AM, Till Rohrmann 
> wrote:
>
>> Hi Saiph,
>>
>> you can do it the following way:
>>
>> input.keyBy(0).timeWindow(Time.seconds(10)).fold(0, new 
>> FoldFunction<Tuple2<String, Integer>, Integer>() {
>> @Override
>> public Integer fold(Integer integer, Tuple2<String, Integer> o) throws 
>> Exception {
>> return integer + 1;
>> }
>> });
>>
>> Cheers,
>> Till
>> ​
>>
>> On Thu, Feb 25, 2016 at 7:58 PM, Saiph Kappa 
>> wrote:
>>
>>> Hi,
>>>
>>> In Flink Stream what's the best way of counting the number of tuples
>>> within a window of 10 seconds? Using a map-reduce task? Asking because in
>>> spark there is the method rawStream.countByWindow(Seconds(x)).
>>>
>>> Thanks.
>>>
>>
>>
>


Re: Counting tuples within a window in Flink Stream

2016-02-26 Thread Stephan Ewen
True, at this point it does not pre-aggregate in parallel, that is actually
a feature on the list but not yet added...

On Fri, Feb 26, 2016 at 7:08 PM, Saiph Kappa  wrote:

> That code will not run in parallel right? So, a map-reduce task would
> yield better performance no?
>
>
>
> On Fri, Feb 26, 2016 at 6:06 PM, Stephan Ewen  wrote:
>
>> Then go for:
>>
>> input.timeWindowAll(Time.seconds(10)).fold(0, new
>> FoldFunction<Tuple2<String, Integer>, Integer>() { @Override public
>> Integer fold(Integer integer, Tuple2<String, Integer> o) throws Exception
>> { return integer + 1; } });
>>
>> Try to explore the API a bit, most things should be quite intuitive.
>> There are also some docs:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams
>>
>> On Fri, Feb 26, 2016 at 4:07 PM, Saiph Kappa 
>> wrote:
>>
>>> Why the ".keyBy"? I don't want to count tuples by Key. I simply want to
>>> count all tuples that are contained in a window.
>>>
>>> On Fri, Feb 26, 2016 at 9:14 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Saiph,

 you can do it the following way:

 input.keyBy(0).timeWindow(Time.seconds(10)).fold(0, new 
 FoldFunction<Tuple2<String, Integer>, Integer>() {
 @Override
 public Integer fold(Integer integer, Tuple2<String, Integer> o) 
 throws Exception {
 return integer + 1;
 }
 });

 Cheers,
 Till
 ​

 On Thu, Feb 25, 2016 at 7:58 PM, Saiph Kappa 
 wrote:

> Hi,
>
> In Flink Stream what's the best way of counting the number of tuples
> within a window of 10 seconds? Using a map-reduce task? Asking because in
> spark there is the method rawStream.countByWindow(Seconds(x)).
>
> Thanks.
>


>>>
>>
>


Re: Counting tuples within a window in Flink Stream

2016-02-26 Thread Saiph Kappa
That code will not run in parallel right? So, a map-reduce task would yield
better performance no?



On Fri, Feb 26, 2016 at 6:06 PM, Stephan Ewen  wrote:

> Then go for:
>
> input.timeWindowAll(Time.seconds(10)).fold(0, new
> FoldFunction<Tuple2<String, Integer>, Integer>() { @Override public
> Integer fold(Integer integer, Tuple2<String, Integer> o) throws Exception
> { return integer + 1; } });
>
> Try to explore the API a bit, most things should be quite intuitive.
> There are also some docs:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams
>
> On Fri, Feb 26, 2016 at 4:07 PM, Saiph Kappa 
> wrote:
>
>> Why the ".keyBy"? I don't want to count tuples by Key. I simply want to
>> count all tuples that are contained in a window.
>>
>> On Fri, Feb 26, 2016 at 9:14 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Saiph,
>>>
>>> you can do it the following way:
>>>
>>> input.keyBy(0).timeWindow(Time.seconds(10)).fold(0, new 
>>> FoldFunction<Tuple2<String, Integer>, Integer>() {
>>> @Override
>>> public Integer fold(Integer integer, Tuple2<String, Integer> o) throws 
>>> Exception {
>>> return integer + 1;
>>> }
>>> });
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Thu, Feb 25, 2016 at 7:58 PM, Saiph Kappa 
>>> wrote:
>>>
 Hi,

 In Flink Stream what's the best way of counting the number of tuples
 within a window of 10 seconds? Using a map-reduce task? Asking because in
 spark there is the method rawStream.countByWindow(Seconds(x)).

 Thanks.

>>>
>>>
>>
>


Re: Watermarks with repartition

2016-02-26 Thread Zach Cox
Thanks for the confirmation Aljoscha! I wrote up results from my little
experiment: https://github.com/zcox/flink-repartition-watermark-example

-Zach


On Fri, Feb 26, 2016 at 2:58 AM Aljoscha Krettek 
wrote:

> Hi,
> yes, your description is spot on!
>
> Cheers,
> Aljoscha
> > On 26 Feb 2016, at 00:19, Zach Cox  wrote:
> >
> > I think I found the information I was looking for:
> >
> > RecordWriter broadcasts each emitted watermark to all outgoing channels
> [1].
> >
> > StreamInputProcessor tracks the max watermark received on each incoming
> channel separately, and computes the task's watermark as the min of all
> incoming watermarks [2].
> >
> > Is this an accurate summary of Flink's watermark propagation?
> >
> > So in my previous example, each window count task is building up a count
> for each window based on incoming event's timestamp, and when all incoming
> watermarks have progressed beyond the end of the window, the count is
> emitted. So if one partition's watermark lags behind the other, it just
> means the window output is triggered based on this lagging watermark.
> >
> > -Zach
> >
> > [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java#L103
> > [2]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L147
> >
> >
> > On Thu, Feb 25, 2016 at 3:31 PM Zach Cox  wrote:
> > Hi - how are watermarks passed along parallel tasks where there is a
> repartition? For example, say I have a simple streaming job computing
> hourly counts per key, something like this:
> >
> > val environment = StreamExecutionEnvironment.getExecutionEnvironment
> > environment.setParallelism(2)
> > environment.setStreamTimeCharacteristic(EventTime)
> > environment.getConfig.enableTimestamps()
> > environment
> >   .addSource(...)
> >   .assignAscendingTimestamps(_.timestamp)
> >   .keyBy("someField")
> >   .timeWindow(Time.hours(1))
> >   .fold(0, (count, element) => count + 1)
> >   .addSink(...)
> > environment.execute("example")
> >
> > Say the source has 2 parallel partitions (e.g. Kafka topic) and the
> events from the source contain timestamps, but over time the 2 source tasks
> diverge in event time (maybe 1 Kafka topic partition has many more events
> than the other).
> >
> > The job graph looks like this: http://imgur.com/hxEpF6b
> >
> > From what I can tell, the execution graph, with parallelism=2, would
> look like this: http://imgur.com/pSX8ov5. The keyBy causes a hash
> partition to be used, so that events with the same key end up at the same
> window subtask, regardless of which source partition they came from.
> >
> > Since the watermarks are skewed between the parallel pipelines, what
> happens when differing watermarks are sent to the window count operators?
> Is something tracking the min incoming watermark there? Could anyone point
> me to Flink code that implements this? I'd really like to learn more about
> how this works.
> >
> > Thanks,
> > Zach
> >
> >
>
>
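
A standalone Java sketch of the bookkeeping summarized in the quoted message above — track the maximum watermark per incoming channel and advance the operator's watermark to the minimum across channels, never letting it go backwards. This is illustrative only, not Flink's actual RecordWriter/StreamInputProcessor code:

import java.util.Arrays;

public class WatermarkTracker {

    private final long[] channelWatermarks;   // max watermark seen on each incoming channel
    private long lastEmitted = Long.MIN_VALUE;

    public WatermarkTracker(int numberOfChannels) {
        channelWatermarks = new long[numberOfChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new operator watermark if it advanced, otherwise null. */
    public Long onWatermark(int channel, long watermark) {
        // keep the per-channel maximum
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // the operator's watermark is the minimum over all channels
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }

        // only emit when it actually advances, so the watermark never decreases
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}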


Re: Need some help to understand the cause of the error

2016-02-26 Thread Nirmalya Sengupta
Hello  Aljoscha,

I am using Flink 0.10.1 (see below) and flinkspector (0.1-SNAPSHOT).

-


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala</artifactId>
    <version>0.10.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala</artifactId>
    <version>0.10.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>0.10.1</version>
</dependency>

<dependency>
    <groupId>com.dataArtisans</groupId>
    <artifactId>flink-training-exercises</artifactId>
    <version>0.1</version>
</dependency>

<dependency>
    <groupId>org.flinkspector</groupId>
    <artifactId>flinkspector-datastream</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>




-

Many thanks in advance for your effort.

-- Nirmalya


-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Frequent exceptions killing streaming job

2016-02-26 Thread Nick Dimiduk
Sorry I wasn't clear. No, the lock contention is not in Flink.

On Friday, February 26, 2016, Stephan Ewen  wrote:

> Was the contended lock part of Flink's runtime, or the application code?
> If it was part of the Flink Runtime, can you share what you found?
>
> On Thu, Feb 25, 2016 at 6:03 PM, Nick Dimiduk  > wrote:
>
>> For what it's worth, I dug into the TM logs and found that this exception
>> was not the root cause, merely a symptom of other backpressure building in
>> the flow (actually, lock contention in another part of the stack). While
>> Flink was helpful in finding and bubbling up this stack to the UI, it was
>> ultimately missleading, caused me to overlook proper evaluation of the
>> failure.
>>
>> On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger > > wrote:
>>
>>> Hey Nick,
>>>
>>> I had a discussion with Stephan Ewen on how we could resolve the issue.
>>> I filed a JIRA with our suggested approach:
>>> https://issues.apache.org/jira/browse/FLINK-3264
>>>
>>> By handling this directly in the KafkaConsumer, we would avoid fetching
>>> data we can not handle anyways (discarding in the deserialization schema
>>> would be more inefficient).
>>>
>>> Let us know what you think about our suggested approach.
>>>
>>> Sadly, it seems that the Kafka 0.9 consumer API does not yet support
>>> requesting the latest offset of a TopicPartition. I'll ask about this on
>>> their ML.
>>>
>>>
>>>
>>>
>>> On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk >> > wrote:
>>>
 On Sunday, January 17, 2016, Stephan Ewen > wrote:

> I agree, real time streams should never go down.
>

  Glad to hear that :)


> [snip] Both should be supported.
>

 Agreed.


> Since we interpret streaming very broadly (also including analysis of
> historic streams or timely data), the "backpressure/catch-up" mode seemed
> natural as the first one to implement.
>

 Indeed, this is what my job is doing. I have set it to, lacking a valid
 offset, start from the beginning. I have to presume that in my case the
 stream data is expiring faster than my consumers can keep up. However I
 haven't investigated proper monitoring yet.


> The "load shedding" variant can probably even be realized in the Kafka
> consumer, without complex modifications to the core Flink runtime itself.
>

 I agree here as well. Indeed, this exception is being thrown from the
 consumer, not the runtime.



> On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk 
> wrote:
>
>> This goes back to the idea that streaming applications should never
>> go down. I'd much rather consume at max capacity and knowingly drop some
>> portion of the incoming pipe than have the streaming job crash. Of 
>> course,
>> once the job itself is robust, I still need the runtime to be robust --
>> YARN vs (potential) Mesos vs standalone cluster will be my next
>> consideration.
>>
>> I can share some details about my setup, but not at this time; in
>> part because I don't have my metrics available at the moment and in part
>> because this is a public, archived list.
>>
>> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen 
>> wrote:
>>
>>> @Robert: Is it possible to add a "fallback" strategy to the
>>> consumer? Something like "if offsets cannot be found, use latest"?
>>>
>>> I would make this an optional feature to activate. I would think it
>>> is quite surprising to users if records start being skipped in certain
>>> situations. But I can see that this would be desirable sometimes.
>>>
>>> More control over skipping the records could be something to
>>> implement in an extended version of the Kafka Consumer. A user could 
>>> define
>>> a policy that, in case consumer falls behind producer more than X
>>> (offsets), it starts requesting the latest offsets (rather than the
>>> following), thereby skipping a bunch of records.
>>>
>>>
>>>
>>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger >> > wrote:
>>>
 Hi Nick,

 I'm sorry you ran into the issue. Is it possible that Flink's Kafka
 consumer falls back in the topic so far that the offsets it's 
 requesting
 are invalid?

 For that, the retention time of Kafka has to be pretty short.

 Skipping records under load is something currently not supported by
 Flink itself. The only idea I had for handling this would be to give 
 

Re: Need some help to understand the cause of the error

2016-02-26 Thread Aljoscha Krettek
Hi,
which version of Flink are you using, by the way? This would help me narrow 
down on possible causes of the problem.

Cheers,
Aljoscha
> On 26 Feb 2016, at 10:34, Nirmalya Sengupta  
> wrote:
> 
> Hello Aljoscha,
> 
> I have also tried by using the field's name in the sum("field3") function 
> (like you have suggested), but this time the exception is different:
> 
> 
> 
> Exception in thread "main" java.lang.ExceptionInInitializerError
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.IllegalArgumentException: Key expressions are only 
> supported on POJO types and Tuples. A type is considered a POJO if all its 
> fields are public, or have both getters and setters defined
>   at 
> org.apache.flink.streaming.util.FieldAccessor$PojoFieldAccessor.(FieldAccessor.java:175)
>   at 
> org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:92)
>   at 
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator.(SumAggregator.java:49)
>   at 
> org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:390)
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala:63)
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala)
>   ... 6 more
> 
> -
> 
> I have used both of the styles of calling sum() in earlier, but I never got 
> this exception. That's why I am a little confused. 'Reading' is a _case 
> class_ and hence, a POJO as I understand. Yet, the message seems to say that 
> it is not.
> 
> --  Nirmalya
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."
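
For comparison, a minimal Java sketch of a type that satisfies the rule quoted in the exception — every field either public or with a getter and setter. The public no-argument constructor is an additional POJO requirement assumed here; double-check the type rules of the Flink version in use:

public class Reading {

    public String sensorId;      // public field: accessible to the field accessor

    private double field3;       // private field, but with getter and setter below

    public Reading() {}          // no-argument constructor

    public double getField3() {
        return field3;
    }

    public void setField3(double field3) {
        this.field3 = field3;
    }
}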



Iterations problem in command line

2016-02-26 Thread Marcela Charfuelan

Hello,

I implemented an algorithm that includes iterations (the EM algorithm) and I 
am getting different results when running in Eclipse (Luna Release 
(4.4.0)) and when running on the command line using flink run; the 
program does not crash, it is just that after the first iteration the 
results are different (wrong on the command line).


The solution I am getting in eclipse, for each iteration, is the same 
that I would get if running the algorithm in octave for example, so I am 
sure the solution is correct.


I have tried using java plus the command-line arguments of eclipse on 
the command line and that also works ok (local in ubuntu).


Has anybody experienced something similar? Any idea why this could 
happen? How can I fix this?


Regards,
Marcela.


Re: Counting tuples within a window in Flink Stream

2016-02-26 Thread Saiph Kappa
Why the ".keyBy"? I don't want to count tuples by Key. I simply want to
count all tuples that are contained in a window.

On Fri, Feb 26, 2016 at 9:14 AM, Till Rohrmann  wrote:

> Hi Saiph,
>
> you can do it the following way:
>
> input.keyBy(0).timeWindow(Time.seconds(10)).fold(0, new 
> FoldFunction<Tuple2<String, Integer>, Integer>() {
> @Override
> public Integer fold(Integer integer, Tuple2<String, Integer> o) throws 
> Exception {
> return integer + 1;
> }
> });
>
> Cheers,
> Till
> ​
>
> On Thu, Feb 25, 2016 at 7:58 PM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> In Flink Stream what's the best way of counting the number of tuples
>> within a window of 10 seconds? Using a map-reduce task? Asking because in
>> spark there is the method rawStream.countByWindow(Seconds(x)).
>>
>> Thanks.
>>
>
>


Re: The way to itearte instances in AllWindowFunction in current Master branch

2016-02-26 Thread Aljoscha Krettek
Hi Hung,
after some discussion the way that window functions are used will change back 
to the way it was in 0.10.x, i.e. the Iterable is always part of the apply 
function.
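
A sketch of that style — an AllWindowFunction whose apply() receives the window's elements as an Iterable (here just counting them). The exact signature below is assumed from the 0.10.x-style API, so check it against the release you end up using:

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountWindowElements implements AllWindowFunction<String, Long, TimeWindow> {

    @Override
    public void apply(TimeWindow window, Iterable<String> values, Collector<Long> out) {
        long count = 0;
        for (String ignored : values) {
            count++;
        }
        // emit one count per window
        out.collect(count);
    }
}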

Sorry for the inconvenience this has caused.

Cheers,
Aljoscha
> On 26 Feb 2016, at 11:48, Aljoscha Krettek  wrote:
> 
> Hi,
> yes that seems to have been the issue. The Math.max() is used to ensure that 
> the timestamp does never decrease, because this is not allowed for a 
> watermark.
> 
> Cheers,
> Aljoscha
>> On 26 Feb 2016, at 11:11, HungChang  wrote:
>> 
>> Ah! My incorrect code segment made the Watermark not going forward and always
>> stay at the same moment in the past. Is that true and the issue?
>> 
>> Cheers,
>> 
>> Hung
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5186.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 



Re: Compile issues with Flink 1.0-SNAPSHOT and Scala 2.11

2016-02-26 Thread Stephan Ewen
Hi Cory,

there is also a new release candidate which should be clean dependency
wise. I hope it is feasible for you to stay on stable versions.

The CI infrastructure still seems to have issues that mix Scala versions
between snapshot builds. We are looking into this...

Stephan


On Wed, Feb 24, 2016 at 7:03 PM, Till Rohrmann  wrote:

> I just tested building a Flink job using the latest SNAPSHOT version and
> the flink-connector-kafka-0.8/flink-connector-kafka-0.9 Kafka connector.
> The compilation succeeded with SBT.
>
> Could you maybe share your build.sbt with me. This would help me to
> figure out the problem you’re experiencing.
>
> Cheers,
> Till
> ​
>
> On Wed, Feb 24, 2016 at 6:37 PM, Cory Monty 
> wrote:
>
>> What Dan posted on 2/22 is the current error we're seeing. As he stated,
>> using the 1.0.0-rc0 version works, but switching back to SNAPSHOT does not
>> compile. We can try clearing the ivy cache, but that has had no affect in
>> the past.
>>
>> On Wed, Feb 24, 2016 at 11:34 AM, Till Rohrmann 
>> wrote:
>>
>>> What is currently the error you observe? It might help to clear
>>> org.apache.flink in the ivy cache once in a while.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Feb 24, 2016 at 6:09 PM, Cory Monty >> > wrote:
>>>
 We're still seeing this issue in the latest SNAPSHOT version. Do you
 have any suggestions to resolve the error?

 On Mon, Feb 22, 2016 at 3:41 PM, Dan Kee  wrote:

> Hello,
>
> I'm not sure if this related, but we recently started seeing this when
> using `1.0-SNAPSHOT` in the `snapshots` repository:
>
> [error] Modules were resolved with conflicting cross-version suffixes in 
> {file:/home/ubuntu/bt/}flinkproject:
> [error]org.apache.kafka:kafka _2.10, _2.11
> java.lang.RuntimeException: Conflicting cross-version suffixes in: 
> org.apache.kafka:kafka
>   at scala.sys.package$.error(package.scala:27)
>   at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
>   at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
>   at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1164)
>   at sbt.Classpaths$$anonfun$66.apply(Defaults.scala:1161)
>   at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>   at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
>   at sbt.std.Transform$$anon$4.work(System.scala:63)
>   at 
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>   at 
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
>   at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
>   at sbt.Execute.work(Execute.scala:235)
>   at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>   at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
>   at 
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
>   at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>
> We switched our project to use `1.0.0` in the `orgapacheflink-1062`
> repository and that works.  Just wanted to let you know about the error we
> seeing with the snapshot version.
>
> Thanks!
>
> —Dan
>
> On Fri, Feb 12, 2016 at 8:41 AM, Cory Monty <
> cory.mo...@getbraintree.com> wrote:
>
>> Thanks, Stephan.
>>
>> Everything is back to normal for us.
>>
>> Cheers,
>>
>> Cory
>>
>> On Fri, Feb 12, 2016 at 6:54 AM, Stephan Ewen 
>> wrote:
>>
>>> Hi Cory!
>>>
>>> We found the problem. There is a development fork of Flink for
>>> Stream SQL, whose CI infrastructure accidentally also deployed snapshots
>>> and overwrote some of the proper master branch snapshots.
>>>
>>> That's why the snapshots got inconsistent. We fixed that, and newer
>>> snapshots should be online.
>>> Hope that this is resolved now.
>>>
>>> Sorry for the inconvenience,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 12, 2016 at 12:51 AM, Stephan Ewen 
>>> wrote:
>>>
 Hi!

 The CI system has just finished uploading an new snapshot. In that
 one, the scalatest dependency is now correctly at 2.11 again.


 

Re: Graph with stream of updates

2016-02-26 Thread Vasiliki Kalavri
Hi Ankur,

you can have custom state in your Flink operators, including a graph. There
is no graph state abstraction provided at the moment, but it shouldn't be
too hard for you to implement your own.
If your use case only requires processing edge additions, then you
might want to take a look at gelly-stream [1]. It is a single-pass graph
streaming API, processing edge additions and operating on graph summaries.

Cheers,
-Vasia.

[1]: https://github.com/vasia/gelly-streaming
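
A rough sketch of the "implement your own" option: a rich flatMap that keeps an adjacency map as plain operator state and applies edge additions/removals from the stream. The EdgeEvent type and the emitted degree output are made up for illustration, and hooking the map into Flink's checkpointed state is left out:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical edge event: (source, target, isAddition) -- in its own EdgeEvent.java.
public class EdgeEvent {
    public long src;
    public long dst;
    public boolean add;
}

public class GraphUpdater extends RichFlatMapFunction<EdgeEvent, String> {

    // adjacency list held as plain operator state (in memory only, not fault tolerant as written)
    private transient Map<Long, Set<Long>> adjacency;

    @Override
    public void open(Configuration parameters) {
        adjacency = new HashMap<>();
    }

    @Override
    public void flatMap(EdgeEvent event, Collector<String> out) {
        Set<Long> neighbors = adjacency.get(event.src);
        if (neighbors == null) {
            neighbors = new HashSet<>();
            adjacency.put(event.src, neighbors);
        }
        if (event.add) {
            neighbors.add(event.dst);
        } else {
            neighbors.remove(event.dst);
        }
        // answer a trivial "query" on the maintained graph: the current out-degree of the source vertex
        out.collect(event.src + " degree=" + neighbors.size());
    }
}

Keying the stream by the source vertex id before this flatMap would partition the graph across the parallel subtasks.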

On 26 February 2016 at 14:59, Ankur Sharma 
wrote:

> Hello,
>
> Thanks for reply.
>
> I want to create a graph from stream and query it. You got it right.
>
> Stream may be edges that are getting added or removed from the graph.
>
> Is there a way to create a empty global graph that can be transformed
> using a stream of updates?
>
> Best,
> Ankur Sharma
> 3.15 E1.1 Universität des Saarlandes
> 66123, Saarbrücken Germany
> Email: ankur.sha...@mpi-inf.mpg.de
> an...@stud.uni-saarland.de
>
> On 26 Feb 2016, at 14:55, Robert Metzger  wrote:
>
> Hi Ankur,
>
> Can you provide a bit more information on what you are trying to achieve?
>
> Do you want to keep a graph build from an stream of events within Flink
> and query that?
> Or you you want to change the dataflow graph of Flink while a job is
> running?
>
> Regards,
> Robert
>
>
> On Thu, Feb 25, 2016 at 11:19 PM, Ankur Sharma  > wrote:
>
>> Hello,
>>
>> Is it possible to create and update graph with streaming edge and vertex
>> data in flink?
>>
>> Best,
>> Ankur Sharma
>> 3.15 E1.1 Universität des Saarlandes
>> 66123, Saarbrücken Germany
>> Email: ankur.sha...@mpi-inf.mpg.de
>> an...@stud.uni-saarland.de
>>
>>
>
>


Re: Graph with stream of updates

2016-02-26 Thread Ankur Sharma
Hello,

Thanks for the reply.

I want to create a graph from a stream and query it. You got it right.

The stream may be edges that are getting added to or removed from the graph.

Is there a way to create an empty global graph that can be transformed using a 
stream of updates?

Best,
Ankur Sharma
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sha...@mpi-inf.mpg.de  
an...@stud.uni-saarland.de 
> On 26 Feb 2016, at 14:55, Robert Metzger  wrote:
> 
> Hi Ankur,
> 
> Can you provide a bit more information on what you are trying to achieve?
> 
> Do you want to keep a graph build from an stream of events within Flink and 
> query that?
> Or you you want to change the dataflow graph of Flink while a job is running?
> 
> Regards,
> Robert
> 
> 
> On Thu, Feb 25, 2016 at 11:19 PM, Ankur Sharma  > wrote:
> Hello,
> 
> Is it possible to create and update graph with streaming edge and vertex data 
> in flink?
> 
> Best,
> Ankur Sharma
> 3.15 E1.1 Universität des Saarlandes
> 66123, Saarbrücken Germany
> Email: ankur.sha...@mpi-inf.mpg.de  
> an...@stud.uni-saarland.de 
> 





Re: Graph with stream of updates

2016-02-26 Thread Robert Metzger
Hi Ankur,

Can you provide a bit more information on what you are trying to achieve?

Do you want to keep a graph built from a stream of events within Flink and
query that?
Or do you want to change the dataflow graph of Flink while a job is
running?

Regards,
Robert


On Thu, Feb 25, 2016 at 11:19 PM, Ankur Sharma 
wrote:

> Hello,
>
> Is it possible to create and update graph with streaming edge and vertex
> data in flink?
>
> Best,
> Ankur Sharma
> 3.15 E1.1 Universität des Saarlandes
> 66123, Saarbrücken Germany
> Email: ankur.sha...@mpi-inf.mpg.de
> an...@stud.uni-saarland.de
>
>


Re: streaming hdfs sub folders

2016-02-26 Thread Stephan Ewen
Hi!

Have a look at the class-level comments in "InputFormat". They should
describe how input formats first generate splits (for parallelization) on
the master, and the workers open each split.

So you need something like this:

AvroInputFormat<EndSongCleanedPq> avroInputFormat = new
AvroInputFormat<>(new
Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s),
EndSongCleanedPq.class);
avroInputFormat.setReuseAvroValue(false);

// createInputSplits takes the minimum number of desired splits
for (FileInputSplit split : avroInputFormat.createInputSplits(1)) {
avroInputFormat.open(split);

while (! avroInputFormat.reachedEnd()){
EndSongCleanedPq res =
avroInputFormat.nextRecord(new EndSongCleanedPq());
if (res != null) collector.collect(res);
}}}


Hope that helps.

Stephan



On Tue, Feb 23, 2016 at 12:04 PM, Martin Neumann  wrote:

> I'm not very familiar with the inner workings of the InputFomat's. calling
> .open() got rid of the Nullpointer but the stream still produces no output.
>
> As a temporary solution I wrote a batch job that just unions all the
> different datasets and puts them (sorted) into a single folder.
>
> cheers Martin
>
> On Fri, Feb 19, 2016 at 2:39 PM, Robert Metzger 
> wrote:
>
>> Hi Martin,
>>
>> where is the null pointer exception thrown?
>> I think you didn't call the open() method of the AvroInputFormat. Maybe
>> that's the issue.
>>
>> On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann  wrote:
>>
>>> I tried to implement your idea but I'm getting NullPointer exceptions
>>> from the AvroInputFormat any Idea what I'm doing wrong?
>>> See the code below:
>>>
>>> public static void main(String[] args) throws Exception {
>>>
>>> // set up the execution environment
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setParallelism(1);
>>>
>>> env.fromElements("00", "01", "02","03","22","23")
>>> .flatMap(new FileExtractor())
>>> .filter(new LocationFiter())
>>> .flatMap(new PreProcessEndSongClean())
>>> .writeAsCsv(outPath);
>>>
>>>
>>> env.execute("something");
>>> }
>>>
>>> private static class FileExtractor implements 
>>> FlatMapFunction<String, EndSongCleanedPq> {
>>>
>>> @Override
>>> public void flatMap(String s, Collector<EndSongCleanedPq> collector) 
>>> throws Exception {
>>> AvroInputFormat<EndSongCleanedPq> avroInputFormat = new 
>>> AvroInputFormat<>(new 
>>> Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), 
>>> EndSongCleanedPq.class);
>>> avroInputFormat.setReuseAvroValue(false);
>>> while (! avroInputFormat.reachedEnd()){
>>> EndSongCleanedPq res = avroInputFormat.nextRecord(new 
>>> EndSongCleanedPq());
>>> if (res != null) collector.collect(res);
>>> }
>>> }
>>> }
>>>
>>>
>>> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann 
>>> wrote:
>>>
 I guess I need to set the parallelism for the FlatMap to 1 to make sure
 I read one file at a time. The downside I see with this is that I will be
 not able to read in parallel from HDFS (and the files are Huge).

 I give it a try and see how much performance I loose.

 cheers Martin

 On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:

> Martin,
>
> I think you can approximate this in an easy way like this:
>
>   - On the client, you traverse your directories to collect all files
> that you need, collect all file paths in a list.
>   - Then you have a source "env.fromElements(paths)".
>   - Then you flatMap and in the FlatMap, run the Avro input format
> (open it per path, then call it to get all elements)
>
> That gives you pretty much full control about in which order the files
> should be processed.
>
> What do you think?
>
> Stephan
>
>
> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann 
> wrote:
>
>> I forgot to mention I'm using an AvroInputFormat to read the file
>> (that might be relevant how the flag needs to be applied)
>> See the code Snipped below:
>>
>> DataStream<EndSongCleanedPq> inStream =
>> env.readFile(new AvroInputFormat<>(new 
>> Path(filePath), EndSongCleanedPq.class), filePath);
>>
>>
>> On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
>> wrote:
>>
>>> The program is a DataStream program, it usually it gets the data
>>> from kafka. It's an anomaly detection program that learns from the 
>>> stream
>>> itself. The reason I want to read from files is to test different 
>>> settings
>>> of the algorithm and compare them.
>>>
>>> I think I don't need to reply things in the exact order (wich is not
>>> possible with parallel reads anyway) and I have written the program so 
>>> it
>>> can deal with out of 

Re: Error in import of flink-streaming-examples project [StockPrices.java]

2016-02-26 Thread Stephan Ewen
I think this example refers to a much older version (0.8) and is no longer
compatible

On Wed, Feb 24, 2016 at 4:02 PM, subash basnet  wrote:

> Hello there,
>
> I imported the flink-streaming-examples project [
> https://github.com/mbalassi/flink/tree/stockprices/flink-staging/flink-streaming/flink-streaming-examples]
> into eclipse but it shows me error in StockSource and TweetSource class
> within StockPries.java.
> I have attached the screenshot for more clarity.
>
> When I point to the error symbol beside *StockSource* I get the following
> message:
> Multiple markers at this line
> - The type StockPrices.StockSource must implement the inherited abstract
> method
> SourceFunction.cancel()
> - The type StockPrices.StockSource must implement the inherited abstract
> method
>
> SourceFunction.run(SourceFunction.SourceContext)
>
> And for *reachEnd() *overriden method:
> The method reachedEnd() of type StockPrices.StockSource must override or
> implement a supertype method
>
> Do I need to import some another project, attach as source to run the
> StockPries.java.
>
>
> Best Regards,
> Subash Basnet
>


Re: flink-storm FlinkLocalCluster issue

2016-02-26 Thread Stephan Ewen
Hi!

On 0.10.x, the Storm compatibility layer does not properly configure the
Local Flink Executor to have the right parallelism.

In 1.0 that is fixed. If you try the latest snapshot, or the
1.0-Release-Candidate-1, it should work.

Greetings,
Stephan


On Fri, Feb 26, 2016 at 12:16 PM, #ZHANG SHUHAO# 
wrote:

> Hi till,
>
> Thanks for your reply.
> But it appears that it only started with #slot of 1.
> I have traced down to the source code of flink step by step, where I have
> confirmed it.
>
> I'm using flink 0.10.2, source code downloaded from flink website. Nothing
> have been changed. I simply try to run the flink-Storm word count local
> example.
>
> It just failed to work.
>
>
> Sent from my iPhone
>
> On 26 Feb 2016, at 6:16 PM, Till Rohrmann  wrote:
>
> Hi Shuhao,
>
> the configuration you’re providing is only used for the storm
> compatibility layer and not Flink itself. When you run your job locally,
> the LocalFlinkMiniCluster should be started with as many slots as your
> maximum degree of parallelism is in your topology. You can check this in
> FlinkLocalCluster.java:96. When you submit your job to a remote cluster,
> then you have to define the number of slots in the flink-conf.yaml file.
>
> Cheers,
> Till
> ​
>
> On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# 
> wrote:
>
>> Hi everyone,
>>
>>
>>
>> I’m a student researcher working on Flink recently.
>>
>>
>>
>> I’m trying out the flink-storm example project, version 0.10.2,
>> flink-storm-examples, word-count-local.
>>
>>
>>
>> But, I got the following error:
>>
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decrease the
>> operator parallelism or increase the number of slots per TaskManager in the
>> configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @
>> (unassigned) - [SCHEDULED] > with groupID <
>> b86aa7eb76c417c63afb577ea46ddd72 > in sharing group < SlotSharingGroup
>> [0ea70b6a3927c02f29baf9de8f59575b, b86aa7eb76c417c63afb577ea46ddd72,
>> cbc10505364629b45b28e79599c2f6b8] >. Resources available to scheduler:
>> Number of instances=1, total number of slots=1, available slots=0
>>
>> at
>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
>>
>> at
>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>>
>> at
>> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>>
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>>
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>>
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>>
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
>>
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>>
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>>
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>
>> at
>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>
>> at
>> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>>
>> I notice that by default, task manager has only one slot, changing the
>> setting in flink-conf does not help as I want to debug locally through
>> FlinkLocalCluster (not to submit it locally).
>>
>>
>>
>> I have try the following:
>>
>>
>>
>> Import backtype.storm.Config;
>>
>>
>>
>>
>>
>> Config config = new Config();
>> config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
>> cluster.submitTopology(topologyId, config, ft);
>>
>>
>>
>>
>>
>> But it’s not 

Re: Flink streaming throughput

2016-02-26 Thread おぎばやしひろのり
Stephan,

Thank you for your quick response.
I will try and post the result later.

Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen :
> Hi!
>
> I would try and dig bit by bit into what the bottleneck is:
>
>  1) Disable the checkpointing, see what difference that makes
>  2) Use a dummy sink (discarding) rather than elastic search, to see if that
> is limiting
>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
> easily dominate the entire pipeline.
>
> Greetings,
> Stephan
>
>
> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり  wrote:
>>
>> Hello,
>>
>> I started evaluating Flink and tried simple performance test.
>> The result was just about 4000 messages/sec with 300% CPU usage. I
>> think this is quite low and wondering if it is a reasonable result.
>> If someone could check it, it would be great.
>>
>> Here is the detail:
>>
>> [servers]
>> - 3 Kafka broker with 3 partitions
>> - 3 Flink TaskManager + 1 JobManager
>> - 1 Elasticsearch
>> All of them are separate VM with 8vCPU, 8GB memory
>>
>> [test case]
>> The application counts access log by URI with in 1 minute window and
>> send the result to Elasticsearch. The actual code is below.
>> I used '-p 3' option to flink run command, so the task was distributed
>> to 3 TaskManagers.
>> In the test, I sent about 5000 logs/sec to Kafka.
>>
>> [result]
>> - From Elasticsearch records, the total access count for all URI was
>> about 260,000/min = 4300/sec. This is the entire throughput.
>> - Kafka consumer lag was keep growing.
>> - The CPU usage of each TaskManager machine was about 13-14%. From top
>> command output, Flink java process was using 100%(1 CPU full)
>>
>> So I thought the bottleneck here was CPU used by Flink Tasks.
>>
>> Here is the application code.
>> ---
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.enableCheckpointing(1000)
>> ...
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
>> SimpleStringSchema(), properties))
>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
>> AnyRef]] }
>>   .map{ x => x.get("uri") match {
>> case Some(y) => (y.asInstanceOf[String],1)
>> case None => ("", 1)
>>   }}
>>   .keyBy(0)
>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>   .sum(1)
>>   .map{ x => (System.currentTimeMillis(), x)}
>>   .addSink(new ElasticsearchSink(config, transports, new
>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
>> override def createIndexRequest(element: Tuple2[Long,
>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>   val json = new HashMap[String, AnyRef]
>>   json.put("@timestamp", new Timestamp(element._1))
>>   json.put("uri", element._2._1)
>>   json.put("count", element._2._2: java.lang.Integer)
>>   println("SENDING: " + element)
>>
>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>> }
>>   }))
>> ---
>>
>> Regards,
>> Hironori Ogibayashi
>
>
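
A minimal no-op sink for item 2 of the checklist quoted above — swap it in for the ElasticsearchSink to check whether the sink is the bottleneck (the class name is just a placeholder):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class NoOpSink<T> implements SinkFunction<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public void invoke(T value) {
        // deliberately discard every element
    }
}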


Re: Frequent exceptions killing streaming job

2016-02-26 Thread Stephan Ewen
Was the contended lock part of Flink's runtime, or the application code? If
it was part of the Flink Runtime, can you share what you found?

On Thu, Feb 25, 2016 at 6:03 PM, Nick Dimiduk  wrote:

> For what it's worth, I dug into the TM logs and found that this exception
> was not the root cause, merely a symptom of other backpressure building in
> the flow (actually, lock contention in another part of the stack). While
> Flink was helpful in finding and bubbling up this stack to the UI, it was
> ultimately misleading, causing me to overlook proper evaluation of the
> failure.
>
> On Wed, Jan 20, 2016 at 2:59 AM, Robert Metzger 
> wrote:
>
>> Hey Nick,
>>
>> I had a discussion with Stephan Ewen on how we could resolve the issue.
>> I filed a JIRA with our suggested approach:
>> https://issues.apache.org/jira/browse/FLINK-3264
>>
>> By handling this directly in the KafkaConsumer, we would avoid fetching
>> data we can not handle anyways (discarding in the deserialization schema
>> would be more inefficient).
>>
>> Let us know what you think about our suggested approach.
>>
>> Sadly, it seems that the Kafka 0.9 consumer API does not yet support
>> requesting the latest offset of a TopicPartition. I'll ask about this on
>> their ML.
>>
>>
>>
>>
>> On Sun, Jan 17, 2016 at 8:28 PM, Nick Dimiduk  wrote:
>>
>>> On Sunday, January 17, 2016, Stephan Ewen  wrote:
>>>
 I agree, real time streams should never go down.

>>>
>>>  Glad to hear that :)
>>>
>>>
 [snip] Both should be supported.

>>>
>>> Agreed.
>>>
>>>
 Since we interpret streaming very broadly (also including analysis of
 historic streams or timely data), the "backpressure/catch-up" mode seemed
 natural as the first one to implement.

>>>
>>> Indeed, this is what my job is doing. I have set it to, lacking a valid
>>> offset, start from the beginning. I have to presume that in my case the
>>> stream data is expiring faster than my consumers can keep up. However I
>>> haven't investigated proper monitoring yet.
>>>
>>>
 The "load shedding" variant can probably even be realized in the Kafka
 consumer, without complex modifications to the core Flink runtime itself.

>>>
>>> I agree here as well. Indeed, this exception is being thrown from the
>>> consumer, not the runtime.
>>>
>>>
>>>
 On Sun, Jan 17, 2016 at 12:42 AM, Nick Dimiduk 
 wrote:

> This goes back to the idea that streaming applications should never go
> down. I'd much rather consume at max capacity and knowingly drop some
> portion of the incoming pipe than have the streaming job crash. Of course,
> once the job itself is robust, I still need the runtime to be robust --
> YARN vs (potential) Mesos vs standalone cluster will be my next
> consideration.
>
> I can share some details about my setup, but not at this time; in part
> because I don't have my metrics available at the moment and in part 
> because
> this is a public, archived list.
>
> On Sat, Jan 16, 2016 at 8:23 AM, Stephan Ewen 
> wrote:
>
>> @Robert: Is it possible to add a "fallback" strategy to the consumer?
>> Something like "if offsets cannot be found, use latest"?
>>
>> I would make this an optional feature to activate. I would think it
>> is quite surprising to users if records start being skipped in certain
>> situations. But I can see that this would be desirable sometimes.
>>
>> More control over skipping the records could be something to
>> implement in an extended version of the Kafka Consumer. A user could 
>> define
>> a policy that, in case consumer falls behind producer more than X
>> (offsets), it starts requesting the latest offsets (rather than the
>> following), thereby skipping a bunch of records.
>>
>>
>>
>> On Sat, Jan 16, 2016 at 3:14 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Nick,
>>>
>>> I'm sorry you ran into the issue. Is it possible that Flink's Kafka
>>> consumer falls back in the topic so far that the offsets it's requesting
>>> are invalid?
>>>
>>> For that, the retention time of Kafka has to be pretty short.
>>>
>>> Skipping records under load is something currently not supported by
>>> Flink itself. The only idea I had for handling this would be to give the
>>> DeserializationSchema a call back to request the latest offset from 
>>> Kafka
>>> to determine the lag. With that, the schema could determine a "dropping
>>> rate" to catch up.
>>> What would you as an application developer expect to handle the
>>> situation?
>>>
>>>
>>> Just out of curiosity: What's the throughput you have on the Kafka
>>> topic?
>>>
>>>
>>> On Fri, Jan 15, 2016 at 10:13 PM, Nick Dimiduk 
>>> 

Re: flink-storm FlinkLocalCluster issue

2016-02-26 Thread #ZHANG SHUHAO#
Hi Till,

Thanks for your reply.
But it appears that it only starts with a slot count of 1.
I have traced through the Flink source code step by step and confirmed it.

I'm using Flink 0.10.2, with the source code downloaded from the Flink website. Nothing has
been changed. I simply tried to run the flink-storm word count local example.

It just failed to work.


Sent from my iPhone

On 26 Feb 2016, at 6:16 PM, Till Rohrmann 
> wrote:


Hi Shuhao,

the configuration you’re providing is only used for the storm compatibility 
layer and not Flink itself. When you run your job locally, the 
LocalFlinkMiniCluster should be started with as many slots as your maximum 
degree of parallelism is in your topology. You can check this in 
FlinkLocalCluster.java:96. When you submit your job to a remote cluster, then 
you have to define the number of slots in the flink-conf.yaml file.

Cheers,
Till

​

On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# 
> wrote:
Hi everyone,

I’m a student researcher working on Flink recently.

I’m trying out the flink-storm example project, version 0.10.2, 
flink-storm-examples, word-count-local.

But, I got the following error:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not 
enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @ (unassigned) 
- [SCHEDULED] > with groupID < b86aa7eb76c417c63afb577ea46ddd72 > in sharing 
group < SlotSharingGroup [0ea70b6a3927c02f29baf9de8f59575b, 
b86aa7eb76c417c63afb577ea46ddd72, cbc10505364629b45b28e79599c2f6b8] >. 
Resources available to scheduler: Number of instances=1, total number of 
slots=1, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I notice that by default the task manager has only one slot; changing the setting
in flink-conf does not help, as I want to debug locally through
FlinkLocalCluster (not by submitting to a local cluster).

I have tried the following:


import backtype.storm.Config;




Config config = new Config();
config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
cluster.submitTopology(topologyId, config, ft);


But it’s not working.


Is there any way to work around this?

Many thanks.

shuhao zhang (Tony).



Re: The way to iterate instances in AllWindowFunction in current Master branch

2016-02-26 Thread Aljoscha Krettek
Hi,
yes, that seems to have been the issue. The Math.max() is used to ensure that
the timestamp never decreases, because a decreasing timestamp is not allowed for a watermark.
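
For illustration, here is a minimal sketch of the pattern being described (not part of the original message; it assumes the 1.0-style AssignerWithPeriodicWatermarks interface and a made-up Event type):

---
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical event type used only for this sketch.
case class Event(id: String, timestamp: Long)

class MonotonicWatermarkAssigner extends AssignerWithPeriodicWatermarks[Event] {

  // Highest event timestamp seen so far.
  private var maxSeenTimestamp = Long.MinValue

  override def extractTimestamp(element: Event, previousElementTimestamp: Long): Long = {
    // Math.max() guarantees the tracked timestamp never decreases,
    // even when events arrive out of order.
    maxSeenTimestamp = Math.max(maxSeenTimestamp, element.timestamp)
    element.timestamp
  }

  override def getCurrentWatermark(): Watermark =
    // Watermarks must be non-decreasing, so emit the maximum seen so far.
    new Watermark(maxSeenTimestamp)
}
---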

Cheers,
Aljoscha
> On 26 Feb 2016, at 11:11, HungChang  wrote:
> 
> Ah! My incorrect code segment kept the Watermark from moving forward, so it
> always stayed at the same moment in the past. Is that correct, and is that the issue?
> 
> Cheers,
> 
> Hung
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5186.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Flink streaming throughput

2016-02-26 Thread Stephan Ewen
Hi!

I would try and dig bit by bit into what the bottleneck is:

 1) Disable the checkpointing, see what difference that makes
 2) Use a dummy sink (discarding) rather than elastic search, to see if
that is limiting
 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
easily dominate the entire pipeline.
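
As an illustration of steps 1) and 2), here is a minimal sketch of such a measurement run (not part of the original message; it assumes the Scala DataStream API and substitutes a tiny generated stream for the real windowed (timestamp, (uri, count)) stream of the quoted job):

---
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object ThroughputCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 1) Checkpointing intentionally NOT enabled for this measurement run
    //    (i.e. env.enableCheckpointing(1000) is left out).

    // Stand-in for the windowed (timestamp, (uri, count)) stream built in the quoted code.
    val results: DataStream[(Long, (String, Int))] =
      env.fromElements((System.currentTimeMillis(), ("/index.html", 1)))

    // 2) A sink that simply discards records, so the Elasticsearch cost
    //    drops out of the throughput measurement.
    results.addSink(new SinkFunction[(Long, (String, Int))] {
      override def invoke(value: (Long, (String, Int))): Unit = {
        // intentionally do nothing
      }
    })

    env.execute("throughput check with a discarding sink")
  }
}
---

If throughput recovers with this variant, the bottleneck is in the checkpointing or the sink; if it does not, the JSON parsing in the map stage (step 3) is a likely next suspect.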

Greetings,
Stephan


On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり  wrote:

> Hello,
>
> I started evaluating Flink and tried simple performance test.
> The result was just about 4000 messages/sec with 300% CPU usage. I
> think this is quite low and wondering if it is a reasonable result.
> If someone could check it, it would be great.
>
> Here is the detail:
>
> [servers]
> - 3 Kafka broker with 3 partitions
> - 3 Flink TaskManager + 1 JobManager
> - 1 Elasticsearch
> All of them are separate VM with 8vCPU, 8GB memory
>
> [test case]
> The application counts access log by URI with in 1 minute window and
> send the result to Elasticsearch. The actual code is below.
> I used '-p 3' option to flink run command, so the task was distributed
> to 3 TaskManagers.
> In the test, I sent about 5000 logs/sec to Kafka.
>
> [result]
> - From Elasticsearch records, the total access count for all URI was
> about 260,000/min = 4300/sec. This is the entire throughput.
> - Kafka consumer lag was keep growing.
> - The CPU usage of each TaskManager machine was about 13-14%. From top
> command output, Flink java process was using 100%(1 CPU full)
>
> So I thought the bottleneck here was CPU used by Flink Tasks.
>
> Here is the application code.
> ---
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.enableCheckpointing(1000)
> ...
> val stream = env
>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
> SimpleStringSchema(), properties))
>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
> AnyRef]] }
>   .map{ x => x.get("uri") match {
> case Some(y) => (y.asInstanceOf[String],1)
> case None => ("", 1)
>   }}
>   .keyBy(0)
>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>   .sum(1)
>   .map{ x => (System.currentTimeMillis(), x)}
>   .addSink(new ElasticsearchSink(config, transports, new
> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
> override def createIndexRequest(element: Tuple2[Long,
> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>   val json = new HashMap[String, AnyRef]
>   json.put("@timestamp", new Timestamp(element._1))
>   json.put("uri", element._2._1)
>   json.put("count", element._2._2: java.lang.Integer)
>   println("SENDING: " + element)
>
> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
> }
>   }))
> ---
>
> Regards,
> Hironori Ogibayashi
>


Flink streaming throughput

2016-02-26 Thread おぎばやしひろのり
Hello,

I started evaluating Flink and tried a simple performance test.
The result was just about 4,000 messages/sec with 300% CPU usage. I
think this is quite low and I am wondering whether it is a reasonable result.
If someone could check it, that would be great.

Here is the detail:

[servers]
- 3 Kafka broker with 3 partitions
- 3 Flink TaskManager + 1 JobManager
- 1 Elasticsearch
All of them are separate VM with 8vCPU, 8GB memory

[test case]
The application counts access logs by URI within a 1-minute window and
sends the result to Elasticsearch. The actual code is below.
I used the '-p 3' option with the flink run command, so the task was distributed
to 3 TaskManagers.
In the test, I sent about 5,000 logs/sec to Kafka.

[result]
- From the Elasticsearch records, the total access count across all URIs was
about 260,000/min = 4,300/sec. This is the overall throughput.
- The Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. From the top
command output, the Flink Java process was using 100% (1 CPU fully busy).

So I thought the bottleneck here was the CPU used by the Flink tasks.

Here is the application code.
---
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
...
val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
SimpleStringSchema(), properties))
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  .map{ x => x.get("uri") match {
case Some(y) => (y.asInstanceOf[String],1)
case None => ("", 1)
  }}
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x)}
  .addSink(new ElasticsearchSink(config, transports, new
IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
override def createIndexRequest(element: Tuple2[Long,
Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
  val json = new HashMap[String, AnyRef]
  json.put("@timestamp", new Timestamp(element._1))
  json.put("uri", element._2._1)
  json.put("count", element._2._2: java.lang.Integer)
  println("SENDING: " + element)
  Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
}
  }))
---

Regards,
Hironori Ogibayashi


Re: Kafka issue

2016-02-26 Thread Gyula Fóra
Thanks Robert, so apparently the snapshot version was screwed up somehow
and included the 2.11 dependencies.

Now it works.

Cheers,
Gyula

Gyula Fóra  ezt írta (időpont: 2016. febr. 26., P,
11:09):

> That actually seemed to be the issue; now that I compiled my own version,
> it doesn't have these wrong jars in the dependency tree...
>
> Gyula Fóra  ezt írta (időpont: 2016. febr. 26., P,
> 11:01):
>
>> I was using the snapshot repo in this case, let me try building my own
>> version...
>>
>> Maybe this is interesting:
>> mvn dependency:tree | grep 2.11
>> [INFO] |  \- org.apache.kafka:kafka_2.11:jar:0.8.2.2:compile
>> [INFO] | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.2:compile
>> [INFO] | +-
>> org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.2:compile
>>
>>
>> Robert Metzger  ezt írta (időpont: 2016. febr. 26.,
>> P, 10:56):
>>
>>> Are you building 1.0-SNAPSHOT yourself or are you relying on the
>>> snapshot repository?
>>>
>>> We had issues in the past that jars in the snapshot repo were incorrect
>>>
>>> On Fri, Feb 26, 2016 at 10:45 AM, Gyula Fóra 
>>> wrote:
>>>
 I am not sure what is happening. I tried running against a Flink
 cluster that is definitely running the correct Scala version (2.10) and I
 still got the error. So it might be something with the pom.xml but we just
 don't see how it is different from the correct one.

 Gyula

 Till Rohrmann  ezt írta (időpont: 2016. febr.
 26., P, 10:42):

> Hi Gyula,
>
> could it be that you compiled against a different Scala version than
> the one you're using for running the job? This usually happens when you
> compile against 2.10 and let it run with version 2.11.
>
> Cheers,
> Till
>
> On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra 
> wrote:
>
>> Hey,
>>
>> For one of our jobs we ran into this issue. It's probably some
>> dependency issue but we cant figure it out as a very similar setup works
>> without issues for a different program.
>>
>> java.lang.NoSuchMethodError:
>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>> at
>> kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
>> at
>> kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
>> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>> at
>> kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>> at
>> com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
>> at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>
>> Any insights?
>>
>> Cheers,
>> Gyula
>>
>
>
>>>


Re: flink-storm FlinkLocalCluster issue

2016-02-26 Thread Till Rohrmann
Hi Shuhao,

the configuration you’re providing is only used for the storm compatibility
layer and not Flink itself. When you run your job locally, the
LocalFlinkMiniCluster should be started with as many slots as your maximum
degree of parallelism is in your topology. You can check this in
FlinkLocalCluster.java:96. When you submit your job to a remote cluster,
then you have to define the number of slots in the flink-conf.yaml file.
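For the remote-cluster case that means setting, for example, taskmanager.numberOfTaskSlots: 4 in conf/flink-conf.yaml on each TaskManager (the exact value depends on the parallelism your topology needs) and restarting the cluster.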

Cheers,
Till
​

On Fri, Feb 26, 2016 at 10:34 AM, #ZHANG SHUHAO# 
wrote:

> Hi everyone,
>
>
>
> I’m a student researcher working on Flink recently.
>
>
>
> I’m trying out the flink-storm example project, version 0.10.2,
> flink-storm-examples, word-count-local.
>
>
>
> But, I got the following error:
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @
> (unassigned) - [SCHEDULED] > with groupID <
> b86aa7eb76c417c63afb577ea46ddd72 > in sharing group < SlotSharingGroup
> [0ea70b6a3927c02f29baf9de8f59575b, b86aa7eb76c417c63afb577ea46ddd72,
> cbc10505364629b45b28e79599c2f6b8] >. Resources available to scheduler:
> Number of instances=1, total number of slots=1, available slots=0
>
> at
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
>
> at
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>
> at
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
>
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>
> at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
> I notice that by default, task manager has only one slot, changing the
> setting in flink-conf does not help as I want to debug locally through
> FlinkLocalCluster (not to submit it locally).
>
>
>
> I have tried the following:
>
>
>
> import backtype.storm.Config;
>
>
>
>
>
> Config config = new Config();
> config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
> cluster.submitTopology(topologyId, config, ft);
>
>
>
>
>
> But it’s not working.
>
>
>
>
>
> Is there any way to work around?
>
>
>
> Many thanks.
>
>
>
> shuhao zhang (Tony).
>


Re: Kafka issue

2016-02-26 Thread Gyula Fóra
That actually seemed to be the issue; now that I compiled my own version, it
doesn't have these wrong jars in the dependency tree...

Gyula Fóra  ezt írta (időpont: 2016. febr. 26., P,
11:01):

> I was using the snapshot repo in this case, let me try building my own
> version...
>
> Maybe this is interesting:
> mvn dependency:tree | grep 2.11
> [INFO] |  \- org.apache.kafka:kafka_2.11:jar:0.8.2.2:compile
> [INFO] | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.2:compile
> [INFO] | +-
> org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.2:compile
>
>
> Robert Metzger  ezt írta (időpont: 2016. febr. 26.,
> P, 10:56):
>
>> Are you building 1.0-SNAPSHOT yourself or are you relying on the snapshot
>> repository?
>>
>> We had issues in the past that jars in the snapshot repo were incorrect
>>
>> On Fri, Feb 26, 2016 at 10:45 AM, Gyula Fóra 
>> wrote:
>>
>>> I am not sure what is happening. I tried running against a Flink cluster
>>> that is definitely running the correct Scala version (2.10) and I still got
>>> the error. So it might be something with the pom.xml but we just don't see
>>> how it is different from the correct one.
>>>
>>> Gyula
>>>
>>> Till Rohrmann  ezt írta (időpont: 2016. febr.
>>> 26., P, 10:42):
>>>
 Hi Gyula,

 could it be that you compiled against a different Scala version than
 the one you're using for running the job? This usually happens when you
 compile against 2.10 and let it run with version 2.11.

 Cheers,
 Till

 On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra 
 wrote:

> Hey,
>
> For one of our jobs we ran into this issue. It's probably some
> dependency issue but we cant figure it out as a very similar setup works
> without issues for a different program.
>
> java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at
> kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
> at
> kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
> at
> kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
> at
> com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
> at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>
> Any insights?
>
> Cheers,
> Gyula
>


>>


Re: Kafka issue

2016-02-26 Thread Gyula Fóra
I was using the snapshot repo in this case, let me try building my own
version...

Maybe this is interesting:
mvn dependency:tree | grep 2.11
[INFO] |  \- org.apache.kafka:kafka_2.11:jar:0.8.2.2:compile
[INFO] | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.2:compile
[INFO] | +-
org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.2:compile


Robert Metzger  ezt írta (időpont: 2016. febr. 26., P,
10:56):

> Are you building 1.0-SNAPSHOT yourself or are you relying on the snapshot
> repository?
>
> We had issues in the past that jars in the snapshot repo were incorrect
>
> On Fri, Feb 26, 2016 at 10:45 AM, Gyula Fóra  wrote:
>
>> I am not sure what is happening. I tried running against a Flink cluster
>> that is definitely running the correct Scala version (2.10) and I still got
>> the error. So it might be something with the pom.xml but we just don't see
>> how it is different from the correct one.
>>
>> Gyula
>>
>> Till Rohrmann  ezt írta (időpont: 2016. febr. 26.,
>> P, 10:42):
>>
>>> Hi Gyula,
>>>
>>> could it be that you compiled against a different Scala version than the
>>> one you're using for running the job? This usually happens when you compile
>>> against 2.10 and let it run with version 2.11.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra 
>>> wrote:
>>>
 Hey,

 For one of our jobs we ran into this issue. It's probably some
 dependency issue but we cant figure it out as a very similar setup works
 without issues for a different program.

 java.lang.NoSuchMethodError:
 scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
 at
 kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
 at
 kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
 at
 kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
 at
 kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
 at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
 at
 kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
 at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
 at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
 at
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
 at com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
 at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
 at
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
 at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
 at
 org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
 at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
 at
 org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
 at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)

 Any insights?

 Cheers,
 Gyula

>>>
>>>
>


Re: Kafka issue

2016-02-26 Thread Robert Metzger
Are you building 1.0-SNAPSHOT yourself or are you relying on the snapshot
repository?

We had issues in the past where the jars in the snapshot repo were incorrect.

On Fri, Feb 26, 2016 at 10:45 AM, Gyula Fóra  wrote:

> I am not sure what is happening. I tried running against a Flink cluster
> that is definitely running the correct Scala version (2.10) and I still got
> the error. So it might be something with the pom.xml but we just don't see
> how it is different from the correct one.
>
> Gyula
>
> Till Rohrmann  ezt írta (időpont: 2016. febr. 26.,
> P, 10:42):
>
>> Hi Gyula,
>>
>> could it be that you compiled against a different Scala version than the
>> one you're using for running the job? This usually happens when you compile
>> against 2.10 and let it run with version 2.11.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra 
>> wrote:
>>
>>> Hey,
>>>
>>> For one of our jobs we ran into this issue. It's probably some
>>> dependency issue but we cant figure it out as a very similar setup works
>>> without issues for a different program.
>>>
>>> java.lang.NoSuchMethodError:
>>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>>> at
>>> kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
>>> at
>>> kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
>>> at
>>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>>> at
>>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>>> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
>>> at
>>> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
>>> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>>> at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>>> at com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
>>> at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>
>>> Any insights?
>>>
>>> Cheers,
>>> Gyula
>>>
>>
>>


Re: Kafka issue

2016-02-26 Thread Gyula Fóra
I am not sure what is happening. I tried running against a Flink cluster
that is definitely running the correct Scala version (2.10) and I still got
the error. So it might be something with the pom.xml but we just don't see
how it is different from the correct one.

Gyula

Till Rohrmann  ezt írta (időpont: 2016. febr. 26., P,
10:42):

> Hi Gyula,
>
> could it be that you compiled against a different Scala version than the
> one you're using for running the job? This usually happens when you compile
> against 2.10 and let it run with version 2.11.
>
> Cheers,
> Till
>
> On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra  wrote:
>
>> Hey,
>>
>> For one of our jobs we ran into this issue. It's probably some dependency
>> issue but we cant figure it out as a very similar setup works without
>> issues for a different program.
>>
>> java.lang.NoSuchMethodError:
>> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
>> at
>> kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
>> at
>> kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
>> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
>> at
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
>> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>> at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>> at com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
>> at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>
>> Any insights?
>>
>> Cheers,
>> Gyula
>>
>
>


Re: Kafka issue

2016-02-26 Thread Till Rohrmann
Hi Gyula,

could it be that you compiled against a different Scala version than the
one you're using for running the job? This usually happens when you compile
against 2.10 and let it run with version 2.11.
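
One quick way to check for such a mismatch (a sketch, not from the original thread) is to print the Scala version that is actually on the runtime classpath and compare it with the _2.10/_2.11 suffix of the Flink and Kafka artifacts the project was built against:

---
object ScalaVersionCheck {
  def main(args: Array[String]): Unit = {
    // Prints something like "version 2.11.7"; compare this with the
    // Scala suffix (_2.10 or _2.11) of the flink-* and kafka_* jars in use.
    println(s"Scala runtime: ${scala.util.Properties.versionString}")
  }
}
---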

Cheers,
Till

On Fri, Feb 26, 2016 at 10:09 AM, Gyula Fóra  wrote:

> Hey,
>
> For one of our jobs we ran into this issue. It's probably some dependency
> issue, but we can't figure it out, as a very similar setup works without
> issues for a different program.
>
> java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at
> kafka.consumer.FetchRequestAndResponseMetrics.(FetchRequestAndResponseStats.scala:32)
> at
> kafka.consumer.FetchRequestAndResponseStats.(FetchRequestAndResponseStats.scala:46)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:59)
> at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
> at
> kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:63)
> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
> at kafka.javaapi.consumer.SimpleConsumer.(SimpleConsumer.scala:34)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:518)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:218)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
> at com.king.deduplo.source.EventSource.readRawInput(EventSource.java:46)
> at com.king.deduplo.DeduploProgram.main(DeduploProgram.java:33)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>
> Any insights?
>
> Cheers,
> Gyula
>


Need some help to understand the cause of the error

2016-02-26 Thread Nirmalya Sengupta
Hello Aljoscha,

I have also tried using the field's name in the sum("field3") call
(as you suggested), but this time the exception is different:



Exception in thread "main" java.lang.ExceptionInInitializerError
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IllegalArgumentException: Key expressions are only
supported on POJO types and Tuples. A type is considered a POJO if all its
fields are public, or have both getters and setters defined
at
org.apache.flink.streaming.util.FieldAccessor$PojoFieldAccessor.(FieldAccessor.java:175)
at
org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:92)
at
org.apache.flink.streaming.api.functions.aggregation.SumAggregator.(SumAggregator.java:49)
at
org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:390)
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala:63)
at
org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala)
... 6 more

-

I have used both styles of calling sum() earlier, but I never got
this exception. That's why I am a little confused. 'Reading' is a _case
class_ and hence, as I understand it, a POJO. Yet the message seems to say
that it is not.
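
One possible workaround (a sketch, not a confirmed fix from this thread): map the Reading stream to a (key, value) tuple before the keyed window, so the positional sum(1) path is used instead of the POJO field accessor that rejects the case class:

---
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Reading(field1: String, field2: String, field3: Int)

object TupleSumWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream: DataStream[Reading] = env.fromElements(
      Reading("hans", "elephant", 15),
      Reading("susi", "arctic", 20),
      Reading("pete", "elephant", 40))

    stream
      .map(r => (r.field2, r.field3))   // (key, value) tuple instead of the case class
      .keyBy(0)                         // positional key works on tuples
      .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
      .sum(1)                           // positional sum works on tuples
      .print()

    // Note: with this finite demo source and processing-time windows the window may
    // not fire before the job finishes; the point is only that the positional access
    // compiles and avoids the POJO field accessor.
    env.execute("tuple sum workaround")
  }
}
---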

--  Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


flink-storm FlinkLocalCluster issue

2016-02-26 Thread #ZHANG SHUHAO#
Hi everyone,

I'm a student researcher working on Flink recently.

I'm trying out the flink-storm example project, version 0.10.2, 
flink-storm-examples, word-count-local.

But, I got the following error:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not 
enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #0 (tokenizer (2/4)) @ (unassigned) 
- [SCHEDULED] > with groupID < b86aa7eb76c417c63afb577ea46ddd72 > in sharing 
group < SlotSharingGroup [0ea70b6a3927c02f29baf9de8f59575b, 
b86aa7eb76c417c63afb577ea46ddd72, cbc10505364629b45b28e79599c2f6b8] >. 
Resources available to scheduler: Number of instances=1, total number of 
slots=1, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:255)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:992)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:972)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I notice that by default the task manager has only one slot; changing the setting
in flink-conf does not help, as I want to debug locally through
FlinkLocalCluster (not by submitting to a local cluster).

I have tried the following:


import backtype.storm.Config;




Config config = new Config();
config.put(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1024);
cluster.submitTopology(topologyId, config, ft);


But it's not working.


Is there any way to work around this?

Many thanks.

shuhao zhang (Tony).


Re: Need some help to understand the cause of the error

2016-02-26 Thread Aljoscha Krettek
Hi,
as far as I can see, the problem is in this line:
k.sum(3)

Using field indices is only valid for Tuple types. In your case you should be
able to use this:
k.sum("field3")

because this is a field of your Reading type.

Cheers,
Aljoscha
> On 26 Feb 2016, at 02:44, Nirmalya Sengupta  
> wrote:
> 
> Hello Flinksters,
> 
> I am trying to use Flinkspector in a Scala code snippet of mine and Flink is 
> complaining. The code is here:
> 
> ---
> 
> case class Reading(field1:String,field2:String,field3:Int)
> 
> object MultiWindowing {
> 
>   def main(args: Array[String]) {}
> 
>   //  WindowFunction
> 
>   class WindowPrinter extends WindowFunction[Reading, String, String, 
> TimeWindow] {
> 
>   //  .
> }
>   }
> 
>   val env = DataStreamTestEnvironment.createTestEnvironment(1)
> 
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
>   val input: EventTimeInput[Reading]  =
> EventTimeInputBuilder
> .startWith(Reading("hans", "elephant", 15))
> .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
> .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))
> 
>   //acquire data source from input
>   val stream = env.fromInput(input)
> 
>   //apply transformation
>   val k = stream.keyBy(new KeySelector [Reading,String] {
> def getKey(r:Reading) =  r.field2
>   })
> .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
> 
> k.sum(3)
> .print()
> 
>   env.execute()
> 
> }
> 
> ---
> 
> And at runtime, I get this error:
> 
> 
> 
> Exception in thread "main" java.lang.ExceptionInInitializerError
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a 
> simple type (non-tuple, non-array).
>   at 
> org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
>   at 
> org.apache.flink.streaming.api.functions.aggregation.SumAggregator.(SumAggregator.java:37)
>   at 
> org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala:63)
>   at 
> org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.(MultiWindowing.scala)
>   ... 6 more
> 
> 
> ---
> 
> Can someone help me by pointing out the mistake I am making?
> 
> -- Nirmalya
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."



Re: suggestion for Quickstart

2016-02-26 Thread Stefano Baghino
Hi Tara,

thank you so much for reporting the issues you had, I'll open a ticket and
start working on it.

Best,
Stefano

On Fri, Feb 26, 2016 at 2:08 AM, Tara Athan  wrote:

> On
> https://ci.apache.org/projects/flink/flink-docs-master/quickstart/setup_quickstart.html
>
> some of the instructions have been updated to 1.0-SNAPSHOT, but not all.
>
> The download link goes to
> http://www.apache.org/dyn/closer.cgi/flink/flink-0.9.1/flink-0.9.1-bin-hadoop1.tgz
>
> and all the links there are broken.
>
> Also,
> $ bin/flink run ./examples/WordCount.jar file://`pwd`/hamlet.txt file://`pwd`/wordcount-result.txt
>
> should be
>
> $ bin/flink run ./examples/batch/WordCount.jar
>
> Or if you want to give the example of explicit arguments, then it needs to
> be
>
> $ bin/flink run ./examples/batch/WordCount.jar --input file:///`pwd`/hamlet.txt --output file:///`pwd`/wordcount-result.txt
>
> Similarly, you could add
>
> $ bin/flink run ./examples/streaming/WordCount.jar --output file:///`pwd`/streaming-wordcount-result.txt
>
> Finally, to complete the demo please add the instruction
>
> $ bin/stop-local.sh
>
> Best, Tara
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit