[FLINK-7826][QS] Add support for all types of state to the QS Client.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abc3e1c8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abc3e1c8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abc3e1c8 Branch: refs/heads/master Commit: abc3e1c888ceee941d557381a8cb8a7df8af2058 Parents: 717a7dc Author: kkloudas <kklou...@gmail.com> Authored: Tue Oct 10 16:40:57 2017 +0200 Committer: kkloudas <kklou...@gmail.com> Committed: Wed Oct 18 09:40:16 2017 +0200 ---------------------------------------------------------------------- .../common/state/FoldingStateDescriptor.java | 2 +- .../client/QueryableStateClient.java | 57 +- .../client/state/ImmutableAggregatingState.java | 71 ++ .../client/state/ImmutableFoldingState.java | 70 ++ .../client/state/ImmutableListState.java | 70 ++ .../client/state/ImmutableMapState.java | 139 ++++ .../client/state/ImmutableReducingState.java | 69 ++ .../client/state/ImmutableState.java | 29 + .../client/state/ImmutableStateBinder.java | 80 +++ .../client/state/ImmutableValueState.java | 69 ++ .../network/AbstractServerHandler.java | 2 +- .../itcases/AbstractQueryableStateITCase.java | 644 +++++++++++++++---- .../state/ImmutableAggregatingStateTest.java | 114 ++++ .../state/ImmutableFoldingStateTest.java | 94 +++ .../state/ImmutableListStateTest.java | 112 ++++ .../state/ImmutableMapStateTest.java | 189 ++++++ .../state/ImmutableReducingStateTest.java | 84 +++ .../state/ImmutableValueStateTest.java | 70 ++ .../streaming/api/datastream/KeyedStream.java | 6 +- .../api/datastream/QueryableStateStream.java | 28 +- .../flink/streaming/api/scala/KeyedStream.scala | 6 +- 21 files changed, 1817 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java index f7609c3..0954047 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java @@ -30,7 +30,7 @@ import static java.util.Objects.requireNonNull; * {@link StateDescriptor} for {@link FoldingState}. This can be used to create partitioned * folding state. * - * @param <T> Type of the values folded int othe state + * @param <T> Type of the values folded in the other state * @param <ACC> Type of the value in the state * * @deprecated will be removed in a future version in favor of {@link AggregatingStateDescriptor} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java index 005c874..70bccf0 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java @@ -21,10 +21,12 @@ package org.apache.flink.queryablestate.client; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.queryablestate.client.state.ImmutableStateBinder; import org.apache.flink.queryablestate.messages.KvStateRequest; import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.Client; @@ -141,15 +143,15 @@ public class QueryableStateClient { * @param key The key we are interested in. * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public <K, V> CompletableFuture<V> getKvState( + public <K, S extends State, V> CompletableFuture<S> getKvState( final JobID jobId, final String queryableStateName, final K key, final TypeHint<K> keyTypeHint, - final StateDescriptor<?, V> stateDescriptor) { + final StateDescriptor<S, V> stateDescriptor) { Preconditions.checkNotNull(keyTypeHint); @@ -164,15 +166,15 @@ public class QueryableStateClient { * @param key The key we are interested in. * @param keyTypeInfo The {@link TypeInformation} of the key. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public <K, V> CompletableFuture<V> getKvState( + public <K, S extends State, V> CompletableFuture<S> getKvState( final JobID jobId, final String queryableStateName, final K key, final TypeInformation<K> keyTypeInfo, - final StateDescriptor<?, V> stateDescriptor) { + final StateDescriptor<S, V> stateDescriptor) { return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); @@ -187,48 +189,17 @@ public class QueryableStateClient { * @param keyTypeInfo The {@link TypeInformation} of the keys. * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. + * @return Future holding the immutable {@link State} object containing the result. */ @PublicEvolving - public <K, V, N> CompletableFuture<V> getKvState( + public <K, N, S extends State, V> CompletableFuture<S> getKvState( final JobID jobId, final String queryableStateName, final K key, final N namespace, final TypeInformation<K> keyTypeInfo, final TypeInformation<N> namespaceTypeInfo, - final StateDescriptor<?, V> stateDescriptor) { - - Preconditions.checkNotNull(stateDescriptor); - - // initialize the value serializer based on the execution config. - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer(); - - return getKvState(jobId, queryableStateName, key, - namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer); - } - - /** - * Returns a future holding the request result. - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key that the state we request is associated with. - * @param namespace The namespace of the state. - * @param keyTypeInfo The {@link TypeInformation} of the keys. - * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. - * @param stateSerializer The {@link TypeSerializer} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public <K, N, V> CompletableFuture<V> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final N namespace, - final TypeInformation<K> keyTypeInfo, - final TypeInformation<N> namespaceTypeInfo, - final TypeSerializer<V> stateSerializer) { + final StateDescriptor<S, V> stateDescriptor) { Preconditions.checkNotNull(jobId); Preconditions.checkNotNull(queryableStateName); @@ -237,7 +208,7 @@ public class QueryableStateClient { Preconditions.checkNotNull(keyTypeInfo); Preconditions.checkNotNull(namespaceTypeInfo); - Preconditions.checkNotNull(stateSerializer); + Preconditions.checkNotNull(stateDescriptor); TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); @@ -253,8 +224,8 @@ public class QueryableStateClient { return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply( stateResponse -> { try { - return KvStateSerializer.deserializeValue(stateResponse.getContent(), stateSerializer); - } catch (IOException e) { + return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent())); + } catch (Exception e) { throw new FlinkRuntimeException(e); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java new file mode 100644 index 0000000..b853cfc --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link AggregatingState} that <b>does not</b> allow for modifications. + * + * <p>This is the type of the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link AggregatingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> { + + private final OUT value; + + private ImmutableAggregatingState(OUT value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public OUT get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState( + final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor, + final byte[] serializedValue) throws IOException { + + final ACC accumulator = KvStateSerializer.deserializeValue( + serializedValue, + stateDescriptor.getSerializer()); + + final OUT state = stateDescriptor.getAggregateFunction().getResult(accumulator); + return new ImmutableAggregatingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java new file mode 100644 index 0000000..a12adaa --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link FoldingState} that does not allow for modifications. + * + * <p>This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link FoldingStateDescriptor}. + */ +@PublicEvolving +@Deprecated +public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> { + + private final ACC value; + + private ImmutableFoldingState(ACC value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public ACC get() { + return value; + } + + @Override + public void add(Object newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState( + final FoldingStateDescriptor<IN, ACC> stateDescriptor, + final byte[] serializedState) throws IOException { + + final ACC state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableFoldingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java new file mode 100644 index 0000000..8416905 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; + +/** + * A read-only {@link ListState} that does not allow for modifications. + * + * <p>This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ListStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableListState<V> extends ImmutableState implements ListState<V> { + + private final List<V> listState; + + private ImmutableListState(final List<V> state) { + this.listState = Preconditions.checkNotNull(state); + } + + @Override + public Iterable<V> get() { + return listState; + } + + @Override + public void add(V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <V> ImmutableListState<V> createState( + final ListStateDescriptor<V> stateDescriptor, + final byte[] serializedState) throws IOException { + + final List<V> state = KvStateSerializer.deserializeList( + serializedState, + stateDescriptor.getElementSerializer()); + return new ImmutableListState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java new file mode 100644 index 0000000..c216d5d --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * A read-only {@link MapState} that does not allow for modifications. + * + * <p>This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link MapStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> { + + private final Map<K, V> state; + + private ImmutableMapState(final Map<K, V> mapState) { + this.state = Preconditions.checkNotNull(mapState); + } + + @Override + public V get(K key) { + return state.get(key); + } + + @Override + public void put(K key, V value) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void putAll(Map<K, V> map) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void remove(K key) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public boolean contains(K key) { + return state.containsKey(key); + } + + /** + * Returns all the mappings in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the key-value pairs in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable<Map.Entry<K, V>> entries() { + return Collections.unmodifiableSet(state.entrySet()); + } + + /** + * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}. + * + * @return A read-only iterable view of all the keys in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable<K> keys() { + return Collections.unmodifiableSet(state.keySet()); + } + + /** + * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}. + * + * @return A read-only iterable view of all the values in the state. + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterable<V> values() { + return Collections.unmodifiableCollection(state.values()); + } + + /** + * Iterates over all the mappings in the state. The iterator cannot + * remove elements. + * + * @return A read-only iterator over all the mappings in the state + * + * @throws Exception Thrown if the system cannot access the state. + */ + @Override + public Iterator<Map.Entry<K, V>> iterator() { + return Collections.unmodifiableSet(state.entrySet()).iterator(); + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <K, V> ImmutableMapState<K, V> createState( + final MapStateDescriptor<K, V> stateDescriptor, + final byte[] serializedState) throws IOException { + + final Map<K, V> state = KvStateSerializer.deserializeMap( + serializedState, + stateDescriptor.getKeySerializer(), + stateDescriptor.getValueSerializer()); + return new ImmutableMapState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java new file mode 100644 index 0000000..da08c53 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ReducingState} that does not allow for modifications. + * + * <p>This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ReducingStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> { + + private final V value; + + private ImmutableReducingState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V get() { + return value; + } + + @Override + public void add(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <V> ImmutableReducingState<V> createState( + final ReducingStateDescriptor<V> stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableReducingState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java new file mode 100644 index 0000000..863f07b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +/** + * A base class for the <b>read-only</b> types of state returned + * as results from the Queryable State Client. + */ +abstract class ImmutableState { + + protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR = + new UnsupportedOperationException("State is read-only. No modifications allowed."); +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java new file mode 100644 index 0000000..6ce2787 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateBinder; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.util.Preconditions; + +/** + * A {@link StateBinder} used to deserialize the results returned by the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient}. + * + * <p>The result is an immutable {@link org.apache.flink.api.common.state.State State} + * object containing the requested result. + */ +public class ImmutableStateBinder implements StateBinder { + + private final byte[] serializedState; + + public ImmutableStateBinder(final byte[] content) { + serializedState = Preconditions.checkNotNull(content); + } + + @Override + public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception { + return ImmutableValueState.createState(stateDesc, serializedState); + } + + @Override + public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception { + return ImmutableListState.createState(stateDesc, serializedState); + } + + @Override + public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception { + return ImmutableReducingState.createState(stateDesc, serializedState); + } + + @Override + public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception { + return ImmutableAggregatingState.createState(stateDesc, serializedState); + } + + @Override + public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception { + return ImmutableFoldingState.createState(stateDesc, serializedState); + } + + @Override + public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> stateDesc) throws Exception { + return ImmutableMapState.createState(stateDesc, serializedState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java new file mode 100644 index 0000000..7fd6457 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A read-only {@link ValueState} that does not allow for modifications. + * + * <p>This is the result returned when querying Flink's keyed state using the + * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and + * providing an {@link ValueStateDescriptor}. + */ +@PublicEvolving +public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> { + + private final V value; + + private ImmutableValueState(V value) { + this.value = Preconditions.checkNotNull(value); + } + + @Override + public V value() { + return value; + } + + @Override + public void update(V newValue) { + throw MODIFICATION_ATTEMPT_ERROR; + } + + @Override + public void clear() { + throw MODIFICATION_ATTEMPT_ERROR; + } + + public static <V> ImmutableValueState<V> createState( + final ValueStateDescriptor<V> stateDescriptor, + final byte[] serializedState) throws IOException { + + final V state = KvStateSerializer.deserializeValue( + serializedState, + stateDescriptor.getSerializer()); + return new ImmutableValueState<>(state); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java index b9bf671..18a88da 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java +++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java @@ -121,7 +121,7 @@ public abstract class AbstractServerHandler<REQ extends MessageBody, RESP extend // Execute actual query async, because it is possibly // blocking (e.g. file I/O). // - // A submission failure is not treated as fatal. todo here if there is a shared resource e.g. registry, then I will have to sync on that. + // A submission failure is not treated as fatal. queryExecutor.submit(new AsyncRequestTask<>(this, ctx, requestId, request, stats)); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java index a096f55..69316fa 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateITCase.java @@ -19,17 +19,27 @@ package org.apache.flink.queryablestate.itcases; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -49,12 +59,18 @@ import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -63,13 +79,16 @@ import org.junit.Before; import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -198,7 +217,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; - final List<CompletableFuture<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys); + final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys); for (int i = 0; i < numKeys; i++) { final int key = i; @@ -210,7 +229,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { allNonZero = false; } - CompletableFuture<Tuple2<Integer, Long>> result = getKvStateWithRetries( + CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvStateWithRetries( client, jobId, queryName, @@ -221,9 +240,14 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { false, executor); - result.thenAccept(res -> { - counts.set(key, res.f1); - assertEquals("Key mismatch", key, res.f0.intValue()); + result.thenAccept(response -> { + try { + Tuple2<Integer, Long> res = response.get(); + counts.set(key, res.f1); + assertEquals("Key mismatch", key, res.f0.intValue()); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } }); futures.add(result); @@ -406,7 +430,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "hakuna", valueState, numElements); + executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements); } finally { // Free cluster resources if (jobId != null) { @@ -485,7 +509,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "hakuna", valueState, expected); + executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected); } finally { // Free cluster resources if (jobId != null) { @@ -502,87 +526,6 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * <tt>expected</tt> equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries( - client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - stateDescriptor, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } - - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * <tt>expected</tt> equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final TypeSerializer<Tuple2<Integer, Long>> valueSerializer, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - valueSerializer, - QUERY_RETRY_DELAY, - false, - executor); - - Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50L); - } - } - - assertTrue("Did not succeed query", success); - } - } - - /** * Tests simple value state queryable state instance with a default value * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) * tuples, the key is mapped to 1 but key 0 is queried which should throw @@ -647,7 +590,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { // Now query int key = 0; - CompletableFuture<Tuple2<Integer, Long>> future = getKvStateWithRetries( + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( client, jobId, queryableState.getQueryableStateName(), @@ -730,8 +673,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - executeQuery(deadline, client, jobId, "matata", - queryableState.getValueSerializer(), numElements); + final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc = + (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor(); + executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements); } finally { // Free cluster resources @@ -809,10 +753,10 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { for (int key = 0; key < maxParallelism; key++) { boolean success = false; while (deadline.hasTimeLeft() && !success) { - CompletableFuture<String> future = getKvStateWithRetries( + CompletableFuture<FoldingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( client, jobId, - queryableState.getQueryableStateName(), + "pumba", key, BasicTypeInfo.INT_TYPE_INFO, foldingState, @@ -820,7 +764,9 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { false, executor); - String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + //assertEquals("Key mismatch", key, value.f0.intValue()); if (expected.equals(value)) { success = true; } else { @@ -898,12 +844,150 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { cluster.submitJobDetached(jobGraph); - // Wait until job is running + // Now query + long expected = numElements * (numElements + 1L) / 2L; + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( + client, + jobId, + "jungle", + key, + BasicTypeInfo.INT_TYPE_INFO, + reducingState, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + /** + * Tests simple map state queryable state instance. Each source emits + * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then + * queried. The map state instance sums the values up. The test succeeds + * after each subtask index is queried with result n*(n+1)/2. + */ + @Test + public void testMapState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor = new MapStateDescriptor<>( + "timon", + BasicTypeInfo.INT_TYPE_INFO, + source.getType()); + mapStateDescriptor.setQueryable("timon-queryable"); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { + private static final long serialVersionUID = -805125545438296619L; + + private transient MapState<Integer, Tuple2<Integer, Long>> mapState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + mapState = getRuntimeContext().getMapState(mapStateDescriptor); + } + + @Override + public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { + Tuple2<Integer, Long> v = mapState.get(value.f0); + if (v == null) { + v = new Tuple2<>(value.f0, 0L); + } + mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); + } + }); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); // Now query long expected = numElements * (numElements + 1L) / 2L; - executeQuery(deadline, client, jobId, "jungle", reducingState, expected); + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvStateWithRetries( + client, + jobId, + "timon-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + mapStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key); + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } } finally { // Free cluster resources if (jobId != null) { @@ -920,6 +1004,227 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } /** + * Tests simple list state queryable state instance. Each source emits + * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then + * queried. The list state instance add the values to the list. The test + * succeeds after each subtask index is queried and the list contains + * the correct number of distinct elements. + */ + @Test + public void testListState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final ListStateDescriptor<Long> listStateDescriptor = new ListStateDescriptor<Long>( + "list", + BasicTypeInfo.LONG_TYPE_INFO); + listStateDescriptor.setQueryable("list-queryable"); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).process(new ProcessFunction<Tuple2<Integer, Long>, Object>() { + private static final long serialVersionUID = -805125545438296619L; + + private transient ListState<Long> listState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + listState = getRuntimeContext().getListState(listStateDescriptor); + } + + @Override + public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { + listState.add(value.f1); + } + }); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + + Map<Integer, Set<Long>> results = new HashMap<>(); + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture<ListState<Long>> future = getKvStateWithRetries( + client, + jobId, + "list-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + listStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Iterable<Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + Set<Long> res = new HashSet<>(); + for (Long v: value) { + res.add(v); + } + + // the source starts at 0, so +1 + if (res.size() == numElements + 1L) { + success = true; + results.put(key, res); + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + + for (int key = 0; key < maxParallelism; key++) { + Set<Long> values = results.get(key); + for (long i = 0L; i <= numElements; i++) { + assertTrue(values.contains(i)); + } + } + + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + @Test + public void testAggregatingState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final long numElements = 1024L; + + final QueryableStateClient client = new QueryableStateClient( + "localhost", + Integer.parseInt(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(maxParallelism); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L)); + + DataStream<Tuple2<Integer, Long>> source = env + .addSource(new TestAscendingValueSource(numElements)); + + final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> aggrStateDescriptor = + new AggregatingStateDescriptor<>( + "aggregates", + new SumAggr(), + MutableString.class); + aggrStateDescriptor.setQueryable("aggr-queryable"); + + source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + + @Override + public Integer getKey(Tuple2<Integer, Long> value) throws Exception { + return value.f0; + } + }).transform( + "TestAggregatingOperator", + BasicTypeInfo.STRING_TYPE_INFO, + new AggregatingTestOperator(aggrStateDescriptor) + ); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvStateWithRetries( + client, + jobId, + "aggr-queryable", + key, + BasicTypeInfo.INT_TYPE_INFO, + aggrStateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(); + + if (Long.parseLong(value) == numElements * (numElements + 1L) / 2L) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } finally { + // Free cluster resources + if (jobId != null) { + CompletableFuture<CancellationSuccess> cancellation = FutureUtils.toJava(cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class))); + + cancellation.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } + + client.shutdown(); + } + } + + ///// Sources/UDFs Used in the Tests ////// + + /** * Test source producing (key, 0)..(key, maxValue) with key being the sub * task index. * @@ -980,8 +1285,8 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { /** * Test source producing (key, 1) tuples with random key in key range (numKeys). */ - protected static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> - implements CheckpointListener { + private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> implements CheckpointListener { + private static final long serialVersionUID = -5744725196953582710L; private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong(); @@ -997,7 +1302,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { public void open(Configuration parameters) throws Exception { super.open(parameters); if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - LATEST_CHECKPOINT_ID.set(0); + LATEST_CHECKPOINT_ID.set(0L); } } @@ -1012,7 +1317,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { ctx.collect(record); } // mild slow down - Thread.sleep(1); + Thread.sleep(1L); } } @@ -1030,6 +1335,77 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { } /** + * An operator that uses {@link AggregatingState}. + * + * <p>The operator exists for lack of possibility to get an + * {@link AggregatingState} from the {@link org.apache.flink.api.common.functions.RuntimeContext}. + * If this were not the case, we could have a {@link ProcessFunction}. + */ + private static class AggregatingTestOperator + extends AbstractStreamOperator<String> + implements OneInputStreamOperator<Tuple2<Integer, Long>, String> { + + private static final long serialVersionUID = 1L; + + private final AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDescriptor; + private transient AggregatingState<Tuple2<Integer, Long>, String> state; + + AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, MutableString, String> stateDesc) { + this.stateDescriptor = stateDesc; + } + + @Override + public void open() throws Exception { + super.open(); + this.state = getKeyedStateBackend().getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + stateDescriptor); + } + + @Override + public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception { + state.add(element.getValue()); + } + } + + /** + * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumAggr implements AggregateFunction<Tuple2<Integer, Long>, MutableString, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public MutableString createAccumulator() { + return new MutableString(); + } + + @Override + public void add(Tuple2<Integer, Long> value, MutableString accumulator) { + long acc = Long.valueOf(accumulator.value); + acc += value.f1; + accumulator.value = Long.toString(acc); + } + + @Override + public String getResult(MutableString accumulator) { + return accumulator.value; + } + + @Override + public MutableString merge(MutableString a, MutableString b) { + MutableString nValue = new MutableString(); + nValue.value = Long.toString(Long.valueOf(a.value) + Long.valueOf(b.value)); + return nValue; + } + } + + private static final class MutableString { + String value = "0"; + } + + /** * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. */ private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> { @@ -1058,32 +1434,13 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { ///// General Utility Methods ////// - private static <K, V> Future<V> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final TypeSerializer<V> valueTypeSerializer, - final Time retryDelay, - final boolean failForUnknownKeyOrNamespace, - final ScheduledExecutor executor) { - - return retryWithDelay( - () -> client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer), - NO_OF_RETRIES, - retryDelay, - executor, - failForUnknownKeyOrNamespace); - } - - private static <K, V> CompletableFuture<V> getKvStateWithRetries( + private static <K, S extends State, V> CompletableFuture<S> getKvStateWithRetries( final QueryableStateClient client, final JobID jobId, final String queryName, final K key, final TypeInformation<K> keyTypeInfo, - final StateDescriptor<?, V> stateDescriptor, + final StateDescriptor<S, V> stateDescriptor, final Time retryDelay, final boolean failForUnknownKeyOrNamespace, final ScheduledExecutor executor) { @@ -1157,4 +1514,45 @@ public abstract class AbstractQueryableStateITCase extends TestLogger { (t, throwable) -> operationResultFuture.cancel(false)); } } + + /** + * Retry a query for state for keys between 0 and {@link #maxParallelism} until + * <tt>expected</tt> equals the value of the result tuple's second field. + */ + private void executeValueQuery( + final Deadline deadline, + final QueryableStateClient client, + final JobID jobId, + final String queryableStateName, + final ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor, + final long expected) throws Exception { + + for (int key = 0; key < maxParallelism; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvStateWithRetries( + client, + jobId, + queryableStateName, + key, + BasicTypeInfo.INT_TYPE_INFO, + stateDescriptor, + QUERY_RETRY_DELAY, + false, + executor); + + Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value(); + + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50L); + } + } + + assertTrue("Did not succeed query", success); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java new file mode 100644 index 0000000..69b2f61 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableAggregatingStateTest}. + */ +public class ImmutableAggregatingStateTest { + + private final AggregatingStateDescriptor<Long, MutableString, String> aggrStateDesc = + new AggregatingStateDescriptor<>( + "test", + new SumAggr(), + MutableString.class); + + private ImmutableAggregatingState<Long, String> aggrState; + + @Before + public void setUp() throws Exception { + if (!aggrStateDesc.isSerializerInitialized()) { + aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + final MutableString initValue = new MutableString(); + initValue.value = "42"; + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out)); + + aggrState = ImmutableAggregatingState.createState( + aggrStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.clear(); + } + + /** + * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumAggr implements AggregateFunction<Long, MutableString, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public MutableString createAccumulator() { + return new MutableString(); + } + + @Override + public void add(Long value, MutableString accumulator) { + accumulator.value += ", " + value; + } + + @Override + public String getResult(MutableString accumulator) { + return accumulator.value; + } + + @Override + public MutableString merge(MutableString a, MutableString b) { + MutableString nValue = new MutableString(); + nValue.value = a.value + ", " + b.value; + return nValue; + } + } + + private static final class MutableString { + String value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java new file mode 100644 index 0000000..d2c9535 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableFoldingState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableFoldingState}. + */ +public class ImmutableFoldingStateTest { + + private final FoldingStateDescriptor<Long, String> foldingStateDesc = + new FoldingStateDescriptor<>( + "test", + "0", + new SumFold(), + StringSerializer.INSTANCE); + + private ImmutableFoldingState<Long, String> foldingState; + + @Before + public void setUp() throws Exception { + if (!foldingStateDesc.isSerializerInitialized()) { + foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out)); + + foldingState = ImmutableFoldingState.createState( + foldingStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.clear(); + } + + /** + * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumFold implements FoldFunction<Long, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public String fold(String accumulator, Long value) throws Exception { + long acc = Long.valueOf(accumulator); + acc += value; + return Long.toString(acc); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java new file mode 100644 index 0000000..3283295 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.queryablestate.client.state.ImmutableListState; +import org.apache.flink.runtime.state.heap.HeapListState; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableListState}. + */ +public class ImmutableListStateTest { + + private final ListStateDescriptor<Long> listStateDesc = + new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableListState<Long> listState; + + @Before + public void setUp() throws Exception { + if (!listStateDesc.isSerializerInitialized()) { + listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + List<Long> init = new ArrayList<>(); + init.add(42L); + + byte[] serInit = serializeInitValue(init); + listState = ImmutableListState.createState(listStateDesc, serInit); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + List<Long> list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + List<Long> list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.clear(); + } + + /** + * Copied from {@link HeapListState#getSerializedValue(Object, Object)}. + */ + private byte[] serializeInitValue(List<Long> toSerialize) throws IOException { + TypeSerializer<Long> serializer = listStateDesc.getElementSerializer(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); + + // write the same as RocksDB writes lists, with one ',' separator + for (int i = 0; i < toSerialize.size(); i++) { + serializer.serialize(toSerialize.get(i), view); + if (i < toSerialize.size() - 1) { + view.writeByte(','); + } + } + view.flush(); + + return baos.toByteArray(); + } + + private List<Long> getStateContents() { + List<Long> list = new ArrayList<>(); + for (Long elem: listState.get()) { + list.add(elem); + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java new file mode 100644 index 0000000..30a8a50 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.ImmutableMapState; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; + +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link ImmutableMapState}. + */ +public class ImmutableMapStateTest { + + private final MapStateDescriptor<Long, Long> mapStateDesc = + new MapStateDescriptor<>( + "test", + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableMapState<Long, Long> mapState; + + @Before + public void setUp() throws Exception { + if (!mapStateDesc.isSerializerInitialized()) { + mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + Map<Long, Long> initMap = new HashMap<>(); + initMap.put(1L, 5L); + initMap.put(2L, 5L); + + byte[] initSer = KvStateSerializer.serializeMap( + initMap.entrySet(), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + mapState = ImmutableMapState.createState(mapStateDesc, initSer); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPut() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPutAll() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Map<Long, Long> nMap = new HashMap<>(); + nMap.put(1L, 7L); + nMap.put(2L, 7L); + + mapState.putAll(nMap); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterator() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterable() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterable<Map.Entry<Long, Long>> iterable = mapState.entries(); + Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator(); + while (iterator.hasNext()) { + assertEquals(5L, (long) iterator.next().getValue()); + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testKeys() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Long> iterator = mapState.keys().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testValues() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Long> iterator = mapState.values().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java new file mode 100644 index 0000000..9b1ecf8 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.ImmutableReducingState; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableReducingState}. + */ +public class ImmutableReducingStateTest { + + private final ReducingStateDescriptor<Long> reducingStateDesc = + new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableReducingState<Long> reduceState; + + @Before + public void setUp() throws Exception { + if (!reducingStateDesc.isSerializerInitialized()) { + reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + reduceState = ImmutableReducingState.createState( + reducingStateDesc, + ByteBuffer.allocate(Long.BYTES).putLong(42L).array() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.clear(); + } + + /** + * Test {@link ReduceFunction} summing up its two arguments. + */ + private static class SumReduce implements ReduceFunction<Long> { + + private static final long serialVersionUID = 6041237513913189144L; + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return value1 + value2; + } + } +}