[jira] [Commented] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-05 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833288#comment-16833288
 ] 

Pierre Zemb commented on FLINK-12400:
-

Nice catch [~klion26], thanks

> NullpointerException using SimpleStringSchema with Kafka
> 
>
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster
> Kafka 0.10 with a topic in log-compaction
>Reporter: Pierre Zemb
>Assignee: Pierre Zemb
>Priority: Minor
>
> Hi!
> Yesterday, we saw strange behavior with our Flink job and Kafka. We are
> consuming a Kafka topic set up in
> [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As
> such, sending a message with a null payload acts as a tombstone.
> We are consuming Kafka like this:
> {code:java}
> new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), this.kafkaProperties)
> {code}
> When we sent such a message, the job failed because of a NullPointerException
> [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]:
> `byte[] message` was null, causing the NPE.
> We forked the class and added a basic null check that returns null in that
> case. It fixed our issue.
> Should we add it to the main class?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-04 Thread Pierre Zemb (JIRA)
Pierre Zemb created FLINK-12400:
---

 Summary: NullpointerException using SimpleStringSchema with Kafka
 Key: FLINK-12400
 URL: https://issues.apache.org/jira/browse/FLINK-12400
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Affects Versions: 1.8.0, 1.7.2
 Environment: Flink 1.7.2 job on 1.8 cluster
Kafka 0.10 with a topic in log-compaction
Reporter: Pierre Zemb
Assignee: Pierre Zemb


Hi!

Yesterday, we saw strange behavior with our Flink job and Kafka. We are
consuming a Kafka topic set up in
[log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As
such, sending a message with a null payload acts as a tombstone.

We are consuming Kafka like this:

{code:java}
new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), this.kafkaProperties)
{code}

When we sent such a message, the job failed because of a NullPointerException
[here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]:
`byte[] message` was null, causing the NPE.

We forked the class and added a basic null check that returns null in that
case. It fixed our issue.

Should we add it to the main class?
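
For illustration, the kind of null check described above might look roughly like the following. This is a minimal standalone sketch, not the actual patch and not Flink's SimpleStringSchema: the class name NullSafeStringDeserializer and its constructor are hypothetical, and it does not implement Flink's schema interfaces so that it can run without any Flink dependency.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a forked string schema; not Flink's SimpleStringSchema. */
final class NullSafeStringDeserializer {

    private final Charset charset;

    NullSafeStringDeserializer(Charset charset) {
        this.charset = charset;
    }

    /** Returns null for a null payload (e.g. a compaction tombstone) instead of throwing. */
    String deserialize(byte[] message) {
        if (message == null) {
            return null; // tombstone record: there is nothing to decode
        }
        return new String(message, charset);
    }

    public static void main(String[] args) {
        NullSafeStringDeserializer schema =
                new NullSafeStringDeserializer(StandardCharsets.UTF_8);
        // A regular record is decoded as before.
        System.out.println(schema.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
        // A tombstone no longer triggers a NullPointerException.
        System.out.println(schema.deserialize(null));
    }
}
```

How a null return value is treated further down the pipeline (skipped by the consumer, or passed on so that it has to be filtered) likely depends on the connector version, so that part would need to be checked against the Kafka consumer actually in use.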





[jira] [Commented] (FLINK-12274) Fix documentation to enable Queryable State

2019-04-23 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823846#comment-16823846
 ] 

Pierre Zemb commented on FLINK-12274:
-

Will do [~yunta], thanks!

> Fix documentation to enable Queryable State
> ---
>
> Key: FLINK-12274
> URL: https://issues.apache.org/jira/browse/FLINK-12274
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.8.0
>Reporter: Pierre Zemb
>Priority: Minor
>
> Hi!
> We just upgraded our Flink cluster from 1.7.X to 1.8, and we saw that [this
> commit|https://github.com/apache/flink/commit/e8182dcd0987a7326a8c73df902c156e44a7cebf#diff-685c2154c3e2b708b2d36214fe85dfb6]
> introduced a flag to enable or disable queryable state. As the default is now
> false, we need to fix the main documentation page located
> [here|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html].
> I would love to make it my first contribution to Flink! I was thinking of
> rewriting the "Activating Queryable State" part a bit. What do you think
> about it?





[jira] [Created] (FLINK-12274) Fix documentation to enable Queryable State

2019-04-20 Thread Pierre Zemb (JIRA)
Pierre Zemb created FLINK-12274:
---

 Summary: Fix documentation to enable Queryable State
 Key: FLINK-12274
 URL: https://issues.apache.org/jira/browse/FLINK-12274
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.8.0
Reporter: Pierre Zemb


Hi!

We just upgraded our Flink cluster from 1.7.X to 1.8, and we saw that [this
commit|https://github.com/apache/flink/commit/e8182dcd0987a7326a8c73df902c156e44a7cebf#diff-685c2154c3e2b708b2d36214fe85dfb6]
introduced a flag to enable or disable queryable state. As the default is now
false, we need to fix the main documentation page located
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html].

I would love to make it my first contribution to Flink! I was thinking of
rewriting the "Activating Queryable State" part a bit. What do you think about
it?





[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager

2018-10-02 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635612#comment-16635612
 ] 

Pierre Zemb commented on FLINK-10225:
-

Cool, thanks [~till.rohrmann]! I would love to get involved in the design
document ;)

> Cannot access state from a empty taskmanager
> 
>
> Key: FLINK-10225
> URL: https://issues.apache.org/jira/browse/FLINK-10225
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.3, 1.6.0
> Environment: 4tm and 1jm for now on 1.6.0
>Reporter: Pierre Zemb
>Priority: Critical
>
> Hi!
> I've started to deploy a small Flink cluster (4 tm and 1 jm for now, on
> 1.6.0) and deployed a small job on it. Because of the current load, the job
> is handled entirely by a single tm. I've created a small proxy that uses
> [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
> to access the current state. It works nicely, except under certain
> circumstances: it seems that I can only access the state through a node
> that holds a part of the job. Here's an example:
>  * job on tm1. Pointing QueryableStateClient to tm1. State accessible
>  * job still on tm1. Pointing QueryableStateClient to tm2 (for example).
> State inaccessible
>  * killing tm1, job is now on tm2. State accessible
>  * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
>  * adding some parallelism to spread job on tm1 and tm2. Pointing
> QueryableStateClient to either tm1 or tm2 works
>  * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State
> inaccessible
> When the state is inaccessible, I can see this (generated 
> [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0.
>  Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Went a bit through the (master branch) code. Class KvStateClientProxy is
> holding kvStateLocationOracle, the key-value state location
> oracle for the given JobID. Here's the usage:
>  * updateKvStateLocationOracle() in registerQueryableState() (TaskExecutor.java)
>  * registerQueryableState() in associateWithJobManager() (TaskExecutor.java)
>  * associateWithJobManager in establishJobManagerConnection (TaskExecutor.java)
>  * establishJobManagerConnection in jobManagerGainedLeadership (TaskExecutor.java)
>  * jobManagerGainedLeadership in onRegistrationSuccess (JobLeaderService.java)
> It seems that the KvStateLocationOracle map is updated only
> when the task manager is part of the job.
> For now, we are creating a List> and
> getting the first CompletableFuture.succeeded future, but that is a
> workaround.





[jira] [Comment Edited] (FLINK-10225) Cannot access state from a empty taskmanager

2018-09-25 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743
 ] 

Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM:
--

Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call
to the ResourceManager. It is also the only component that updates the
ConcurrentHashMap used by the RPC handler. This means that when I'm inside the
RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented, and I don't
want to change it. Do you have an idea of how I could implement this?

 

cc [~till.rohrmann]


was (Author: pierrez):
Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call
to the ResourceManager. It is also the only component that updates the
ConcurrentHashMap used by the RPC handler. This means that when I'm inside the
RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented, and I don't
want to change it. Do you have an idea of how I could implement this?


[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager

2018-09-19 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743
 ] 

Pierre Zemb commented on FLINK-10225:
-

Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call
to the ResourceManager. It is also the only component that updates the
ConcurrentHashMap used by the RPC handler. This means that when I'm inside the
RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented, and I don't
want to change it. Do you have an idea of how I could implement this?


[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-29 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596270#comment-16596270
 ] 

Pierre Zemb commented on FLINK-10225:
-

I still want to give it a try ;) Do you have any additional info besides the
[contributing guide|https://flink.apache.org/how-to-contribute.html]?



[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-29 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596215#comment-16596215
 ] 

Pierre Zemb commented on FLINK-10225:
-

Thanks [~till.rohrmann] for the reply! I'm not a Flink contributor. Do you
think this would be an easy first PR? Maybe I could give it a try.



[jira] [Updated] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-27 Thread Pierre Zemb (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Zemb updated FLINK-10225:

Description: 
Hi!

I've started to deploy a small Flink cluster (4 tm and 1 jm for now, on 1.6.0)
and deployed a small job on it. Because of the current load, the job is handled
entirely by a single tm. I've created a small proxy that uses
[QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
to access the current state. It works nicely, except under certain
circumstances: it seems that I can only access the state through a node
that holds a part of the job. Here's an example:
 * job on tm1. Pointing QueryableStateClient to tm1. State accessible
 * job still on tm1. Pointing QueryableStateClient to tm2 (for example). State
inaccessible
 * killing tm1, job is now on tm2. State accessible
 * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
 * adding some parallelism to spread job on tm1 and tm2. Pointing
QueryableStateClient to either tm1 or tm2 works
 * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State
inaccessible


When the state is inaccessible, I can see this (generated 
[here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
{code:java}
java.lang.RuntimeException: Failed request 0.
 Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
I went through the code on the master branch. The class KvStateClientProxy 
holds kvStateLocationOracle, the key-value state location oracle for the given 
JobID. Here is the chain of usages:

 * updateKvStateLocationOracle() in registerQueryableState() 
(TaskExecutor.java)
 * registerQueryableState() in associateWithJobManager() 
(TaskExecutor.java)
 * associateWithJobManager() in establishJobManagerConnection() 
(TaskExecutor.java)
 * establishJobManagerConnection() in jobManagerGainedLeadership() 
(TaskExecutor.java)
 * jobManagerGainedLeadership() in onRegistrationSuccess() 
(JobLeaderService.java)

It seems that the KvStateLocationOracle map is updated only when 
the task manager is part of the job.

For now, we are creating a list of CompletableFutures and taking the 
first one that succeeds, but that is only a workaround.
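
A minimal sketch of that workaround, using only java.util.concurrent. It assumes each future in the list is one query sent to a different task manager; the class and method names (FirstSuccessful, firstSuccessful) are hypothetical, and the futures in main() are stand-ins for real QueryableStateClient calls, which are not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstSuccessful {

    /**
     * Returns a future that completes with the first successful result.
     * It fails only once every input future has failed, with the error
     * of the future that failed last.
     */
    public static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no futures given"));
            return result;
        }
        // Counts the futures that have not failed yet.
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    // No-op if another future already completed the result.
                    result.complete(value);
                } else if (remaining.decrementAndGet() == 0) {
                    // Every query failed: propagate the last error.
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for a task manager that is not part of the job.
        CompletableFuture<String> tm2 = new CompletableFuture<>();
        tm2.completeExceptionally(new RuntimeException("stand-in for UnknownLocationException"));

        // Stand-in for a task manager that holds a part of the job.
        CompletableFuture<String> tm1 = CompletableFuture.completedFuture("state from tm1");

        System.out.println(firstSuccessful(Arrays.asList(tm2, tm1)).join());
    }
}
```

The cost of this approach is one request per task manager for every lookup, which is why it is only a stopgap.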


[jira] [Updated] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-27 Thread Pierre Zemb (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Zemb updated FLINK-10225:

Description: 
Hi!

I've started deploying a small Flink cluster (4 task managers and 1 job 
manager for now, on 1.6.0) and deployed a small job on it. Because of the 
current load, the job is handled entirely by a single task manager. I've 
created a small proxy that uses 
[QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
 to access the current state. It works nicely, except under certain 
circumstances. It seems that I can only access the state through a node 
that holds a part of the job. Here's an example:
 * job on tm1. Pointing QueryableStateClient to tm1. State accessible

 * job still on tm1. Pointing QueryableStateClient to tm2 (for example). State 
inaccessible

 * killing tm1, job is now on tm2. State accessible

 * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible

 * adding some parallelism to spread job on tm1 and tm2. Pointing 
QueryableStateClient to either tm1 or tm2 works

 * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State 
inaccessible

When the state is inaccessible, I can see this (generated 
[here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
{code:java}
java.lang.RuntimeException: Failed request 0.
 Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
I went through the code on the master branch. The class KvStateClientProxy 
holds kvStateLocationOracle, the key-value state location oracle for the given 
JobID. Here is the chain of usages:
 * updateKvStateLocationOracle() in registerQueryableState() 
(TaskExecutor.java)
 * registerQueryableState() in associateWithJobManager() 
(TaskExecutor.java)
 * associateWithJobManager() in establishJobManagerConnection() 
(TaskExecutor.java)
 * establishJobManagerConnection() in jobManagerGainedLeadership() 
(TaskExecutor.java)
 * jobManagerGainedLeadership() in onRegistrationSuccess() 
(JobLeaderService.java)

It seems that the KvStateLocationOracle map is updated only when 
the task manager is part of the job.

For now, we are creating a list of CompletableFutures and taking the 
first one that succeeds, but that is only a workaround.


[jira] [Created] (FLINK-10225) Cannot access state from a empty taskmanager

2018-08-27 Thread Pierre Zemb (JIRA)
Pierre Zemb created FLINK-10225:
---

 Summary: Cannot access state from a empty taskmanager
 Key: FLINK-10225
 URL: https://issues.apache.org/jira/browse/FLINK-10225
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Affects Versions: 1.6.0, 1.5.3
 Environment: 4tm and 1jm for now on 1.6.0
Reporter: Pierre Zemb


Hi!

I've started deploying a small Flink cluster (4 task managers and 1 job 
manager for now, on 1.6.0) and deployed a small job on it. Because of the 
current load, the job is handled entirely by a single task manager. I've 
created a small proxy that uses 
[QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
 to access the current state. It works nicely, except under certain 
circumstances. It seems that I can only access the state through a node 
that holds a part of the job. Here's an example:
 * job on tm1. Pointing QueryableStateClient to tm1. State accessible

 * job still on tm1. Pointing QueryableStateClient to tm2 (for example). State 
inaccessible

 * killing tm1, job is now on tm2. State accessible

 * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible

 * adding some parallelism to spread job on tm1 and tm2. Pointing 
QueryableStateClient to either tm1 or tm2 works

 * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State 
inaccessible

When the state is inaccessible, I can see this (generated 
[here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):

{code:java}
java.lang.RuntimeException: Failed request 0.
 Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

 

I went through the code on the master branch. The class KvStateClientProxy 
holds kvStateLocationOracle, the key-value state location oracle for the given 
JobID. Here is the chain of usages:

 * updateKvStateLocationOracle() in registerQueryableState() 
(TaskExecutor.java)
 * registerQueryableState() in associateWithJobManager() 
(TaskExecutor.java)
 * associateWithJobManager() in establishJobManagerConnection() 
(TaskExecutor.java)
 * establishJobManagerConnection() in jobManagerGainedLeadership() 
(TaskExecutor.java)
 * jobManagerGainedLeadership() in onRegistrationSuccess() 
(JobLeaderService.java)

It seems that the KvStateLocationOracle map is updated only when 
the task manager is part of the job.

For now, we are creating a list of CompletableFutures and taking the 
first one that succeeds, but that is only a workaround.


