Trying to comprehend rolling windows + event time

2016-02-19 Thread Nirmalya Sengupta
Hello lofifnc 

I am keen to hear more about this particular thread of discussion. However,
just a silly question: in the first case, why do you say 'Each 5
times, as expected'? What causes them to appear 5 times? I don't see any
_repeat()_ or _repeatAll()_ in the gist you have shared.

What am I missing?

--- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Using numberOfTaskSlots to control parallelism

2016-02-19 Thread Zach Cox
What would the differences be between these scenarios?

1) one task manager with numberOfTaskSlots=1 and one job with parallelism=1

2) one task manager with numberOfTaskSlots=10 and one job with
parallelism=10

In both cases all of the job's tasks get executed within the one task
manager's jvm. Are there any downsides to doing #2 instead of #1?

I ask this question because of current issues related to watermarks with
Kafka sources [1] [2] and changing parallelism with savepoints [3]. I'm
writing a Flink job that processes events from Kafka topics that have 12
partitions. I'm wondering if I should just set the job parallelism=12 and
make numberOfTaskSlots sum to 12 across however many task managers I set
up. It seems like watermarks would work properly then, and I could
effectively change job parallelism using the number of task managers (e.g.
1 TM with slots=12, or 2 TMs with slots=6, or 12 TMs with slots=1, etc).

Am I missing any important details that would make this a bad idea? It
seems like a bit of abuse of numberOfTaskSlots, but also seems like a
fairly simple solution to a few current issues.
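
For concreteness, the setup I have in mind would look roughly like this (the
values are only illustrative for the 12-partition case above, not a
recommendation):

# flink-conf.yaml on each of, say, 2 task managers
taskmanager.numberOfTaskSlots: 6
parallelism.default: 12

// or set the parallelism explicitly when building the job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(12);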

Thanks,
Zach

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tt4782.html
[2] https://issues.apache.org/jira/browse/FLINK-3375
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-parallelism-tt4967.html


Re: How to increase akka heartbeat?

2016-02-19 Thread Stephan Ewen
Hi Saiph!

What is the problem that is happening? The log actually looks like the Job
is successfully sent to the JobManager.

Stephan



On Fri, Feb 19, 2016 at 8:49 PM, Robert Metzger  wrote:

> Hi,
> can you maybe (if you want also private) send me the full logs of the
> jobmanager? The messages you've posted here are logged at DEBUG level. They
> don't indicate an erroneous behavior of the system.
>
> On Fri, Feb 19, 2016 at 8:44 PM, Saiph Kappa 
> wrote:
>
>> These were the parameters that I set btw:
>>
>> akka.watch.heartbeat.interval: 100
>> akka.transport.heartbeat.interval: 1000
>>
>> On Fri, Feb 19, 2016 at 7:43 PM, Saiph Kappa 
>> wrote:
>>
>>> I am not sure.
>>>
>>> For that particular machine I get messages like these:
>>> «
>>> myip:6123/user/jobmanager#291801197])) at akka://flink/user/$a from
>>> Actor[akka://flink/deadLetters].
>>> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Connected to new
>>> JobManager akka.tcp://flink@myip:6123/user/jobmanager.
>>>
>>> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Sending message to
>>> JobManager akka.tcp://flink@myip:6123/user/jobmanager to submit job
>>> JOB1 (5f9cef0c2e4b69530bf1e2485e94d326) and wait for progress
>>>
>>>
>>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
>>> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
>>> in 48 ms from Actor[akka://flink/deadLetters].
>>>
>>>
>>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
>>> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
>>> in 48 ms from Actor[akka://flink/deadLetters].
>>>
>>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Received message
>>> JobSubmitSuccess(2575d5ff5c10336beb7820a052a63623) at akka://flink/user/$a
>>> from Actor[akka.tcp://flink@myip:6123/user/jobmanager#1144818256].
>>> »
>>>
>>> I tried to set the heartbeat interval in the cluster but it didn't solve
>>> the problem, should I try to set it in the client (how can I do it)? I see
>>> no other errors or exceptions on the log files.
>>>
>>>
>>>
>>>
>>> On Fri, Feb 19, 2016 at 7:07 PM, Robert Metzger 
>>> wrote:
>>>
 Hi Saiph,

 are you sure that the jobs are cancelled because the client disconnects?

 For the different timeouts, check the configuration page:
 https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html
 and search for "heartbeat".

 On Fri, Feb 19, 2016 at 8:04 PM, Saiph Kappa 
 wrote:

> Hi,
>
> I have a Flink client application that launches jobs to remote
> clusters. However I'm getting my jobs cancelled:
> "18:25:29,650 WARN
> akka.remote.ReliableDeliverySupervisor- 
> Association
> with remote system [akka.tcp://flink@127.0.0.1:52929] has failed,
> address is now gated for [5000] ms. Reason is: [Disassociated]."
>
> How can I increase the akka heartbeat interval? Where should I set
> that configuration parameter, in the client or in the Flink clusters, and
> in which file.
>
> Thanks.
>
>

>>>
>>
>


Re: How to increase akka heartbeat?

2016-02-19 Thread Robert Metzger
Hi,
can you maybe (if you want also private) send me the full logs of the
jobmanager? The messages you've posted here are logged at DEBUG level. They
don't indicate an erroneous behavior of the system.

On Fri, Feb 19, 2016 at 8:44 PM, Saiph Kappa  wrote:

> These were the parameters that I set btw:
>
> akka.watch.heartbeat.interval: 100
> akka.transport.heartbeat.interval: 1000
>
> On Fri, Feb 19, 2016 at 7:43 PM, Saiph Kappa 
> wrote:
>
>> I am not sure.
>>
>> For that particular machine I get messages like these:
>> «
>> myip:6123/user/jobmanager#291801197])) at akka://flink/user/$a from
>> Actor[akka://flink/deadLetters].
>> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Connected to new
>> JobManager akka.tcp://flink@myip:6123/user/jobmanager.
>>
>> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Sending message to
>> JobManager akka.tcp://flink@myip:6123/user/jobmanager to submit job JOB1
>> (5f9cef0c2e4b69530bf1e2485e94d326) and wait for progress
>>
>>
>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
>> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
>> in 48 ms from Actor[akka://flink/deadLetters].
>>
>>
>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
>> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
>> in 48 ms from Actor[akka://flink/deadLetters].
>>
>> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Received message
>> JobSubmitSuccess(2575d5ff5c10336beb7820a052a63623) at akka://flink/user/$a
>> from Actor[akka.tcp://flink@myip:6123/user/jobmanager#1144818256].
>> »
>>
>> I tried to set the heartbeat interval in the cluster but it didn't solve
>> the problem, should I try to set it in the client (how can I do it)? I see
>> no other errors or exceptions on the log files.
>>
>>
>>
>>
>> On Fri, Feb 19, 2016 at 7:07 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Saiph,
>>>
>>> are you sure that the jobs are cancelled because the client disconnects?
>>>
>>> For the different timeouts, check the configuration page:
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html
>>> and search for "heartbeat".
>>>
>>> On Fri, Feb 19, 2016 at 8:04 PM, Saiph Kappa 
>>> wrote:
>>>
 Hi,

 I have a Flink client application that launches jobs to remote
 clusters. However I'm getting my jobs cancelled:
 "18:25:29,650 WARN
 akka.remote.ReliableDeliverySupervisor- Association
 with remote system [akka.tcp://flink@127.0.0.1:52929] has failed,
 address is now gated for [5000] ms. Reason is: [Disassociated]."

 How can I increase the akka heartbeat interval? Where should I set that
 configuration parameter, in the client or in the Flink clusters, and in
 which file.

 Thanks.


>>>
>>
>


Re: How to increase akka heartbeat?

2016-02-19 Thread Saiph Kappa
These were the parameters that I set btw:

akka.watch.heartbeat.interval: 100
akka.transport.heartbeat.interval: 1000

On Fri, Feb 19, 2016 at 7:43 PM, Saiph Kappa  wrote:

> I am not sure.
>
> For that particular machine I get messages like these:
> «
> myip:6123/user/jobmanager#291801197])) at akka://flink/user/$a from
> Actor[akka://flink/deadLetters].
> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Connected to new
> JobManager akka.tcp://flink@myip:6123/user/jobmanager.
>
> ^[[34m[INFO]^[[0;39m o.a.f.r.c.JobClientActor- Sending message to
> JobManager akka.tcp://flink@myip:6123/user/jobmanager to submit job JOB1
> (5f9cef0c2e4b69530bf1e2485e94d326) and wait for progress
>
>
> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
> in 48 ms from Actor[akka://flink/deadLetters].
>
>
> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Handled message
> LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
> in 48 ms from Actor[akka://flink/deadLetters].
>
> ^[[39m[DEBUG]^[[0;39m o.a.f.r.c.JobClientActor- Received message
> JobSubmitSuccess(2575d5ff5c10336beb7820a052a63623) at akka://flink/user/$a
> from Actor[akka.tcp://flink@myip:6123/user/jobmanager#1144818256].
> »
>
> I tried to set the heartbeat interval in the cluster but it didn't solve
> the problem, should I try to set it in the client (how can I do it)? I see
> no other errors or exceptions on the log files.
>
>
>
>
> On Fri, Feb 19, 2016 at 7:07 PM, Robert Metzger 
> wrote:
>
>> Hi Saiph,
>>
>> are you sure that the jobs are cancelled because the client disconnects?
>>
>> For the different timeouts, check the configuration page:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html
>> and search for "heartbeat".
>>
>> On Fri, Feb 19, 2016 at 8:04 PM, Saiph Kappa 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a Flink client application that launches jobs to remote clusters.
>>> However I'm getting my jobs cancelled:
>>> "18:25:29,650 WARN
>>> akka.remote.ReliableDeliverySupervisor- Association
>>> with remote system [akka.tcp://flink@127.0.0.1:52929] has failed,
>>> address is now gated for [5000] ms. Reason is: [Disassociated]."
>>>
>>> How can I increase the akka heartbeat interval? Where should I set that
>>> configuration parameter, in the client or in the Flink clusters, and in
>>> which file.
>>>
>>> Thanks.
>>>
>>>
>>
>


Re: How to increase akka heartbeat?

2016-02-19 Thread Saiph Kappa
I am not sure.

For that particular machine I get messages like these:
«
myip:6123/user/jobmanager#291801197])) at akka://flink/user/$a from
Actor[akka://flink/deadLetters].
[INFO] o.a.f.r.c.JobClientActor- Connected to new
JobManager akka.tcp://flink@myip:6123/user/jobmanager.

[INFO] o.a.f.r.c.JobClientActor- Sending message to
JobManager akka.tcp://flink@myip:6123/user/jobmanager to submit job JOB1
(5f9cef0c2e4b69530bf1e2485e94d326) and wait for progress


[DEBUG] o.a.f.r.c.JobClientActor- Handled message
LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
in 48 ms from Actor[akka://flink/deadLetters].


[DEBUG] o.a.f.r.c.JobClientActor- Handled message
LeaderSessionMessage(null,JobManagerActorRef(Actor[akka.tcp://flink@myip:6123/user/jobmanager#291801197]))
in 48 ms from Actor[akka://flink/deadLetters].

[DEBUG] o.a.f.r.c.JobClientActor- Received message
JobSubmitSuccess(2575d5ff5c10336beb7820a052a63623) at akka://flink/user/$a
from Actor[akka.tcp://flink@myip:6123/user/jobmanager#1144818256].
»

I tried to set the heartbeat interval in the cluster but it didn't solve
the problem. Should I try to set it in the client (and how can I do that)? I see
no other errors or exceptions in the log files.




On Fri, Feb 19, 2016 at 7:07 PM, Robert Metzger  wrote:

> Hi Saiph,
>
> are you sure that the jobs are cancelled because the client disconnects?
>
> For the different timeouts, check the configuration page:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html
> and search for "heartbeat".
>
> On Fri, Feb 19, 2016 at 8:04 PM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> I have a Flink client application that launches jobs to remote clusters.
>> However I'm getting my jobs cancelled:
>> "18:25:29,650 WARN
>> akka.remote.ReliableDeliverySupervisor- Association
>> with remote system [akka.tcp://flink@127.0.0.1:52929] has failed,
>> address is now gated for [5000] ms. Reason is: [Disassociated]."
>>
>> How can I increase the akka heartbeat interval? Where should I set that
>> configuration parameter, in the client or in the Flink clusters, and in
>> which file.
>>
>> Thanks.
>>
>>
>


Re: How to increase akka heartbeat?

2016-02-19 Thread Robert Metzger
Hi Saiph,

are you sure that the jobs are cancelled because the client disconnects?

For the different timeouts, check the configuration page:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html
and search for "heartbeat".

On Fri, Feb 19, 2016 at 8:04 PM, Saiph Kappa  wrote:

> Hi,
>
> I have a Flink client application that launches jobs to remote clusters.
> However I'm getting my jobs cancelled:
> "18:25:29,650 WARN
> akka.remote.ReliableDeliverySupervisor- Association
> with remote system [akka.tcp://flink@127.0.0.1:52929] has failed, address
> is now gated for [5000] ms. Reason is: [Disassociated]."
>
> How can I increase the akka heartbeat interval? Where should I set that
> configuration parameter, in the client or in the Flink clusters, and in
> which file.
>
> Thanks.
>
>


How to increase akka heartbeat?

2016-02-19 Thread Saiph Kappa
Hi,

I have a Flink client application that launches jobs to remote clusters.
However I'm getting my jobs cancelled:
"18:25:29,650 WARN
akka.remote.ReliableDeliverySupervisor- Association
with remote system [akka.tcp://flink@127.0.0.1:52929] has failed, address
is now gated for [5000] ms. Reason is: [Disassociated]."

How can I increase the akka heartbeat interval? Where should I set that
configuration parameter, in the client or in the Flink clusters, and in
which file?

Thanks.


Re: Finding the average temperature

2016-02-19 Thread Nirmalya Sengupta
Hello  Aljoscha ,

My sincere apologies at the beginning, if I seem to repeat the same
question, almost interminably. If it is frustrating you, I seek your
patience but I really want to nail it down in mind. :-)

The point about parallelizing is well taken. I understand why the stream
should be broken into multiple partitions and how. What still evades me is
how the use case of computing a (sliding) average temperature is achieved if
the stream is scattered.

I want the *running* average temperature for every 3 readings, sliding by 1
reading. I am monitoring the average temperature; if it goes beyond a
certain threshold for 3 consecutive readings, I throw an alarm.

Let's take the following set of data (fields are: probeID, timestamp,
temperature; the 'timestamp' field is used for the assignAscendingTimestamps()
function):

P1,T1,20
P1,T2,30
P2,T2,30
P1,T3,50
P2,T3,20
P3,T3,10

Assumption: T1 < T2 < T3

Now, if we partition on the probeID, we get three partitions, thus:

P1 -> (T1,20) | (T2,30) | (T3,50)
P2 -> (T2,30) | (T3,20)
P3 -> (T3,10)

Computing the average temperature will give me *three distinct averages*
here, one for each partition. I get an average per probe, not per every 3
readings [assuming a slidingWindow(3,1)] irrespective of which probe produces them.

Is it even correct to expect a running average if we partition the input
stream?
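
To make concrete what I am after, here is a rough sketch (Java, 1.0-style API;
the tuple layout and the AllWindowFunction signature are my assumptions, and
countWindowAll runs non-parallel):

// readings as (probeID, timestamp, temperature)
DataStream<Tuple3<String, Long, Double>> readings = ...;

readings
    .countWindowAll(3, 1)  // last 3 readings across *all* probes, sliding by 1
    .apply(new AllWindowFunction<Tuple3<String, Long, Double>, Double, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow window,
                          Iterable<Tuple3<String, Long, Double>> values,
                          Collector<Double> out) {
            double sum = 0;
            int count = 0;
            for (Tuple3<String, Long, Double> r : values) {
                sum += r.f2;
                count++;
            }
            out.collect(sum / count);  // running average of the last 3 readings
        }
    })
    .print();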

Hope I am making my understanding (or the lack of it), quite clear here! :-)

-- Nirmalya



-
To: user@flink.apache.org
Cc:
Date: Fri, 19 Feb 2016 10:41:52 +0100
Subject:
Hi,
as I understand it the “temp_reading_timestamp” field is not a key on which
you can partition your data. This is a field that would be used for
assigning timestamps to the elements. In your data you also have the
“probeID” field. This is a field that could be used to parallelize
computation, for example you could
do the following:

val inputStream = 

val result = inputStream
  .assignAscendingTimestamps { e => e.temp_reading_timestamp }
  .keyBy { e => e.probeID }
  .timeWindow(Time.minutes(10))
  .apply(new SumFunction(), new ComputeAverageFunction())

result.print()

(Where SumFunction() would sum up temperatures and keep a count and
ComputeAverageFunction() would divide the sum by the count.)

In this way, computation is parallelized because it can be spread across
several machines and partitioned by the key. Without such a key everything
has to be computed on one machine because a global view of the data is
required.

Cheers,






-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


state.backend.fs.checkpointdir setting

2016-02-19 Thread Andrew Ge Wu
Hi All,

I have been experiencing an error that is blocking my HA standalone setup.

The cluster starts up just fine, but when I deploy an application to it, I get 
this exception:


java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Cannot 
create the file system state backend: The configuration does not specify the 
checkpoint directory 'state.backend.fs.checkpointdir'
at 
org.apache.flink.runtime.state.filesystem.FsStateBackendFactory.createFromConfig(FsStateBackendFactory.java:41)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createStateBackend(StreamTask.java:517)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:171)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

Here’s my configuration:

….
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
# recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.path.root: /flink
state.backend: filesystem

state.backend.fs.checkpointdir: file:///apps/flink/checkpoints/
recovery.zookeeper.storageDir: file:///apps/flink/recovery/



And here’s my flink folder.  

drwxr-xr-x 11 {user} {group}   4.0K Feb 19 17:31 .
drwxrwxr-x  6 {user} {group}   4.0K Feb 19 11:25 ..
-rw-r--r--  1 {user} {group}17K Nov 22 13:52 LICENSE
-rw-r--r--  1 {user} {group}779 Nov 22 13:52 NOTICE
-rw-r--r--  1 {user} {group}   1.3K Nov 22 13:52 README.txt
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 bin
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:31 checkpoints
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:37 conf
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 examples
drwxr-xr-x  2 {user} {group}   4.0K Nov 22 13:52 lib
drwxr-xr-x  2 {user} {group}   4.0K Feb 19 17:48 log
drwxr-xr-x  3 {user} {group}   4.0K Feb 19 17:48 recovery
drwxr-xr-x  3 {user} {group}   4.0K Nov 22 13:52 resources
drwxr-xr-x  5 {user} {group}   4.0K Nov 22 13:52 tools

The recovery folder works just fine, with blobs in it. 



Thanks!



Andrew
-- 


RE:Flink HA

2016-02-19 Thread Thomas Lamirault
I have resolved my issues.
It seems that Avro had difficulties with my POJO. I changed the handling of 
the null values and it works fine.

But is there a way to cancel the old JobGraphs that are stuck in restarting 
status, and to keep only the last one to restart? Other than cancelling the JobIds manually?

Thanks

Thomas

From: Thomas Lamirault [thomas.lamira...@ericsson.com]
Sent: Friday, 19 February 2016 10:56
To: user@flink.apache.org
Subject: RE: Flink HA

After setting this configuration, I get some exceptions:

java.lang.Exception: Could not restore checkpointed state to operators and 
functions
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor 
for field
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
at 
java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
... 13 more


If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?

Thanks

Thomas
From: Thomas Lamirault [thomas.lamira...@ericsson.com]
Sent: Friday, 19 February 2016 09:39
To: user@flink.apache.org
Subject: RE: Flink HA

Thanks for the quick reply !

> state.backend.fs.checkpointdir
Is actually pointing to a hdfs directory but I will modify the
recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omit to say we will use YARN.

> Does this help?
Yes, a lot :-)

Thomas

From: Ufuk Celebi [u...@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
>
> recovery.zookeeper.path.root: 
>
> recovery.zookeeper.storageDir: 
>
> recovery.backend.fs.checkpointdir: 

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want in case of application crash, the pending window has to be restore
> when the application restart.
>
> Pending data are store into the /blob directory ?
>
> Also, we try to write a script who restart the application after exceed the
> max attempts, with the last pending window.
>
> How can I do that ? A simple restart of the application is enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under  contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.

Does this help?

– Ufuk

Re: Flink Stream assign unique transaction ID

2016-02-19 Thread Aljoscha Krettek
Hi,
OperatorState was discarded in favor of ValueState, which you can retrieve 
using RuntimeContext.getState(…). This provides the same functionality as 
OperatorState but is the more future proof interface.
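
A rough sketch of what that looks like on a keyed stream (the Transaction type
and the key are placeholders; note that the state is scoped per key, so with
parallelism > 1 this gives one sequence per key rather than a single globally
ordered sequence):

public static class AssignTid extends RichMapFunction<Transaction, Tuple2<Long, Transaction>> {

    private transient ValueState<Long> nextId;

    @Override
    public void open(Configuration parameters) {
        // one counter per key of the keyed stream, checkpointed with the job
        nextId = getRuntimeContext().getState(
                new ValueStateDescriptor<>("tid", Long.class, 0L));
    }

    @Override
    public Tuple2<Long, Transaction> map(Transaction t) throws Exception {
        long id = nextId.value();
        nextId.update(id + 1);
        return Tuple2.of(id, t);
    }
}

// usage: keyed state is only available after keyBy()
transactions.keyBy(/* some key of the transaction */).map(new AssignTid());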

Cheers,
Aljoscha
> On 19 Feb 2016, at 15:38, Tanguy Racinet  wrote:
> 
> Hi,
> 
> I am currently working on Flink streaming API and I am trying to assign 
> unique IDs to transactions to track changes and mine Frequent Item Sets.
> 
> I identified several potential solutions such as:
> - Check pointing Local variables (snapshotState and restoreState for once 
> only passage)
> - Key Value State Interface (using operatorState and getRuntimeContext but is 
> deprecated)
> - broadcast function
> - Co-Map function 
> 
> After trying all of them, the only one I could actually make work for my 
> needs uses the deprecated operatorState.
> I tried using co-map function but couldn’t take the number from one map and use it 
> in the other mapper to modify the Tid value.
> 
> Is there any solution I haven’t thought of in order to assign a unique Long 
> to every one of my transactions within the DataStream and to ensure they are 
> not only unique but also ordered and linearly distributed (transaction1 -> id 
> = 1; transaction2 -> id = 2; etc)
> Or maybe some working examples of the solutions I tried but couldn’t obtain 
> what I needed with it ?
> 
> Thank you in advance.
> Regards,
> Tanguy R.



Flink Stream assign unique transaction ID

2016-02-19 Thread Tanguy Racinet
Hi,

I am currently working on Flink streaming API and I am trying to assign unique 
IDs to transactions to track changes and mine Frequent Item Sets.

I identified several potential solutions such as:
 - Check pointing Local variables (snapshotState and restoreState for once only 
passage)
 - Key Value State Interface (using operatorState and getRuntimeContext but is 
deprecated)
 - broadcast function
 - Co-Map function 

After trying all of them, the only one I could actually make work for my needs 
uses the deprecated operatorState.
I tried using co-map function but couldn’t take the number from one map and use it 
in the other mapper to modify the Tid value.

Is there any solution I haven’t thought of in order to assign a unique Long to 
every one of my transactions within the DataStream and to ensure they are not 
only unique but also ordered and linearly distributed (transaction1 -> id = 1; 
transaction2 -> id = 2; etc)
Or maybe some working examples of the solutions I tried but couldn’t obtain 
what I needed with it ?

Thank you in advance.
Regards,
Tanguy R.

Re: Trying to comprehend rolling windows + event time

2016-02-19 Thread Aljoscha Krettek
Hi,
in the second case: do you know what watermark is emitted after ("grace", 
"arctic", 25) is emitted? I imagine it is Long.MAX_VALUE, since otherwise all 
of the windows would not have been triggered. If there are no intermediate 
watermarks but we jump directly to the last (Long.MAX_VALUE) watermark, then all 
in-flight windows will be emitted. The order in which they are emitted is 
arbitrary and I guess the lone (“grace”, “arctic”, 25) seen first in the output 
actually belongs to window (2 min to 7 min). 

Using a WindowFunction you could try and also print the window to which emitted 
elements belong.
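
Roughly like this, if it helps (1.0-style signature, the 0.10 generics differ
slightly; the window size/slide and the tuple layout are just placeholders for
whatever the gist uses):

.keyBy(1)
.timeWindow(Time.minutes(5), Time.minutes(2))
.apply(new WindowFunction<Tuple3<String, String, Integer>, String, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple key, TimeWindow window,
                      Iterable<Tuple3<String, String, Integer>> input,
                      Collector<String> out) {
        for (Tuple3<String, String, Integer> value : input) {
            // print each element together with the window it belongs to
            out.collect(value + " in window [" + window.getStart() + ", " + window.getEnd() + ")");
        }
    }
})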

Cheers,
Aljoscha
> On 19 Feb 2016, at 13:23, lofifnc  wrote:
> 
> I should add i'm using version 0.10.1
> 
> 
> 



Re: streaming hdfs sub folders

2016-02-19 Thread Robert Metzger
Hi Martin,

where is the null pointer exception thrown?
I think you didn't call the open() method of the AvroInputFormat. Maybe
that's the issue.
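
Roughly, the manual loop needs the full input format lifecycle (configure,
create splits, open per split). Something like the following, with the path and
type taken from your snippet, as an untested sketch:

AvroInputFormat<EndSongCleanedPq> format = new AvroInputFormat<>(
        new Path("hdfs:///anonym/cleaned/endsong/2016-01-01/" + s), EndSongCleanedPq.class);
format.setReuseAvroValue(false);
format.configure(new Configuration());  // org.apache.flink.configuration.Configuration

// an input format reads splits; open() must be called for each split before nextRecord()
for (FileInputSplit split : format.createInputSplits(1)) {
    format.open(split);
    while (!format.reachedEnd()) {
        EndSongCleanedPq record = format.nextRecord(new EndSongCleanedPq());
        if (record != null) {
            collector.collect(record);
        }
    }
    format.close();
}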

On Thu, Feb 18, 2016 at 5:01 PM, Martin Neumann  wrote:

> I tried to implement your idea but I'm getting NullPointer exceptions from
> the AvroInputFormat any Idea what I'm doing wrong?
> See the code below:
>
> public static void main(String[] args) throws Exception {
>
> // set up the execution environment
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>
> env.fromElements("00", "01", "02","03","22","23")
> .flatMap(new FileExtractor())
> .filter(new LocationFiter())
> .flatMap(new PreProcessEndSongClean())
> .writeAsCsv(outPath);
>
>
> env.execute("something");
> }
>
> private static class FileExtractor implements 
> FlatMapFunction{
>
> @Override
> public void flatMap(String s, Collector collector) 
> throws Exception {
> AvroInputFormat avroInputFormat = new 
> AvroInputFormat(new 
> Path("hdfs:///anonym/cleaned/endsong/2016-01-01/"+s), EndSongCleanedPq.class);
> avroInputFormat.setReuseAvroValue(false);
> while (! avroInputFormat.reachedEnd()){
> EndSongCleanedPq res = avroInputFormat.nextRecord(new 
> EndSongCleanedPq());
> if (res != null) collector.collect(res);
> }
> }
> }
>
>
> On Thu, Feb 18, 2016 at 4:06 PM, Martin Neumann  wrote:
>
>> I guess I need to set the parallelism for the FlatMap to 1 to make sure I
>> read one file at a time. The downside I see with this is that I will be not
>> able to read in parallel from HDFS (and the files are Huge).
>>
>> I give it a try and see how much performance I loose.
>>
>> cheers Martin
>>
>> On Thu, Feb 18, 2016 at 2:32 PM, Stephan Ewen  wrote:
>>
>>> Martin,
>>>
>>> I think you can approximate this in an easy way like this:
>>>
>>>   - On the client, you traverse your directories to collect all files
>>> that you need, collect all file paths in a list.
>>>   - Then you have a source "env.fromElements(paths)".
>>>   - Then you flatMap and in the FlatMap, run the Avro input format (open
>>> it per path, then call it to get all elements)
>>>
>>> That gives you pretty much full control about in which order the files
>>> should be processed.
>>>
>>> What do you think?
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Feb 17, 2016 at 9:42 PM, Martin Neumann 
>>> wrote:
>>>
 I forgot to mention I'm using an AvroInputFormat to read the file (that
 might be relevant how the flag needs to be applied)
 See the code Snipped below:

 DataStream inStream =
 env.readFile(new AvroInputFormat(new 
 Path(filePath), EndSongCleanedPq.class), filePath);


 On Wed, Feb 17, 2016 at 7:33 PM, Martin Neumann 
 wrote:

> The program is a DataStream program, it usually it gets the data from
> kafka. It's an anomaly detection program that learns from the stream
> itself. The reason I want to read from files is to test different settings
> of the algorithm and compare them.
>
> I think I don't need to replay things in the exact order (which is not
> possible with parallel reads anyway) and I have written the program so it
> can deal with out of order events.
> I only need the subfolders to be processed roughly in order. Its fine
> to process some stuff from 01 before everything from 00 is finished, if I
> get records from all 24 subfolders at the same time things will break
> though. If I set the flag will it try to get data from all sub dir's in
> parallel or will it go sub dir by sub dir?
>
> Also can you point me to some documentation or something where I can
> see how to set the Flag?
>
> cheers Martin
>
>
>
>
> On Wed, Feb 17, 2016 at 11:49 AM, Stephan Ewen 
> wrote:
>
>> Hi!
>>
>> Going through nested folders is pretty simple, there is a flag on the
>> FileInputFormat that makes sure those are read.
>>
>> Tricky is the part that all "00" files should be read before the "01"
>> files. If you still want parallel reads, that means you need to sync at
>> some point, wait for all parallel parts to finish with the "00" work 
>> before
>> anyone may start with the "01" work.
>>
>> Is your training program a DataStream or a DataSet program?`
>>
>> Stephan
>>
>> On Wed, Feb 17, 2016 at 1:16 AM, Martin Neumann 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a streaming machine learning job that usually runs with input
>>> from kafka. To tweak the models I need to run on some old data from 
>>> HDFS.
>>>
>>> Unfortunately the data on HDFS is spread out over several
>>> subfolders. Basically I have one folder per date, with one subfolder for each hour;
>>> within those are the actual input files I'm interested in.

Re: Trying to comprehend rolling windows + event time

2016-02-19 Thread lofifnc
I should add i'm using version 0.10.1





Trying to comprehend rolling windows + event time

2016-02-19 Thread lofifnc
Hi,

I have a setup where I'm feeding a rolling window with event time:
https://gist.github.com/lofifnc/dd946fef6f4b3eb25ef1 (Obviously i'm using
Flinkspector)

The first case behaves as expected: I'm emitting three records which are all
in the time frame of the first window triggering:
("hans", "elephant", 15) at second 0 with watermark 0 
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50 
(You can see this as well in the console output). 
The result is I'm getting:
(susi,arctic,20)
(hans,elephant,55)
Each 5 times as expected.

In the second case it gets interesting, as I'm emitting a 4th record which
does not get evaluated within the first triggering of the window:
("hans", "elephant", 15) at second 0 with watermark 0 
("susi", "arctic", 20) at second 30 with watermark 30
("pete", "elephant", 40) at second 50 with watermark 50 
("grace", "arctic", 25) at second 90 with watermark 90
Now the second time the window is evaluated, the output looks like this:
(grace,arctic,25)
(susi,arctic,45)
(hans,elephant,55)

The 4th record is emitted in a non-aggregated form, but the grouping for the
"arctic" key has been updated.

In the third case it gets really interesting... I'm emitting a 5th record
which will fall into the third evaluation of the window.
This time the record is shown twice in the output:
(sven,elephant,5)
(grace,arctic,25)
(sven,elephant,5)
(hans,elephant,60)

I played around with this a little bit and if you insert a record into the
4th evaluation it is shown 3 times, in the 5th it will be in the output 4
times and so on. The test itself is deterministic between test runs.  

Lastly I've been able to reproduce the behavior using only vanilla Flink, so
I'm fairly certain this is not a side effect of Flinkspector. 

I'm slowly getting a headache trying to wrap my head around why I'm seeing
this behavior, but I can't find a satisfying explanation. 

Best Alex!











Re: Kyro Intermittent Exception for Large Data

2016-02-19 Thread Till Rohrmann
Thanks for the pointer Ken. As far as I know, we’re using the
StdInstantiatorStrategy as the fallback instantiator strategy for our Kryo
instances.
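
For reference, the pattern in the KryoContext that Ken links below boils down
to this (a sketch from memory of the Kryo 2.x API, not the exact code Flink
uses internally):

import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;

Kryo kryo = new Kryo();
// fall back to Objenesis when a class has no accessible no-arg constructor,
// so object graphs containing such classes can still be deserialized
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());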

Cheers,
Till

On Fri, Feb 19, 2016 at 12:39 AM, Ken Krugler 
wrote:

> I've seen this type of error when using Kryo with a Cascading scheme I'd
> created.
>
> In my case it happened when serializing a large object graph, where some
> of the classes didn't have no-arg constructors.
>
> The general fix was to set an instantiator strategy for Kryo - see:
>
>
> https://github.com/ScaleUnlimited/cascading.utils/blob/master/src/main/java/com/scaleunlimited/cascading/local/KryoContext.java#L81
>
> Don't know if Flink always sets up to use this (explicitly, or fallback)
> everywhere.
>
> -- Ken
>
> --
>
> *From:* Welly Tambunan
>
> *Sent:* February 17, 2016 6:10:03pm PST
>
> *To:* user@flink.apache.org
>
> *Subject:* Kyro Intermittent Exception for Large Data
>
> Hi All,
>
>
> We create a downsampling job that will respond to a request from RabbitMQ.
> However we face different kinds of exceptions intermittently when
> transferring large data. Is there any setting that we can tune to make Kryo
> able to serialize and deserialize the large data?
>
> I can't find any reference to Kryo or to default Java serialization in the
> latest documentation. Can someone show me the right place to look? As I
> think Flink used Java serialization before?
>
> Thanks a lot.
>
> *ERROR 1*
>
> 
>
> *ERROR 2*
>
> 
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>
>
>
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>
>
>


Re: Problem with Kafka 0.9 Client

2016-02-19 Thread Lopez, Javier
Hi, these are the properties:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092"); //full IPs removed for security reasons
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "3");

We have tested with these as well:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092");
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");


and these:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",
".87:9092,.41:9092,.35:9092");
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "3");
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

With all three different configurations we get the same result.
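
For reference, the properties are handed to the consumer roughly like this (the
deserialization schema shown here is a placeholder):

DataStream<String> stream = env.addSource(
        new FlinkKafkaConsumer09<>("stream_test_3", new SimpleStringSchema(), properties));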

On 19 February 2016 at 11:55, Robert Metzger  wrote:

> Thank you. Can you send me also the list of properties you are passing to
> the kafka consumer? Are you only setting the "bootstrap.servers" or more?
>
> On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier 
> wrote:
>
>> Hi Robert,
>>
>> Please find attached the full logs of one of our latest executions. We
>> are basically trying to read from our kafka cluster and then writing the
>> data to elasticsearch.
>>
>> Thanks for your help!
>>
>> On 18 February 2016 at 11:19, Robert Metzger  wrote:
>>
>>> Hi Javier,
>>>
>>> sorry for the late response. In the Error Mapping of Kafka, it says that
>>> code 15 means: ConsumerCoordinatorNotAvailableCode.
>>>
>>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
>>>
>>> How many brokers did you put into the list of bootstrap servers?
>>> Can you maybe send me the full log of one of the Flink TaskManagers
>>> reading from Kafka?
>>>
>>>
>>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier >> > wrote:
>>>
 Hi guys,

 We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not
 been able to retrieve data from our Kafka Cluster. The DEBUG data reports
 the following:

 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
  - Sending metadata request ClientRequest(expectResponse=true,
 callback=null,
 request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
 body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
 createdTimeMs=1455702804364, sendTimeMs=0) to node 35
 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
   - Updated cluster metadata version 838 to Cluster(nodes =
 [Node(41, ip-.eu-west-1.compute.internal, 9092), Node(35,
 ip-.eu-west-1.compute.internal, 9092), Node(87,
 ip-.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic =
 stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
 [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
 replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3,
 partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
 Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
 [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
 = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
 10:53:24,398 DEBUG
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
 group metadata request to broker 35
 10:53:24,432 DEBUG
 org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
 metadata response ClientResponse(receivedTimeMs=1455702804432,
 disconnected=false, request=ClientRequest(expectResponse=true,
 callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94,
 request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test},
 body={group_id=test}), createdTimeMs=14

Re: Problem with Kafka 0.9 Client

2016-02-19 Thread Robert Metzger
Thank you. Can you send me also the list of properties you are passing to
the kafka consumer? Are you only setting the "bootstrap.servers" or more?

On Fri, Feb 19, 2016 at 11:46 AM, Lopez, Javier 
wrote:

> Hi Robert,
>
> Please find attached the full logs of one of our latest executions. We are
> basically trying to read from our kafka cluster and then writing the data
> to elasticsearch.
>
> Thanks for your help!
>
> On 18 February 2016 at 11:19, Robert Metzger  wrote:
>
>> Hi Javier,
>>
>> sorry for the late response. In the Error Mapping of Kafka, it says that
>> code 15 means: ConsumerCoordinatorNotAvailableCode.
>>
>> https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
>>
>> How many brokers did you put into the list of bootstrap servers?
>> Can you maybe send me the full log of one of the Flink TaskManagers
>> reading from Kafka?
>>
>>
>> On Wed, Feb 17, 2016 at 11:10 AM, Lopez, Javier 
>> wrote:
>>
>>> Hi guys,
>>>
>>> We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not
>>> been able to retrieve data from our Kafka Cluster. The DEBUG data reports
>>> the following:
>>>
>>> 10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
>>>  - Sending metadata request ClientRequest(expectResponse=true,
>>> callback=null,
>>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
>>> body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
>>> createdTimeMs=1455702804364, sendTimeMs=0) to node 35
>>> 10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
>>> - Updated cluster metadata version 838 to Cluster(nodes = [Node(41,
>>> ip-.eu-west-1.compute.internal, 9092), Node(35,
>>> ip-.eu-west-1.compute.internal, 9092), Node(87,
>>> ip-.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic =
>>> stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
>>> [87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
>>> replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3,
>>> partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
>>> Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
>>> [35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
>>> = 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
>>> 10:53:24,398 DEBUG
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
>>> group metadata request to broker 35
>>> 10:53:24,432 DEBUG
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
>>> metadata response ClientResponse(receivedTimeMs=1455702804432,
>>> disconnected=false, request=ClientRequest(expectResponse=true,
>>> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94,
>>> request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test},
>>> body={group_id=test}), createdTimeMs=1455702804398,
>>> sendTimeMs=1455702804398),
>>> responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
>>>
>>>
>>> We receive this message all the time. What we don't understand is
>>> this "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}",
>>> as we see an error_code we suppose there was a problem. Our Kafka cluster
>>> works and we have some clients extracting data from it, so we don't know if
>>> this could be a Kafka issue or a Flink issue.
>>>
>>> Does anyone know, or understand, this response we are getting from Kafka?
>>>
>>> Thanks.
>>>
>>
>>
>


Re: Problem with Kafka 0.9 Client

2016-02-19 Thread HungChang
Had the same problem as Javier's. 

3450 [Thread-10] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
metadata response ClientResponse(receivedTimeMs=1455811593680,
disconnected=false, request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@278904b4,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=10,client_id=consumer-4},
body={group_id=test_group}), createdTimeMs=1455811593645,
sendTimeMs=1455811593645),
responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})

When Flink consumes from Kafka 0.9 locally, and when the other consumer reads
from the Kafka cluster, both work. 

After reading this -
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design#KafkaDetailedConsumerCoordinatorDesign-Consumer

Found that the issue is that the consumer co-ordinator cannot complete this
step:
The consumer sends a RegisterConsumer request to its co-ordinator broker.
In the RegisterConsumerResponse, it receives the list of topic partitions
that it should own.

However we don't know the solution for this yet.

Best,

Sendoh





RE:Flink HA

2016-02-19 Thread Thomas Lamirault
After setting this configuration, I get some exceptions:

java.lang.Exception: Could not restore checkpointed state to operators and 
functions
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor 
for field 
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
at 
java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
... 13 more


If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?

Thanks

Thomas
From: Thomas Lamirault [thomas.lamira...@ericsson.com]
Sent: Friday, 19 February 2016 09:39
To: user@flink.apache.org
Subject: RE: Flink HA

Thanks for the quick reply !

> state.backend.fs.checkpointdir
Is actually pointing to a hdfs directory but I will modify the
recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omit to say we will use YARN.

> Does this help?
Yes, a lot :-)

Thomas

From: Ufuk Celebi [u...@apache.org]
Sent: Thursday, 18 February 2016 19:19
To: user@flink.apache.org
Subject: Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
>
> recovery.zookeeper.path.root: 
>
> recovery.zookeeper.storageDir: 
>
> recovery.backend.fs.checkpointdir: 

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want in case of application crash, the pending window has to be restore
> when the application restart.
>
> Pending data are store into the /blob directory ?
>
> Also, we try to write a script who restart the application after exceed the
> max attempts, with the last pending window.
>
> How can I do that ? A simple restart of the application is enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under  contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.

Does this help?

– Ufuk

Re: events eviction

2016-02-19 Thread Aljoscha Krettek
Hi,
yes, in some cases it could be necessary. Could you maybe give some example of 
what kind of window computation you want to achieve? Then we can see if it 
would be possible without GlobalWindows and evictor.
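
Just so we mean the same building blocks: the stock GlobalWindows pattern looks
roughly like this (the counts and the window function are placeholders; whether
such triggers/evictors can express your per-partition eviction condition is
exactly the open question):

stream
    .keyBy(0)
    .window(GlobalWindows.create())        // never fires on its own, needs an explicit trigger
    .trigger(CountTrigger.of(2))           // e.g. fire once two elements have arrived
    .evictor(CountEvictor.of(2))           // evict from the head so at most 2 elements stay buffered
    .apply(new MyCombineFunction());       // hypothetical WindowFunction combining the buffered pair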

Cheers,
Aljoscha
> On 15 Feb 2016, at 18:07, Radu Tudoran  wrote:
> 
> Hi,
> 
> Thanks Aljoscha for the details!
> 
> The warning about performance and evictors is useful, but I am not sure how 
> it can be put in practice always. Take for example a GlobalWindow that you 
> would use to aggregate data from multiple partitions. A GlobalWindow does not 
> come with a trigger - would it have then a default evictor? Even if it has 
> some, you still need to control the eviction of the events. Secondly, 
> assuming that you would need to aggregate the data from 2 partitions and 
> evict something only when you have one item from each partition. You would 
> need a sort of state for this. And then to ensure resiliency, the state 
> should be recoverable if a crash happens. Could you approach this without an 
> evictor state?
> 
> 
> Dr. Radu Tudoran
> Research Engineer - Big Data Expert
> IT R&D Division
> 
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
> 
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
> 
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> This e-mail and its attachments contain confidential information from HUAWEI, 
> which is intended only for the person or entity whose address is listed 
> above. Any use of the information contained herein in any way (including, but 
> not limited to, total or partial disclosure, reproduction, or dissemination) 
> by persons other than the intended recipient(s) is prohibited. If you receive 
> this e-mail in error, please notify the sender by phone or email immediately 
> and delete it!
> 
> 
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org] 
> Sent: Monday, February 15, 2016 11:58 AM
> To: user@flink.apache.org
> Subject: Re: events eviction
> 
> Hi,
> you are right, the logic is in EvictingNonKeyedWindowOperator.emitWindow() 
> for non-parallel (non-keyed) windows and in 
> EvictingWindow.processTriggerResult() in the case of keyed windows.
> 
> You are also right about the contract of the Evictor, it returns the number 
> of elements to be evicted from the beginning. This also means that eviction 
> does not consider any timestamps in the elements and is therefore quite 
> arbitrary. The places in the code I mentioned above simply get the value from 
> the Evictor and evict that many elements from the internal buffer/state.
> 
> Right now it is not possible to replace the window operator that is used by 
> flink. What you can do is copy the window operator code and use it manually 
> using DataStream.transform().
> 
> About the evictor state. I’m afraid this is not possible right now. It was a 
> conscious decision to make the Evictor stateless to make it easier for the 
> system to handle. I would also strongly advise against using Evictors if at 
> all possible. They make it impossible to incrementally aggregate window 
> results (for example with a reduce function). This can have a huge 
> performance/memory footprint impact. In your case, what are you using them 
> for?
> 
> I hope this helps somehow, but let us know if you need further explanations.
> 
> Cheers,
> Aljoscha
> 
>> On 15 Feb 2016, at 11:09, Radu Tudoran  wrote:
>> 
>> Hello,
>> 
>> I am looking over the mechanisms of evicting events in Flink. I saw that 
>> either using a default evictor or building a custom one the logic is that 
>> the evictor will provide the number of events to be discarded.
>> Could you please provide me with some additional pointers regarding the 
>> mechanism in Flink where this actually happens:
>> -  The class that implements this functionality of discarding the 
>> events? (my initial expectations that this happens in the window class turn 
>> out to be wrong). I checked and found the “EvictingNonKeyedWindowOperator” – 
>> is this the right place to look indeed?
>> -  If yes, would it be possible to create a customizable class like 
>> this one and somehow pass it to the framework? I would be curious if there 
>> is an option  other than modifying the core classes and recompiling the 
>> framework?
>> 
>> On a slightly parallel topic - is there some way of creating a state in the 
>> evictor that will be check pointed and restore in case of failure.  I would 
>> be interested if something like an operator state is possible in the evictor.
>> 
>> Regards,
>> 
>> Dr. Radu Tudoran
>> Research Engineer - Big Data Ex

Re: Finding the average temperature

2016-02-19 Thread Aljoscha Krettek
Hi,
as I understand it the “temp_reading_timestamp” field is not a key on which you 
can partition your data. This is a field that would be used for assigning 
timestamps to the elements. In your data you also have the “probeID” field. This is 
a field that could be used to parallelize computation, for example you could
do the following:

val inputStream = 

val result = inputStream
  .assignAscendingTimestamps { e => e.temp_reading_timestamp }
  .keyBy { e => e.probeID }
  .timeWindow(Time.minutes(10))
  .apply(new SumFunction(), new ComputeAverageFunction())

result.print()

(Where SumFunction() would sum up temperatures and keep a count and 
ComputeAverageFunction() would divide the sum by the count.)

In this way, computation is parallelized because it can be spread across 
several machines and partitioned by the key. Without such a key everything has 
to be computed on one machine because a global view of the data is required.

Cheers,
Aljoscha
> On 18 Feb 2016, at 17:54, Nirmalya Sengupta  
> wrote:
> 
> Hello Aljoscha ,
> 
> You mentioned: '.. Yes, this is right if you temperatures don’t have any 
> other field on which you could partition them. '.
> 
> What I am failing to understand is that if temperatures are partitioned on 
> some other field (in my use-case, I have one such: the 
> temp_reading_timestamp), they will be pushed to different nodes (different 
> threads in local run) based on that field. Because they will be computed 
> (scattered) and later collected (gathered), how could I arrive at the 
> _running_ average temperature? The client application needs to know *how the 
> average temperature is changing over time'. 
> 
> Could you please fill in the gap in my understanding?
> 
> -- Nirmalya
> 
> -- 
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is 
> where they should be.
> Now put the foundation under them."



RE:Flink HA

2016-02-19 Thread Thomas Lamirault
Thanks for the quick reply !

> state.backend.fs.checkpointdir
Is actually pointing to a hdfs directory but I will modify  the 
recovery.zookeeper.path.root

> This is only relevant if you are using YARN. From your complete
Yes, I omit to say we will use YARN.

>Does this help?
Yes, a lot :-)

Thomas


De : Ufuk Celebi [u...@apache.org]
Envoyé : jeudi 18 février 2016 19:19
À : user@flink.apache.org
Objet : Re: Flink HA

On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
 wrote:
> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
>
> recovery.mode: zookeeper
> recovery.zookeeper.quorum:
>
> recovery.zookeeper.path.root: 
>
> recovery.zookeeper.storageDir: 
>
> recovery.backend.fs.checkpointdir: 

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete


> We want in case of application crash, the pending window has to be restore
> when the application restart.
>
> Pending data are store into the /blob directory ?
>
> Also, we try to write a script who restart the application after exceed the
> max attempts, with the last pending window.
>
> How can I do that ? A simple restart of the application is enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state.

Everything under  contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.


Does this help?

– Ufuk

Optimal Configuration for Cluster

2016-02-19 Thread Welly Tambunan
Hi All,

We are trying to run our job in a cluster that has these specs:

1. # of machine: 16
2. memory : 128 gb
3. # of core : 48

However when we try to run we have an exception.

"insufficient number of network buffers. 48 required but only 10 available.
the total number of network buffers is currently set to 2048"

After looking at the documentation, we set the configuration based on the docs:

taskmanager.network.numberOfBuffers: # core ^ 2 * # machine * 4

However we face another error from the JVM:

java.io.IOException: Cannot allocate network buffer pool: Could not
allocate enough memory segments for NetworkBufferPool (required (Mb): 2304,
allocated (Mb): 698, missing (Mb): 1606). Cause: Java heap space
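
If I do the math (assuming the default taskmanager.memory.segment-size of 32 KB),
the numbers in the error seem to line up:

    required heap for buffers = numberOfBuffers * 32 KB
    2,304 MB "required" above  = 73,728 buffers * 32 KB per buffer

So whatever taskmanager.network.numberOfBuffers ends up being, taskmanager.heap.mb
apparently has to leave room for numberOfBuffers * 32 KB on top of everything else,
which would explain why bumping the heap (below) made the cluster start.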

We fiddled with taskmanager.heap.mb: 4096.

Finally the cluster is running.

However, I'm still not sure the configuration and the task manager heap
fiddling are really the right way to fine-tune this. So my questions are:


   1. Am I doing it right for numberOfBuffers?
   2. How much should we allocate for taskmanager.heap.mb given the
   information above?
   3. Any suggestion which configuration we need to set to make it optimal
   for the cluster?
   4. Is there any chance that this will get automatically resolved by the
   memory/network buffer manager?

Thanks a lot for the help

Cheers

-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com