Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected
task managers, and in the task manager logs that each one connects to
the JM.
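If it helps to check mechanically, those registration lines can be counted with a small stdlib-only sketch (this is not a Flink API, and the exact message wording, "Registered TaskManager", is an assumption that may vary by Flink version -- adjust the pattern to whatever your logs actually say):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class CountTaskManagers {
    // Count log lines that look like TaskManager registrations.
    // The "Registered TaskManager" pattern is an assumption; check your logs.
    static long countRegistered(List<String> logLines) {
        return logLines.stream()
                .filter(line -> line.contains("Registered TaskManager"))
                .count();
    }

    public static void main(String[] args) throws IOException {
        // Usage: java CountTaskManagers /path/to/flink-jobmanager.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println(countRegistered(lines) + " TaskManager registrations found");
    }
}
```

For a healthy 3-TM cluster you would expect the count to match the number of task managers.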

– Ufuk


On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
<chet.master...@yandex.com> wrote:
> Can do. Any advice on where the trace prints should go in the task manager
> source code?
>
> BTW - How do I know I have a correctly configured cluster? Is there a set of
> messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
>
> Thanks!
>
>
> 02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:
>
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers. Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
> – Ufuk
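>
> If it helps, a remote debugger can usually be attached by passing JDWP
> options to the Flink JVMs, e.g. via env.java.opts in flink-conf.yaml
> (a sketch, not a recommendation -- port 5005 is an arbitrary choice,
> and suspend=n lets the JVMs start without waiting for a debugger):
>
>   # flink-conf.yaml
>   env.java.opts: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"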
>
>
> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
>
>
>  Any insight here? I've got a situation where a key value state on a task
>  manager is being registered with the job manager, but when I try to query
>  it, the job manager responds it doesn't know the location of the key value
>  state...
>
>
>  26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:
>
>  After setting the logging to DEBUG on the job manager, I learned four
>  things:
>
>  (On the message formatting below, I have the Flink logs formatted into JSON
>  so I can import them into Kibana)
>
>  1. The appropriate key value state is registered in both the parallelism = 1
>  and parallelism = 3 environments. In parallelism = 1, I saw one registration
>  message in the log; in parallelism = 3, I saw two registration messages:
>
>  {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"","msg":"Key value state registered for job <job id> under name <statename>"}
>
>  2. When I issued the query in both parallelism = 1 and parallelism = 3
>  environments, I saw "Lookup key-value state for job <job id> with
>  registration name <statename>". In parallelism = 1, I saw one log message;
>  in parallelism = 3, two identical messages.
>
>  3. I saw no other messages in the job manager log that seemed relevant.
>
>  4. When issuing the query in parallelism = 3, I continued to get the error
>  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message
>  of null.
>
>  Thanks!
>
>
>
>
>
>  26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:
>
>  Thanks! Your config looks good to me.
>
>  Could you please set the log level org.apache.flink.runtime.jobmanager to
>  DEBUG?
>
>  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
>  Then we can check whether the JobManager logs the registration of the
>  state instance with the respective name when parallelism > 1.
>
>  Expected output is something like this: "Key value state registered
>  for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
>  – Ufuk
>
>  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>  <chet.master...@yandex.com> wrote:
>
>   Ok...more information.
>
>   1. Built a fresh cluster from the ground up. Started testing queryable
>   state at each step.
>   2. When running under any configuration of task managers and job managers
>   with parallelism = 1, the queries execute as expected.
>   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
>   manager) feeding off a kafka topic partitioned three ways, queries will
>   always fail, returning error
>   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
>   error message of null.
>   4. I do know my state is as expected on the cluster. Liberal use of trace
>   prints shows the state managed by the jobs is as I expect. However, I cannot
>   query it externally.
>   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
>   is configured by using the job manager UI.
>   6. My flink-conf.yaml:
>
>   jobmanager.rpc.address: flink01
>   jobmanager.rpc.port: 6123
>   jobmanager.heap.mb: 256
>
>   taskmanager.heap.mb: 512
>   taskmanager.data.port: 6121
>   taskmanager.numberOfTaskSlots: 1
>   taskmanager.memory.preallocate: false
>
>   parallelism.default: 1
>   blob.server.port: 6130
>
>   jobmanager.web.port: 8081
>   query.server.enable: true
>
>   7. I do know my job is indeed running in parallel, from trace prints going
>   to the task manager logs.
>
>   Do I need a state backend configured when running in parallel for queryable
>   state? Do I need a shared temp directory on the task managers?
>
>   THANKS!
>
>
>   25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:
>
>   It's strange that the rpc port is set to 30000 when you use a
>   standalone cluster and configure 6123 as the port. I'm pretty sure
>   that the config has not been updated.
>
>   But everything should work as you say when you point it to the correct
>   jobmanager address and port. Could you please post the complete
>   stacktrace you get instead of the message you log?
>
>
>   On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson
>   <chet.master...@yandex.com> wrote:
>
>
>
>    More information:
>
>    0. I did remove the query.server.port and query.server.enabled from all
>    flink-conf.yaml files, and restarted the cluster.
>
>    1. The Akka error doesn't seem to have anything to do with the problem. If
>    I point my query client at an IP address with no Flink server running at
>    all, I get that error. It seems to be a (side effect?) timeout for "no
>    Flink service is listening on the port you told me to check".
>
>    2. I did notice (using the Flink Web UI) that even with the config file
>    changes in 0, and with no change to the default jobmanager.rpc.port (6123)
>    in flink-conf.yaml, jobmanager.rpc.port on my cluster is set to 30000.
>
>    3. If I do send a query using the jobmanager.rpc.address and the
>    jobmanager.rpc.port as displayed in the Flink Web UI, the connection from
>    the client to Flink will be initiated and completed. When I try to execute
>    the query (code below), it will fail and get trapped. When I look at the
>    error message returned (e.getMessage() below), it is simply 'null':
>
>    try {
>        byte[] serializedResult = Await.result(future,
>            new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
>        // de-serialize, commented out for testing
>        return null;
>    }
>    catch (Exception e) {
>        logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-"
>            + stateName + " Error: " + e.getMessage());
>        return null;
>    }
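>
>    (As an aside: logging only e.getMessage() loses the stack trace, and here
>    the message itself is null. A stdlib-only sketch for rendering the full
>    trace as a string -- the class and method names below are my own, not
>    Flink's:)

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Sketch: render a Throwable's full stack trace as a String so it can be
// handed to a logger, instead of e.getMessage(), which may be null.
public class StackTraces {
    public static String asString(Throwable t) {
        StringWriter sw = new StringWriter();
        t.printStackTrace(new PrintWriter(sw, true));
        return sw.toString();
    }

    public static void main(String[] args) {
        String trace = asString(new RuntimeException("demo"));
        System.out.println(trace.split("\\R")[0]); // -> java.lang.RuntimeException: demo
    }
}
```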
>
>    Should I be sending the query to the job manager on the job manager's
>    rpc port when Flink is clustered?
>
>    ALSO - I do know the state name I am trying to query exists, is populated,
>    and the job id exists. I also know the task managers are communicating with
>    the job manager (task manager data port: 6121) and processed the data that
>    resulted in the state variable I am trying to query being populated. All
>    this was logged.
>
>
>    24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:
>
>    Hey Chet! You can remove
>
>    query.server.port: 6123
>    query.server.enable: true
>
>    That shouldn't cause the Exception we see here, though. I'm actually
>    not sure what is causing the PduCodecException. Could this be related
>    to different Akka versions being used in Flink and your client code
>    [1]? Is it possible for you to check this?
>
>    – Ufuk
>
>    [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
