I found the issue. With parallelism = 3, my test data set was skewed so that data was only going to two of the three task managers (Kafka partitions = 3, number of Flink nodes = 3, parallelism = 3). As soon as I created a test data set with enough keys to spread across all three task managers, queryable state started working as expected. That is why only two KvStates were registered with the job manager instead of three.
 
My FINAL :-) question: should I be getting org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event only N-1 task managers have data in a parallelism-of-N situation?
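For anyone following along, here is a rough sketch of why few distinct keys can leave a task manager with no state at all. This is a simplified illustration, not Flink's actual implementation (Flink uses a murmur hash of the key's hashCode modulo maxParallelism in KeyGroupRangeAssignment; the modulo-hash scheme below is an assumption for demonstration only):

```java
// Simplified sketch of Flink-style key-group assignment (illustrative only).
public class KeyGroupSketch {

    // Assign a key to one of maxParallelism key groups.
    // (Flink's real code murmur-hashes key.hashCode() first.)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Map a key group to one of `parallelism` operator instances.
    static int operatorIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 3;
        // With only two distinct keys, at most two of the three operator
        // instances can ever receive data -- the third registers no KvState,
        // so a query routed to its key groups has no known location.
        for (String key : new String[]{"keyA", "keyB"}) {
            int kg = assignToKeyGroup(key, maxParallelism);
            System.out.println(key + " -> key group " + kg
                    + " -> operator " + operatorIndex(kg, maxParallelism, parallelism));
        }
    }
}
```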
 
Thanks for all the help!
 
 
04.05.2017, 11:24, "Ufuk Celebi" <u...@apache.org>:

Could you try KvStateRegistry#registerKvState please?

In the JM logs you should see something about the number of connected
task managers and in the task manager logs that each one connects to a
JM.

– Ufuk


On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
<chet.master...@yandex.com> wrote:

 Can do. Any advice on where the trace prints should go in the task manager
 source code?

 BTW - How do I know I have a correctly configured cluster? Is there a set of
 messages in the job / task manager logs that indicate all required
 connectivity is present? I know I use the UI to make sure all the task
 managers are present, and that the job is running on all of them, but is
 there some verbiage in the logs that indicates the job manager can talk to
 all the task managers, and vice versa?

 Thanks!


 02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:

 Hey Chet! I'm wondering why you are only seeing 2 registration
 messages for 3 task managers. Unfortunately, there is no log message
 at the task managers when they send out the notification. Is it
 possible for you to run a remote debugger with the task managers or
 build a custom Flink version with the appropriate log messages on the
 task manager side?
 – Ufuk


 On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
 <chet.master...@yandex.com> wrote:



  Any insight here? I've got a situation where a key value state on a task
  manager is being registered with the job manager, but when I try to query
  it, the job manager responds it doesn't know the location of the key value
  state...


  26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:

  After setting the logging to DEBUG on the job manager, I learned four
  things:

  (On the message formatting below, I have the Flink logs formatted into JSON
  so I can import them into Kibana)

  1. The appropriate key value state is registered in both the parallelism = 1
  and parallelism = 3 environments. In parallelism = 1, I saw one registration
  message in the log; in parallelism = 3, I saw two registration messages:

  {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"","msg":"Key value state registered for job <job id> under name <statename>"}

  2. When I issued the query in both the parallelism = 1 and parallelism = 3
  environments, I saw "Lookup key-value state for job <job id> with
  registration name <statename>". In parallelism = 1, I saw one log message;
  in parallelism = 3, I saw two identical messages.

  3. I saw no other messages in the job manager log that seemed relevant.

  4. When issuing the query in parallelism = 3, I continued to get the error
  org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a message
  of null.

  Thanks!





  26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:

  Thanks! Your config looks good to me.

  Could you please set the log level for org.apache.flink.runtime.jobmanager
  to DEBUG?

  log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG

  Then we can check whether the JobManager logs the registration of the
  state instance with the respective name in the case of parallelism > 1.

  Expected output is something like this: "Key value state registered
  for job ${msg.getJobId} under name ${msg.getRegistrationName}."

  – Ufuk

  On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
  <chet.master...@yandex.com> wrote:

   Ok... more information.

   1. Built a fresh cluster from the ground up. Started testing queryable state
   at each step.
   2. When running under any configuration of task managers and job managers
   with parallelism = 1, the queries execute as expected.
   3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
   manager) feeding off a Kafka topic partitioned three ways, queries will
   always fail, returning the error
   (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
   error message of null.
   4. I do know my state is as expected on the cluster. Liberal use of trace
   prints shows the state managed by the jobs is as I expect. However, I
   cannot query it externally.
   5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
   is configured by using the job manager UI.
   6. My flink-conf.yaml:
   6. My flink-conf.yaml:

   jobmanager.rpc.address: flink01
   jobmanager.rpc.port: 6123
   jobmanager.heap.mb: 256

   taskmanager.heap.mb: 512
   taskmanager.data.port: 6121
   taskmanager.numberOfTaskSlots: 1
   taskmanager.memory.preallocate: false

   parallelism.default: 1
   blob.server.port: 6130

   jobmanager.web.port: 8081
   query.server.enable: true

   7. I do know my job is indeed running in parallel, from trace prints going
   to the task manager logs.

   Do I need a backend configured when running in parallel for the queryable
   state? Do I need a shared temp directory on the task managers?

   THANKS!


   25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:

   It's strange that the rpc port is set to 30000 when you use a
   standalone cluster and configure 6123 as the port. I'm pretty sure
   that the config has not been updated.

   But everything should work as you say when you point it to the correct
   jobmanager address and port. Could you please post the complete
   stacktrace you get instead of the message you log?


   On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson
   <chet.master...@yandex.com> wrote:



    More information:

    0. I did remove the query.server.port and query.server.enabled from all
    flink-conf.yaml files, and restarted the cluster.

    1. The Akka error doesn't seem to have anything to do with the problem. If I
    point my query client at an IP address with no Flink server running at all,
    I get that error. It seems to be a (side effect?) timeout for "no Flink
    service is listening on the port you told me to check".

    2. I did notice (using the Flink Web UI) that even with the config file
    changes in 0, and with no changes to the default flink-conf.yaml
    jobmanager.rpc.port (6123), on my cluster jobmanager.rpc.port is set to
    30000.

    3. If I do send a query using the jobmanager.rpc.address and the
    jobmanager.rpc.port as displayed in the Flink Web UI, the connection from
    the client to Flink will be initiated and completed. When I try to execute
    the query (code below), it will fail and the exception will get trapped.
    When I look at the error message returned (e.getMessage() below), it is
    simply 'null':

     try {
         byte[] serializedResult = Await.result(future,
             new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
         // de-serialize, commented out for testing
         return null;
     }
     catch (Exception e) {
         logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-"
             + stateName + " Error: " + e.getMessage());
         return null;
     }

     Should I be sending the query to the job manager on the job manager's
     rpc port when Flink is clustered?

     ALSO - I do know the state name I am trying to query exists, is populated,
     and the job id exists. I also know the task managers are communicating
     with the job managers (task manager data port: 6121) and processed the
     data that resulted in the state variable I am trying to query being
     populated. All this was logged.


    24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:

    Hey Chet! You can remove

    query.server.port: 6123
    query.server.enable: true

    That shouldn't cause the Exception we see here though. I'm actually
    not sure what is causing the PduCodecException. Could this be related
    to different Akka versions being used in Flink and your client code?
    [1] Is it possible for you to check this?

    – Ufuk

    [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
