[ https://issues.apache.org/jira/browse/FLINK-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069783#comment-16069783 ]
ASF GitHub Bot commented on FLINK-7044: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4225#discussion_r125000171 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java --- @@ -267,6 +293,177 @@ public void shutDown() { } /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public <K, V> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeHint<K> keyTypeHint, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(keyTypeHint); + + TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo(); + return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. 
+ * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeInfo The {@link TypeInformation} of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public <K, V> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeInformation<K> keyTypeInfo, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(keyTypeInfo); + + return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, + keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. 
+ */ + @PublicEvolving + public <K, V, N> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation<K> keyTypeInfo, + final TypeInformation<N> namespaceTypeInfo, + final StateDescriptor<?, V> stateDescriptor) { + + Preconditions.checkNotNull(stateDescriptor); + + // initialize the value serializer based on the execution config. + stateDescriptor.initializeSerializerUnlessSet(executionConfig); + TypeSerializer<V> valueSerializer = stateDescriptor.getSerializer(); + + return getKvState(jobId, queryableStateName, key, + namespace, keyTypeInfo, namespaceTypeInfo, valueSerializer); + } + + /** + * Returns a future holding the request result. + * + * <p>If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + * <p>If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + * <p>All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param valueSerializer The {@link TypeSerializer} of the state we want to query. + * @return Future holding the result. 
+ */ + @PublicEvolving + public <K, V, N> Future<V> getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation<K> keyTypeInfo, + final TypeInformation<N> namespaceTypeInfo, + final TypeSerializer<V> valueSerializer) { --- End diff -- I think this could be `stateSerializer`: since it's not restricted to value states, having "value" in there might confuse some people when looking at the code. > Add methods to the client API that take the stateDescriptor. > ------------------------------------------------------------ > > Key: FLINK-7044 > URL: https://issues.apache.org/jira/browse/FLINK-7044 > Project: Flink > Issue Type: Improvement > Components: Queryable State > Affects Versions: 1.3.0, 1.3.1 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)