I am testing with Flink 1.9. Here is how I set up the MiniCluster:

    int port = 6124;
    int parallelism = 2;
    Configuration config = new Configuration();
    config.setInteger(JobManagerOptions.PORT, port);
    config.setString(JobManagerOptions.ADDRESS, "localhost");
    config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
    // In a non MiniCluster setup queryable state is enabled by default.
    config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
    config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
    config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);

    config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
    config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
    config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);

    MiniClusterConfiguration clusterconfig =
            new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
    try {
        // Create a local Flink server
        MiniCluster flinkCluster = new MiniCluster(clusterconfig);
        // Start server and create environment
        flinkCluster.start();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
        env.setParallelism(parallelism);
        // Build Graph
        buildGraph(env);
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        // Submit to the server and wait for completion
        JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
        System.out.println("Job ID : " + result.getJobID());
        Thread.sleep(Long.MAX_VALUE);
    } catch (Throwable t) {
        t.printStackTrace();
    }

And I have a client that looks as follows:

def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
          timeInterval: Long = defaulttimeInterval): Unit = {

    // JobID, has to correspond to a running job
    val jobId = JobID.fromHexString(job)
    // Client
    val client = new QueryableStateClient(host, port)

But when I try it, I get an exception saying that nothing is listening on port 9069.

It works with the old FlinkLocalMiniCluster, but not with the MiniCluster.
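
To rule out a problem on the client side, the ports can also be probed with a plain TCP connection, independent of QueryableStateClient. This is only a minimal sketch, assuming the MiniCluster above is running on the same machine; `PortProbe` and `isListening` are hypothetical names for illustration, not Flink APIs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether a TCP port accepts connections.
final class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9069 is the proxy port and 9067 the state server port from the configuration above.
        for (int port : new int[] {9069, 9067}) {
            System.out.println("port " + port + " listening: "
                    + isListening("127.0.0.1", port, 1000));
        }
    }
}
```

If neither port reports as listening while the job is running, that would suggest the queryable state proxy and server were never started by the MiniCluster, rather than a problem in the client.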

Am I missing something?



Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
