Hi, I have set up a Flink cluster on my local machine. I created a Flink job (TrackMaximumTemperature) and made its state queryable. I am using the *github/streamingwithflink/chapter7/QueryableState.scala* example from the *https://github.com/streaming-with-flink* repository. Please find the file attached.
Now I have the id of the running job, but when I try to query the state, the client throws an exception. The job is running and I am using the correct jobId. Checkpointing is also enabled in the job itself, and I have set the checkpointing-related properties in flink-conf.yaml. Am I missing something? Any leads will be appreciated. Thank you :)

*Exception stack trace:*

Caused by: org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could not retrieve location of state=maxTemperature of job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state is not ready, or ii) the job does not exist.
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
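For reference, the checkpointing-related entries in my flink-conf.yaml look roughly like the following (a sketch of my local setup; the backend choice and the checkpoint directory path are placeholders, not the point of the question):

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints

The attached file contains both the job (TrackMaximumTemperature) and the dashboard client (TemperatureDashboard):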
package io.github.streamingwithflink.chapter7

import java.util.concurrent.CompletableFuture

import io.github.streamingwithflink.util.{SensorReading, SensorSource, SensorTimeAssigner}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object TrackMaximumTemperature {

  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

    // checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
      // project to sensor id and temperature
      .map(r => (r.id, r.temperature))
      // compute every 10 seconds the max temperature per sensor
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .max(1)

    // store latest value for each sensor in a queryable state
    tenSecsMaxTemps
      .keyBy(_._1)
      .asQueryableState("maxTemperature")

    // execute application
    env.execute("Track max temperature")
  }
}

object TemperatureDashboard {

  // queryable state proxy connection information.
  // can be looked up in logs of running QueryableStateJob
  val proxyHost = "localhost"
  val proxyPort = 9069

  // jobId of running QueryableStateJob.
  // can be looked up in logs of running job or the web UI
  val jobId = "9a528bf3e1b650aed7e0b1e26d038ad5"

  // how many sensors to query
  val numSensors = 5
  // how often to query
  val refreshInterval = 10000

  def main(args: Array[String]): Unit = {

    // configure client with host and port of queryable state proxy
    val client = new QueryableStateClient(proxyHost, proxyPort)

    val futures = new Array[CompletableFuture[ValueState[(String, Double)]]](numSensors)
    val results = new Array[Double](numSensors)

    // print header line of dashboard table
    val header = (for (i <- 0 until numSensors) yield "sensor_" + (i + 1)).mkString("\t| ")
    println(header)

    // loop forever
    while (true) {

      // send out async queries
      for (i <- 0 until numSensors) {
        futures(i) = queryState("sensor_" + (i + 1), client)
      }
      // wait for results
      for (i <- 0 until numSensors) {
        results(i) = futures(i).get().value()._2
      }

      // print result
      val line = results.map(t => f"$t%1.3f").mkString("\t| ")
      println(line)

      // wait to send out next queries
      Thread.sleep(refreshInterval)
    }

    client.shutdownAndWait()
  }

  def queryState(
      key: String,
      client: QueryableStateClient): CompletableFuture[ValueState[(String, Double)]] = {

    client
      .getKvState[String, ValueState[(String, Double)], (String, Double)](
        JobID.fromHexString(jobId),
        "maxTemperature",
        key,
        Types.STRING,
        new ValueStateDescriptor[(String, Double)]("", Types.TUPLE[(String, Double)]))
  }
}
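In case the build setup matters: the TemperatureDashboard client is compiled against the Flink queryable state client artifact. My dependency looks roughly like this (the artifact name and version are assumptions matching my local 1.7.x / Scala 2.11 installation, so adjust them to whatever your cluster runs):

// build.sbt (sketch) -- client-side dependency for QueryableStateClient
libraryDependencies += "org.apache.flink" % "flink-queryable-state-client-java_2.11" % "1.7.1"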