Hi,

I have set up a flink cluster on my local machine. I created a flink job (
TrackMaximumTemperature) and made the state queryable. I am using
*github/streamingwithflink/chapter7/QueryableState.scala* example from
*https://github.com/streaming-with-flink
<https://github.com/streaming-with-flink>* repository. Please find the
file attached.

Now i have the running job id and when i go and try to access the state, it
throws an exception. I see the job is running and I am using the correct
jobId. Also checkpointing is enabled in the original job and i have set the
properties related to checkpointing in flink-conf.yaml. Am I
missing something? Any leads will be appreciated. Thank you :)


*Exception stack trace:*
Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=maxTemperature of
job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
package io.github.streamingwithflink.chapter7

import java.util.concurrent.CompletableFuture

import io.github.streamingwithflink.util.{SensorReading, SensorSource, 
SensorTimeAssigner}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object TrackMaximumTemperature {

  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

    // checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
      // project to sensor id and temperature
      .map(r => (r.id, r.temperature))
      // compute every 10 seconds the max temperature per sensor
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .max(1)

    // store latest value for each sensor in a queryable state
    tenSecsMaxTemps
      .keyBy(_._1)
      .asQueryableState("maxTemperature")

    // execute application
    env.execute("Track max temperature")
  }
}

object TemperatureDashboard {

  // queryable state proxy connection information.
  // can be looked up in logs of running QueryableStateJob
  val proxyHost = "localhost"
  val proxyPort = 9069
  // jobId of running QueryableStateJob.
  // can be looked up in logs of running job or the web UI
  val jobId = "9a528bf3e1b650aed7e0b1e26d038ad5"

  // how many sensors to query
  val numSensors = 5
  // how often to query
  val refreshInterval = 10000

  def main(args: Array[String]): Unit = {

    // configure client with host and port of queryable state proxy
    val client = new QueryableStateClient(proxyHost, proxyPort)

    val futures = new Array[CompletableFuture[ValueState[(String, 
Double)]]](numSensors)
    val results = new Array[Double](numSensors)

    // print header line of dashboard table
    val header = (for (i <- 0 until numSensors) yield "sensor_" + (i + 
1)).mkString("\t| ")
    println(header)

    // loop forever
    while (true) {

      // send out async queries
      for (i <- 0 until numSensors) {
        futures(i) = queryState("sensor_" + (i + 1), client)
      }
      // wait for results
      for (i <- 0 until numSensors) {
        results(i) = futures(i).get().value()._2
      }
      // print result
      val line = results.map(t => f"$t%1.3f").mkString("\t| ")
      println(line)

      // wait to send out next queries
      Thread.sleep(refreshInterval)
    }

    client.shutdownAndWait()

  }

  def queryState(key: String, client: QueryableStateClient): 
CompletableFuture[ValueState[(String, Double)]] = {
    client.getKvState[String, ValueState[(String, Double)], (String, Double)](
      JobID.fromHexString(jobId),
      "maxTemperature",
      key,
      Types.STRING,
      new ValueStateDescriptor[(String, Double)]("", Types.TUPLE[(String, 
Double)]))
  }

}

Reply via email to