[jira] [Commented] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
[ https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833288#comment-16833288 ] Pierre Zemb commented on FLINK-12400:
Nice catch [~klion26], thanks

> NullpointerException using SimpleStringSchema with Kafka
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster; Kafka 0.10 with a topic in log-compaction
> Reporter: Pierre Zemb
> Assignee: Pierre Zemb
> Priority: Minor

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
Pierre Zemb created FLINK-12400:
Summary: NullpointerException using SimpleStringSchema with Kafka
Key: FLINK-12400
URL: https://issues.apache.org/jira/browse/FLINK-12400
Project: Flink
Issue Type: Improvement
Components: API / Type Serialization System
Affects Versions: 1.8.0, 1.7.2
Environment: Flink 1.7.2 job on 1.8 cluster; Kafka 0.10 with a topic in log-compaction
Reporter: Pierre Zemb
Assignee: Pierre Zemb

Hi!

Yesterday, we saw strange behavior with our Flink job and Kafka. We are consuming a Kafka topic set up in [log-compaction|https://kafka.apache.org/documentation/#compaction] mode, so sending a message with a null payload acts as a tombstone.

We are consuming Kafka like this:
{code:java}
FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), this.kafkaProperties);
{code}

When we sent such a tombstone, the job failed with a NullPointerException [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]: `byte[] message` was null.

We forked the class and added a basic null check that returns null when the message is null, which fixed our issue. Should we add it to the main class?
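The null check described in the issue can be sketched as follows. This is a standalone illustration with no Flink dependency, and `NullSafeStringDecoder` is a hypothetical name, not a Flink class. It decodes bytes with a charset (UTF-8 by default, as `SimpleStringSchema` does) and returns null for a null payload instead of throwing. In a real job, the same check would go in the `deserialize(byte[])` method of a `SimpleStringSchema` subclass or of your own `DeserializationSchema<String>`; the Flink Kafka connector documentation describes returning null as a way to let the consumer skip a record.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical, standalone sketch of a null-tolerant string deserializer.
// In a Flink job this logic would live in DeserializationSchema#deserialize(byte[]).
final class NullSafeStringDecoder {
    private final Charset charset;

    NullSafeStringDecoder(Charset charset) {
        this.charset = charset;
    }

    NullSafeStringDecoder() {
        this(StandardCharsets.UTF_8);
    }

    // Returns null for a null payload (e.g. a Kafka tombstone) instead of throwing an NPE.
    String deserialize(byte[] message) {
        if (message == null) {
            return null;
        }
        return new String(message, charset);
    }
}
```

Returning null keeps the decoder itself simple; whether downstream operators should ever see a tombstone is then decided by the consumer rather than by the schema.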
[jira] [Commented] (FLINK-12274) Fix documentation to enable Queryable State
[ https://issues.apache.org/jira/browse/FLINK-12274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16823846#comment-16823846 ] Pierre Zemb commented on FLINK-12274:
Will do [~yunta], thanks!

> Fix documentation to enable Queryable State
> Key: FLINK-12274
> URL: https://issues.apache.org/jira/browse/FLINK-12274
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Affects Versions: 1.8.0
> Reporter: Pierre Zemb
> Priority: Minor
[jira] [Created] (FLINK-12274) Fix documentation to enable Queryable State
Pierre Zemb created FLINK-12274:
Summary: Fix documentation to enable Queryable State
Key: FLINK-12274
URL: https://issues.apache.org/jira/browse/FLINK-12274
Project: Flink
Issue Type: Improvement
Components: Documentation
Affects Versions: 1.8.0
Reporter: Pierre Zemb

Hi!

We just upgraded our Flink cluster from 1.7.x to 1.8, and we saw that [this commit|https://github.com/apache/flink/commit/e8182dcd0987a7326a8c73df902c156e44a7cebf#diff-685c2154c3e2b708b2d36214fe85dfb6] added a flag to enable or disable queryable state. Since the default is now false, the main documentation page ([here|https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/queryable_state.html]) needs to be fixed.

I would love to make this my first contribution to Flink! I was thinking of rewriting the "Activating Queryable State" part a bit. What do you think?
[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635612#comment-16635612 ] Pierre Zemb commented on FLINK-10225:
Cool, thanks [~till.rohrmann]! I would love to get involved in the design document ;)

> Cannot access state from a empty taskmanager
> Key: FLINK-10225
> URL: https://issues.apache.org/jira/browse/FLINK-10225
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.5.3, 1.6.0
> Environment: 4tm and 1jm for now on 1.6.0
> Reporter: Pierre Zemb
> Priority: Critical
>
> Hi!
>
> I've deployed a small Flink cluster (4 TMs and 1 JM for now, on 1.6.0) and deployed a small job on it. Because of the current load, the job is handled entirely by a single TM. I've created a small proxy that uses [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html] to access the current state. It works nicely, except under certain circumstances: it seems that I can only access the state through a TM that holds part of the job. Here's an example:
> * job on tm1. Pointing QueryableStateClient to tm1: state accessible
> * job still on tm1. Pointing QueryableStateClient to tm2 (for example): state inaccessible
> * killing tm1, job is now on tm2: state accessible
> * job still on tm2. Pointing QueryableStateClient to tm3: state inaccessible
> * adding some parallelism to spread the job over tm1 and tm2. Pointing QueryableStateClient to either tm1 or tm2: state accessible
> * job still on tm1 and tm2. Pointing QueryableStateClient to tm3: state inaccessible
>
> When the state is inaccessible, I see this error (raised [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0.
> Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>     at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>
> I went through the code (master branch) a bit. The KvStateClientProxy class holds the kvStateLocationOracle (the key-value state location oracle) for a given JobID. Here is how it gets updated:
> * updateKvStateLocationOracle() is called in registerQueryableState() (TaskExecutor.java)
> * registerQueryableState() is called in associateWithJobManager() (TaskExecutor.java)
> * associateWithJobManager() is called in establishJobManagerConnection() (TaskExecutor.java)
> * establishJobManagerConnection() is called in jobManagerGainedLeadership() (TaskExecutor.java)
> * jobManagerGainedLeadership() is called in onRegistrationSuccess() (JobLeaderService.java)
>
> It seems that the KvStateLocationOracle map is updated only when the TaskManager is part of the job.
>
> For now, we are creating a List of CompletableFutures and taking the first one that completes successfully, but that is only a workaround.
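The behavior and workaround described in the issue can be modeled roughly like this. It is a toy sketch under stated assumptions, not Flink code: `ToyStateProxy`, `registerJob`, `query` and `FirstSuccessful.firstSuccessful` are hypothetical names. Each toy proxy keeps a per-TaskManager map that is only filled when that TM joins the job (mirroring the observation that the KvStateLocationOracle map is only updated in that case), so querying a TM without the job fails. The combinator then completes with the first future that succeeds. `CompletableFuture.anyOf` would not fit here, because it completes as soon as any future completes, including exceptionally.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the state proxy running on one TaskManager (not Flink code).
final class ToyStateProxy {
    private final String name;
    // Job id -> location info; filled only for jobs that run on this TaskManager.
    private final Map<String, String> locationsByJob = new ConcurrentHashMap<>();

    ToyStateProxy(String name) {
        this.name = name;
    }

    // Mirrors the observed behavior: registration happens only when this TM joins the job.
    void registerJob(String jobId) {
        locationsByJob.put(jobId, name);
    }

    CompletableFuture<String> query(String jobId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        String location = locationsByJob.get(jobId);
        if (location == null) {
            // Plays the role of the UnknownLocationException seen on TMs without the job.
            future.completeExceptionally(new IllegalStateException(
                    "Could not retrieve location of job=" + jobId + " on " + name));
        } else {
            future.complete("state served via " + location);
        }
        return future;
    }
}

final class FirstSuccessful {
    private FirstSuccessful() {}

    // Completes with the value of the first future that succeeds,
    // or exceptionally (with the last error seen) once every future has failed.
    static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no futures given"));
            return result;
        }
        AtomicInteger failuresUntilGiveUp = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value); // no-op if another future already won
                } else if (failuresUntilGiveUp.decrementAndGet() == 0) {
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }
}
```

In the real setup, each toy proxy would correspond to a QueryableStateClient pointed at a different TM; the cost of this approach is one request per TM for every lookup.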
[jira] [Comment Edited] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743 ] Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM:
Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call to the ResourceManager. It is also the only component that updates the ConcurrentHashMap used by the RPC handler. This means that from inside the RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the KvStateClientProxy interface is implemented, and I don't want to change it. Do you have an idea of how I could implement this?

cc [~till.rohrmann]
[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620743#comment-16620743 ] Pierre Zemb commented on FLINK-10225:
Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call to the ResourceManager. It is also the only component that updates the ConcurrentHashMap used by the RPC handler. This means that from inside the RPC handler, I cannot find a clean way to trigger a method of the TaskExecutor.

I like the way the KvStateClientProxy interface is implemented, and I don't want to change it. Do you have an idea of how I could implement this?
[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596270#comment-16596270 ] Pierre Zemb commented on FLINK-10225:
I still want to give it a try ;) Do you have any additional info besides the [contributing guide|https://flink.apache.org/how-to-contribute.html]?
[jira] [Commented] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596215#comment-16596215 ] Pierre Zemb commented on FLINK-10225:
Thanks [~till.rohrmann] for the reply! I'm not a Flink contributor; do you think this would be an easy first PR? Maybe I could give it a try.
[jira] [Updated] (FLINK-10225) Cannot access state from a empty taskmanager
[ https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Zemb updated FLINK-10225: Description: Hi! I've started to deploy a small Flink cluster (4tm and 1jm for now on 1.6.0), and deployed a small job on it. Because of the current load, job is completely handled by a single tm. I've created a small proxy that is using [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html] to access the current state. It is working nicely, except under certain circumstances. It seems to me that I can only access the state through a node that is holding a part of the job. Here's an example: * job on tm1. Pointing QueryableStateClient to tm1. State accessible * job still on tm1. Pointing QueryableStateClient to tm2 (for example). State inaccessible * killing tm1, job is now on tm2. State accessible * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible * adding some parallelism to spread job on tm1 and tm2. Pointing QueryableStateClient to either tm1 and tm2 is working * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State inaccessible When the state is inaccessible, I can see this (generated [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]): {code:java} java.lang.RuntimeException: Failed request 0. Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist. 
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
	at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}
I went through the code (master branch) a bit. The class KvStateClientProxy holds kvStateLocationOracle, the key-value state location oracle for the given JobID. Here is how it gets updated (each method is called from the one named after it):
* updateKvStateLocationOracle() is called in registerQueryableState() (TaskExecutor.java)
* registerQueryableState() is called in associateWithJobManager() (TaskExecutor.java)
* associateWithJobManager() is called in establishJobManagerConnection() (TaskExecutor.java)
* establishJobManagerConnection() is called in jobManagerGainedLeadership() (TaskExecutor.java)
* jobManagerGainedLeadership() is called in onRegistrationSuccess() (JobLeaderService.java)
So it seems that the KvStateLocationOracle map is only updated when the TaskManager is part of the job. For now, we are creating a list of CompletableFutures and taking the first one that completes successfully, but that is only a workaround.
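The workaround above (keep the first of several queries that succeeds) can be sketched with the JDK's CompletableFuture alone. This is a minimal sketch under assumptions, not the actual proxy code: FirstSuccessful and firstSuccessful are hypothetical names, and each list element is assumed to be the future returned by a QueryableStateClient#getKvState(...) call against a different TaskManager's proxy (simulated below with plain futures).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not a Flink API: keeps the first answer that succeeds.
final class FirstSuccessful {

    private FirstSuccessful() {}

    /**
     * Returns a future that completes with the value of the first future in
     * {@code futures} that completes normally. If every future fails, the
     * returned future fails with the last error observed.
     */
    static <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("no futures to wait on"));
            return result;
        }
        // Counts down on each failure; reaching zero means nothing succeeded.
        AtomicInteger notYetFailed = new AtomicInteger(futures.size());
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, error) -> {
                if (error == null) {
                    // complete() is a no-op if another future already won the race.
                    result.complete(value);
                } else if (notYetFailed.decrementAndGet() == 0) {
                    result.completeExceptionally(error);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        // Simulated answers (assumption): only the proxy on tm2 can locate the state.
        CompletableFuture<String> tm1 = new CompletableFuture<>();
        CompletableFuture<String> tm2 = new CompletableFuture<>();
        CompletableFuture<String> tm3 = new CompletableFuture<>();
        CompletableFuture<String> first = firstSuccessful(Arrays.asList(tm1, tm2, tm3));

        tm1.completeExceptionally(new IllegalStateException("unknown location"));
        tm3.completeExceptionally(new IllegalStateException("unknown location"));
        tm2.complete("state from tm2");

        System.out.println(first.join()); // prints "state from tm2"
    }
}
```

Since the same query still goes to every proxy and all but one are expected to fail with UnknownLocationException, this only hides the problem: an error surfaces only when no TaskManager can locate the state.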
[jira] [Created] (FLINK-10225) Cannot access state from a empty taskmanager
Pierre Zemb created FLINK-10225: --- Summary: Cannot access state from a empty taskmanager Key: FLINK-10225 URL: https://issues.apache.org/jira/browse/FLINK-10225 Project: Flink Issue Type: Bug Components: Queryable State Affects Versions: 1.6.0, 1.5.3 Environment: 4tm and 1jm for now on 1.6.0 Reporter: Pierre Zemb Hi! I've deployed a small Flink cluster (4 TaskManagers and 1 JobManager for now, on 1.6.0) and deployed a small job on it. Because of the current load, the job is handled entirely by a single TaskManager. I've created a small proxy that uses [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html] to access the current state. It works nicely, except under certain circumstances: it seems to me that I can only access the state through a TaskManager that is running part of the job. Here's an example:
* job on tm1, QueryableStateClient pointed at tm1: state accessible
* job still on tm1, QueryableStateClient pointed at tm2 (for example): state inaccessible
* tm1 killed, job now on tm2: state accessible
* job still on tm2, QueryableStateClient pointed at tm3: state inaccessible
* parallelism increased to spread the job over tm1 and tm2: pointing QueryableStateClient at either tm1 or tm2 works
* job still on tm1 and tm2, QueryableStateClient pointed at tm3: state inaccessible
When the state is inaccessible, I see this (generated [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
{{java.lang.RuntimeException: Failed request 0.
Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=repo-status of job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
	at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
	at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)}}
I went through the code (master branch) a bit. The class KvStateClientProxy holds kvStateLocationOracle, the key-value state location oracle for the given JobID. Here is how it gets updated (each method is called from the one named after it):
* updateKvStateLocationOracle() is called in registerQueryableState() (TaskExecutor.java)
* registerQueryableState() is called in associateWithJobManager() (TaskExecutor.java)
* associateWithJobManager() is called in establishJobManagerConnection() (TaskExecutor.java)
* establishJobManagerConnection() is called in jobManagerGainedLeadership() (TaskExecutor.java)
* jobManagerGainedLeadership() is called in onRegistrationSuccess() (JobLeaderService.java)
So it seems that the KvStateLocationOracle map is only updated when the TaskManager is part of the job. For now, we are creating a list of CompletableFutures and taking the first one that completes successfully, but that is only a workaround.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)