Hi,

I managed to work around the JobID issues by first starting the task that
queries the state, pausing it, then using env.executeAsync.getJobID to get
the proper jobID to use when querying the state, and passing that to the
(paused) query state task, which can then continue.

However, the CompletableFuture objects returned by the queryable state
client always come back empty. Below is the relevant code. Any idea what I'm
doing wrong? Any help is much appreciated.


The state is a MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>.
This represents an edge list of a graph, sorted by source vertex id, and
then by target vertex id.

// The method call to get all edges from another graph partition/thread
// which have a certain srcId.
HashMap<GradoopId, TemporalEdge> answer = QS.getSrcVertex(partitionId, srcId);

    // The method itself. The answer returned is always null, even when the
    // queried partition's state includes the srcId.
    public HashMap<GradoopId, TemporalEdge> getSrcVertex(
            Integer partitionId, GradoopId srcVertex)
            throws ExecutionException, InterruptedException {
        CompletableFuture<MapState<GradoopId, HashMap<GradoopId, TemporalEdge>>> resultFuture =
                client.getKvState(
                        jobID,
                        "queryableState",
                        partitionId,
                        new TypeHint<Integer>(){},
                        descriptor);
        AtomicReference<HashMap<GradoopId, TemporalEdge>> answer =
                new AtomicReference<>();
        resultFuture.thenAccept(response -> {

            // These prints are never reached
            try {
                answer.set(response.get(srcVertex));
                System.out.println(response.get(srcVertex));
            } catch (Exception e) {
                System.out.println("We dont have state");
            }
        });
        return answer.get();
    }

// The descriptor used
        descriptor =
                new MapStateDescriptor<GradoopId, HashMap<GradoopId, TemporalEdge>>(
                        "edgeList",
                        TypeInformation.of(new TypeHint<GradoopId>() {
                        }).createSerializer(new ExecutionConfig()),
                        TypeInformation.of(new TypeHint<HashMap<GradoopId, TemporalEdge>>() {
                        }).createSerializer(new ExecutionConfig())
                );

// The client
client = new QueryableStateClient("127.0.0.1", 9067);
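
One thing I am not sure about is whether my handling of the future itself
plays a role, independent of Flink. The sketch below is a Flink-free
illustration using only java.util.concurrent; the class FutureSketch and its
method names are made up for this example and are not part of my job or of
any Flink API. It mirrors the thenAccept plus AtomicReference pattern from
getSrcVertex and compares it with blocking on the future, and with
whenComplete, which also sees failures that thenAccept skips.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, for illustration only.
public class FutureSketch {

    // Same shape as getSrcVertex: register a callback, then read right away.
    // If the future has not completed normally yet, the reference is still null.
    static String readWithoutWaiting(CompletableFuture<String> future) {
        AtomicReference<String> answer = new AtomicReference<>();
        future.thenAccept(answer::set);
        return answer.get();
    }

    // Blocks until the future completes, then returns its value
    // (or throws if the future completed exceptionally).
    static String readAfterWaiting(CompletableFuture<String> future) {
        return future.join();
    }

    // whenComplete runs for both normal and exceptional completion,
    // so a failure that thenAccept silently skips becomes visible here.
    static String describeOutcome(CompletableFuture<String> future) {
        AtomicReference<String> outcome = new AtomicReference<>("pending");
        future.whenComplete((value, error) ->
                outcome.set(error != null ? "failed: " + error : "value: " + value));
        return outcome.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(readWithoutWaiting(pending));   // read before completion
        pending.complete("edges");
        System.out.println(readAfterWaiting(pending));     // read after completion

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(readWithoutWaiting(failed));    // callback never runs
        System.out.println(describeOutcome(failed));       // failure is surfaced
    }
}
```

If the same applies to the future returned by client.getKvState, then
returning answer.get() straight after thenAccept could give null simply
because the query has not finished, and a query that fails would never reach
my prints at all. I have not confirmed that this is the cause in my job.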



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/