Repository: flink Updated Branches: refs/heads/master bc4638a3c -> f48f5340a
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index 2567004..aa4e6d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; @@ -35,10 +34,6 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -53,18 +48,15 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; /** - * Tests for {@link KvStateRequestSerializer}. + * Tests for {@link KvStateSerializer}. */ @RunWith(Parameterized.class) public class KvStateRequestSerializerTest { - private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; - @Parameterized.Parameters public static Collection<Boolean> parameters() { return Arrays.asList(false, true); @@ -74,155 +66,6 @@ public class KvStateRequestSerializerTest { public boolean async; /** - * Tests KvState request serialization. - */ - @Test - public void testKvStateRequestSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1337L; - KvStateID kvStateId = new KvStateID(); - byte[] serializedKeyAndNamespace = randomByteArray(1024); - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest( - alloc, - requestId, - kvStateId, - serializedKeyAndNamespace); - - int frameLength = buf.readInt(); - assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - assertEquals(kvStateId, request.getKvStateId()); - assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); - } - - /** - * Tests KvState request serialization with zero-length serialized key and namespace. - */ - @Test - public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { - byte[] serializedKeyAndNamespace = new byte[0]; - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest( - alloc, - 1823, - new KvStateID(), - serializedKeyAndNamespace); - - int frameLength = buf.readInt(); - assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> key and namespace. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { - new KvStateRequest(0, new KvStateID(), null); - } - - /** - * Tests KvState request result serialization. - */ - @Test - public void testKvStateRequestResultSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 72727278L; - byte[] serializedResult = randomByteArray(1024); - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult( - alloc, - requestId, - serializedResult); - - int frameLength = buf.readInt(); - assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestResult request = KvStateRequestSerializer.deserializeKvStateRequestResult(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - - assertArrayEquals(serializedResult, request.getSerializedResult()); - } - - /** - * Tests KvState request result serialization with zero-length serialized result. - */ - @Test - public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception { - byte[] serializedResult = new byte[0]; - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult( - alloc, - 72727278, - serializedResult); - - int frameLength = buf.readInt(); - - assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestResult request = KvStateRequestSerializer.deserializeKvStateRequestResult(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedResult, request.getSerializedResult()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> results. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedResult() throws Exception { - new KvStateRequestResult(0, null); - } - - /** - * Tests KvState request failure serialization. - */ - @Test - public void testKvStateRequestFailureSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1111222L; - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestFailure( - alloc, - requestId, - cause); - - int frameLength = buf.readInt(); - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestFailure request = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - assertEquals(cause.getClass(), request.getCause().getClass()); - assertEquals(cause.getMessage(), request.getCause().getMessage()); - } - - /** - * Tests KvState server failure serialization. - */ - @Test - public void testServerFailureSerialization() throws Exception { - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = KvStateRequestSerializer.serializeServerFailure(alloc, cause); - - int frameLength = buf.readInt(); - assertEquals(KvStateRequestType.SERVER_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - Throwable request = KvStateRequestSerializer.deserializeServerFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(cause.getClass(), request.getClass()); - assertEquals(cause.getMessage(), request.getMessage()); - } - - /** * Tests key and namespace serialization utils. */ @Test @@ -233,10 +76,10 @@ public class KvStateRequestSerializerTest { long expectedKey = Integer.MAX_VALUE + 12323L; String expectedNamespace = "knilf"; - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( expectedKey, keySerializer, expectedNamespace, namespaceSerializer); - Tuple2<Long, String> actual = KvStateRequestSerializer.deserializeKeyAndNamespace( + Tuple2<Long, String> actual = KvStateSerializer.deserializeKeyAndNamespace( serializedKeyAndNamespace, keySerializer, namespaceSerializer); assertEquals(expectedKey, actual.f0.longValue()); @@ -248,7 +91,7 @@ public class KvStateRequestSerializerTest { */ @Test(expected = IOException.class) public void testKeyAndNamespaceDeserializationEmpty() throws Exception { - KvStateRequestSerializer.deserializeKeyAndNamespace( + KvStateSerializer.deserializeKeyAndNamespace( new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); } @@ -257,7 +100,7 @@ public class KvStateRequestSerializerTest { */ @Test(expected = IOException.class) public void testKeyAndNamespaceDeserializationTooShort() throws Exception { - KvStateRequestSerializer.deserializeKeyAndNamespace( + KvStateSerializer.deserializeKeyAndNamespace( new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); } @@ -267,7 +110,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testKeyAndNamespaceDeserializationTooMany1() throws Exception { // Long + null String + 1 byte - KvStateRequestSerializer.deserializeKeyAndNamespace( + KvStateSerializer.deserializeKeyAndNamespace( new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); } @@ -278,7 +121,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testKeyAndNamespaceDeserializationTooMany2() throws Exception { // Long + null String + 2 bytes - KvStateRequestSerializer.deserializeKeyAndNamespace( + KvStateSerializer.deserializeKeyAndNamespace( new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); } @@ -291,8 +134,8 @@ public class KvStateRequestSerializerTest { TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; long expectedValue = Long.MAX_VALUE - 1292929292L; - byte[] serializedValue = KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer); - long actualValue = KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer); + byte[] serializedValue = KvStateSerializer.serializeValue(expectedValue, valueSerializer); + long actualValue = KvStateSerializer.deserializeValue(serializedValue, valueSerializer); assertEquals(expectedValue, actualValue); } @@ -302,7 +145,7 @@ public class KvStateRequestSerializerTest { */ @Test(expected = IOException.class) public void testDeserializeValueEmpty() throws Exception { - KvStateRequestSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE); + KvStateSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE); } /** @@ -311,7 +154,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeValueTooShort() throws Exception { // 1 byte (incomplete Long) - KvStateRequestSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE); + KvStateSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE); } /** @@ -320,7 +163,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeValueTooMany1() throws Exception { // Long + 1 byte - KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, + KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, LongSerializer.INSTANCE); } @@ -330,7 +173,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeValueTooMany2() throws Exception { // Long + 2 bytes - KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, + KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, LongSerializer.INSTANCE); } @@ -363,7 +206,7 @@ public class KvStateRequestSerializerTest { /** * Verifies that the serialization of a list using the given list state - * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}. + * matches the deserialization with {@link KvStateSerializer#deserializeList}. * * @param key * key of the list state @@ -390,19 +233,19 @@ public class KvStateRequestSerializerTest { } final byte[] serializedKey = - KvStateRequestSerializer.serializeKeyAndNamespace( + KvStateSerializer.serializeKeyAndNamespace( key, LongSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); final byte[] serializedValues = listState.getSerializedValue(serializedKey); - List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer); + List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer); assertEquals(expectedValues, actualValues); // Single value long expectedValue = ThreadLocalRandom.current().nextLong(); - byte[] serializedValue = KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer); - List<Long> actualValue = KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer); + byte[] serializedValue = KvStateSerializer.serializeValue(expectedValue, valueSerializer); + List<Long> actualValue = KvStateSerializer.deserializeList(serializedValue, valueSerializer); assertEquals(1, actualValue.size()); assertEquals(expectedValue, actualValue.get(0).longValue()); } @@ -412,7 +255,7 @@ public class KvStateRequestSerializerTest { */ @Test public void testDeserializeListEmpty() throws Exception { - List<Long> actualValue = KvStateRequestSerializer + List<Long> actualValue = KvStateSerializer .deserializeList(new byte[] {}, LongSerializer.INSTANCE); assertEquals(0, actualValue.size()); } @@ -423,7 +266,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeListTooShort1() throws Exception { // 1 byte (incomplete Long) - KvStateRequestSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE); + KvStateSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE); } /** @@ -432,7 +275,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeListTooShort2() throws Exception { // Long + 1 byte (separator) + 1 byte (incomplete Long) - KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, + KvStateSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, LongSerializer.INSTANCE); } @@ -466,7 +309,7 @@ public class KvStateRequestSerializerTest { /** * Verifies that the serialization of a map using the given map state - * matches the deserialization with {@link KvStateRequestSerializer#deserializeList}. + * matches the deserialization with {@link KvStateSerializer#deserializeList}. * * @param key * key of the map state @@ -497,13 +340,13 @@ public class KvStateRequestSerializerTest { mapState.put(0L, null); final byte[] serializedKey = - KvStateRequestSerializer.serializeKeyAndNamespace( + KvStateSerializer.serializeKeyAndNamespace( key, LongSerializer.INSTANCE, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); final byte[] serializedValues = mapState.getSerializedValue(serializedKey); - Map<Long, String> actualValues = KvStateRequestSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer); + Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer); assertEquals(expectedValues.size(), actualValues.size()); for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) { assertEquals(expectedValues.get(actualEntry.getKey()), actualEntry.getValue()); @@ -515,12 +358,12 @@ public class KvStateRequestSerializerTest { String expectedValue = Long.toString(expectedKey); byte[] isNull = {0}; - baos.write(KvStateRequestSerializer.serializeValue(expectedKey, userKeySerializer)); + baos.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer)); baos.write(isNull); - baos.write(KvStateRequestSerializer.serializeValue(expectedValue, userValueSerializer)); + baos.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer)); byte[] serializedValue = baos.toByteArray(); - Map<Long, String> actualValue = KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); + Map<Long, String> actualValue = KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); assertEquals(1, actualValue.size()); assertEquals(expectedValue, actualValue.get(expectedKey)); } @@ -530,7 +373,7 @@ public class KvStateRequestSerializerTest { */ @Test public void testDeserializeMapEmpty() throws Exception { - Map<Long, String> actualValue = KvStateRequestSerializer + Map<Long, String> actualValue = KvStateSerializer .deserializeMap(new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); assertEquals(0, actualValue.size()); } @@ -541,7 +384,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeMapTooShort1() throws Exception { // 1 byte (incomplete Key) - KvStateRequestSerializer.deserializeMap(new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); + KvStateSerializer.deserializeMap(new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); } /** @@ -550,7 +393,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeMapTooShort2() throws Exception { // Long (Key) + 1 byte (incomplete Value) - KvStateRequestSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0}, + KvStateSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0}, LongSerializer.INSTANCE, LongSerializer.INSTANCE); } @@ -560,7 +403,7 @@ public class KvStateRequestSerializerTest { @Test(expected = IOException.class) public void testDeserializeMapTooShort3() throws Exception { // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte (incomplete Key2) - KvStateRequestSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3}, + KvStateSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3}, LongSerializer.INSTANCE, LongSerializer.INSTANCE); } http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index ed280a7..dbf131f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -56,7 +56,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.heap.AbstractHeapState; import org.apache.flink.runtime.state.heap.NestedMapsStateTable; import org.apache.flink.runtime.state.heap.StateTable; @@ -3070,7 +3070,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception { - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( key, keySerializer, namespace, namespaceSerializer); byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace); @@ -3078,7 +3078,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten if (serializedValue == null) { return null; } else { - return KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer); + return KvStateSerializer.deserializeValue(serializedValue, valueSerializer); } } @@ -3094,7 +3094,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) throws Exception { - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( key, keySerializer, namespace, namespaceSerializer); byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace); @@ -3102,7 +3102,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten if (serializedValue == null) { return null; } else { - return KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer); + return KvStateSerializer.deserializeList(serializedValue, valueSerializer); } } @@ -3120,7 +3120,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten TypeSerializer<UV> userValueSerializer ) throws Exception { - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( key, keySerializer, namespace, namespaceSerializer); byte[] serializedValue = kvState.getSerializedValue(serializedKeyAndNamespace); @@ -3128,7 +3128,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten if (serializedValue == null) { return null; } else { - return KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); + return KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java deleted file mode 100644 index 8ac3d2f..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java +++ /dev/null @@ -1,1128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; -import org.apache.flink.runtime.query.QueryableStateClient; -import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.QueryableStateStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.OnSuccess; -import akka.dispatch.Recover; -import akka.pattern.Patterns; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongArray; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Base class for queryable state integration tests with a configurable state backend. - */ -public abstract class AbstractQueryableStateITCase extends TestLogger { - - protected static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000, TimeUnit.SECONDS); - private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); - - protected static ActorSystem testActorSystem; - - /** - * State backend to use. - */ - protected AbstractStateBackend stateBackend; - - /** - * Shared between all the test. Make sure to have at least NUM_SLOTS - * available after your test finishes, e.g. cancel the job you submitted. - */ - protected static FlinkMiniCluster cluster; - - protected static int maxParallelism; - - @Before - public void setUp() throws Exception { - // NOTE: do not use a shared instance for all tests as the tests may brake - this.stateBackend = createStateBackend(); - - Assert.assertNotNull(cluster); - - maxParallelism = cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) * - cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); - } - - /** - * Creates a state backend instance which is used in the {@link #setUp()} method before each - * test case. - * - * @return a state backend instance for each unit test - */ - protected abstract AbstractStateBackend createStateBackend() throws Exception; - - /** - * Runs a simple topology producing random (key, 1) pairs at the sources (where - * number of keys is in fixed in range 0...numKeys). The records are keyed and - * a reducing queryable state instance is created, which sums up the records. - * - * <p>After submitting the job in detached mode, the QueryableStateCLient is used - * to query the counts of each key in rounds until all keys have non-zero counts. - */ - @Test - @SuppressWarnings("unchecked") - public void testQueryableState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 256; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "hakuna-matata"; - - final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7143749578983540352L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - cluster.submitJobDetached(jobGraph); - - // - // Start querying - // - jobId = jobGraph.getJobID(); - - final AtomicLongArray counts = new AtomicLongArray(numKeys); - - boolean allNonZero = false; - while (!allNonZero && deadline.hasTimeLeft()) { - allNonZero = true; - - final List<Future<Tuple2<Integer, Long>>> futures = new ArrayList<>(numKeys); - - for (int i = 0; i < numKeys; i++) { - final int key = i; - - if (counts.get(key) > 0) { - // Skip this one - continue; - } else { - allNonZero = false; - } - - Future<Tuple2<Integer, Long>> result = getKvStateWithRetries( - client, - jobId, - queryName, - key, - BasicTypeInfo.INT_TYPE_INFO, - reducingState, - QUERY_RETRY_DELAY, - false); - - result.onSuccess(new OnSuccess<Tuple2<Integer, Long>>() { - @Override - public void onSuccess(Tuple2<Integer, Long> result) throws Throwable { - counts.set(key, result.f1); - assertEquals("Key mismatch", key, result.f0.intValue()); - } - }, testActorSystem.dispatcher()); - - futures.add(result); - } - - Future<Iterable<Tuple2<Integer, Long>>> futureSequence = Futures.sequence( - futures, - testActorSystem.dispatcher()); - - Await.ready(futureSequence, deadline.timeLeft()); - } - - assertTrue("Not all keys are non-zero", allNonZero); - - // All should be non-zero - for (int i = 0; i < numKeys; i++) { - long count = counts.get(i); - assertTrue("Count at position " + i + " is " + count, count > 0); - } - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Tests that duplicate query registrations fail the job at the JobManager. - * - * <b>NOTE: </b> This test is only in the non-HA variant of the tests because - * in the HA mode we use the actual JM code which does not recognize the - * {@code NotifyWhenJobStatus} message. * - */ - @Test - public void testDuplicateRegistrationFailsJob() throws Exception { - final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 256; - - JobID jobId = null; - - try { - // - // Test program - // - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestKeyRangeSource(numKeys)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>( - "any-name", - new SumReduce(), - source.getType()); - - final String queryName = "duplicate-me"; - - final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -4126824763829132959L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName, reducingState); - - final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -6265024000462809436L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState(queryName); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - Future<TestingJobManagerMessages.JobStatusIs> failedFuture = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)); - - cluster.submitJobDetached(jobGraph); - - TestingJobManagerMessages.JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft()); - assertEquals(JobStatus.FAILED, jobStatus.state()); - - // Get the job and check the cause - JobManagerMessages.JobFound jobFound = Await.result( - cluster.getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)), - deadline.timeLeft()); - - String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString(); - - assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException")); - int causedByIndex = failureCause.indexOf("Caused by: "); - String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length()); - assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException")); - assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName)); - } finally { - // Free cluster resources - if (jobId != null) { - Future<JobManagerMessages.CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - } - } - - /** - * Tests simple value state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The tests succeeds after each subtask index is queried with - * value numElements (the latest element updated the state). - */ - @Test - public void testValueState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>( - "any", - source.getType()); - - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7662520075515707428L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - long expected = numElements; - - executeQuery(deadline, client, jobId, "hakuna", valueState, expected); - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Similar tests as {@link #testValueState()} but before submitting the - * job, we already issue one request which fails. - */ - @Test - public void testQueryNonStartedJobState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = new ValueStateDescriptor<>( - "any", - source.getType(), - null); - - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 7480503339992214681L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - // Now query - long expected = numElements; - - // query once - client.getKvState( - jobId, - queryableState.getQueryableStateName(), - 0, - VoidNamespace.INSTANCE, - BasicTypeInfo.INT_TYPE_INFO, - VoidNamespaceTypeInfo.INSTANCE, - valueState); - - cluster.submitJobDetached(jobGraph); - - executeQuery(deadline, client, jobId, "hakuna", valueState, expected); - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * <tt>expected</tt> equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final StateDescriptor<?, Tuple2<Integer, Long>> stateDescriptor, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - stateDescriptor, - QUERY_RETRY_DELAY, - false); - - Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft()); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50); - } - } - - assertTrue("Did not succeed query", success); - } - } - - /** - * Retry a query for state for keys between 0 and {@link #maxParallelism} until - * <tt>expected</tt> equals the value of the result tuple's second field. - */ - private void executeQuery( - final Deadline deadline, - final QueryableStateClient client, - final JobID jobId, - final String queryableStateName, - final TypeSerializer<Tuple2<Integer, Long>> valueSerializer, - final long expected) throws Exception { - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableStateName, - key, - BasicTypeInfo.INT_TYPE_INFO, - valueSerializer, - QUERY_RETRY_DELAY, - false); - - Tuple2<Integer, Long> value = Await.result(future, deadline.timeLeft()); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50); - } - } - - assertTrue("Did not succeed query", success); - } - } - - /** - * Tests simple value state queryable state instance with a default value - * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) - * tuples, the key is mapped to 1 but key 0 is queried which should throw - * a {@link UnknownKeyOrNamespace} exception. - * - * @throws UnknownKeyOrNamespace thrown due querying a non-existent key - */ - @Test(expected = UnknownKeyOrNamespace.class) - public void testValueStateDefault() throws - Exception, UnknownKeyOrNamespace { - - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies - .fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state - ValueStateDescriptor<Tuple2<Integer, Long>> valueState = - new ValueStateDescriptor<>( - "any", - source.getType(), - Tuple2.of(0, 1337L)); - - // only expose key "1" - QueryableStateStream<Integer, Tuple2<Integer, Long>> - queryableState = - source.keyBy( - new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 4509274556892655887L; - - @Override - public Integer getKey( - Tuple2<Integer, Long> value) throws - Exception { - return 1; - } - }).asQueryableState("hakuna", valueState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - int key = 0; - Future<Tuple2<Integer, Long>> future = getKvStateWithRetries(client, - jobId, - queryableState.getQueryableStateName(), - key, - BasicTypeInfo.INT_TYPE_INFO, - valueState, - QUERY_RETRY_DELAY, - true); - - Await.result(future, deadline.timeLeft()); - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), - deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply( - CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Tests simple value state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The tests succeeds after each subtask index is queried with - * value numElements (the latest element updated the state). - * - * <p>This is the same as the simple value state test, but uses the API shortcut. - */ - @Test - public void testValueStateShortcut() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Value state shortcut - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 9168901838808830068L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("matata"); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - long expected = numElements; - - executeQuery(deadline, client, jobId, "matata", - queryableState.getValueSerializer(), expected); - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Tests simple folding state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The folding state sums these up and maps them to Strings. The - * test succeeds after each subtask index is queried with result n*(n+1)/2 - * (as a String). - */ - @Test - public void testFoldingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Folding state - FoldingStateDescriptor<Tuple2<Integer, Long>, String> foldingState = - new FoldingStateDescriptor<>( - "any", - "0", - new SumFold(), - StringSerializer.INSTANCE); - - QueryableStateStream<Integer, String> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = -842809958106747539L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("pumba", foldingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Now query - String expected = Integer.toString(numElements * (numElements + 1) / 2); - - for (int key = 0; key < maxParallelism; key++) { - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future<String> future = getKvStateWithRetries(client, - jobId, - queryableState.getQueryableStateName(), - key, - BasicTypeInfo.INT_TYPE_INFO, - foldingState, - QUERY_RETRY_DELAY, - false); - - String value = Await.result(future, deadline.timeLeft()); - if (expected.equals(value)) { - success = true; - } else { - // Retry - Thread.sleep(50); - } - } - - assertTrue("Did not succeed query", success); - } - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - /** - * Tests simple reducing state queryable state instance. Each source emits - * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then - * queried. The reducing state instance sums these up. The test succeeds - * after each subtask index is queried with result n*(n+1)/2. - */ - @Test - public void testReducingState() throws Exception { - // Config - final Deadline deadline = TEST_TIMEOUT.fromNow(); - - final int numElements = 1024; - - final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); - - JobID jobId = null; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStateBackend(stateBackend); - env.setParallelism(maxParallelism); - // Very important, because cluster is shared between tests and we - // don't explicitly check that all slots are available before - // submitting. - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - - DataStream<Tuple2<Integer, Long>> source = env - .addSource(new TestAscendingValueSource(numElements)); - - // Reducing state - ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = - new ReducingStateDescriptor<>( - "any", - new SumReduce(), - source.getType()); - - QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState = - source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() { - private static final long serialVersionUID = 8470749712274833552L; - - @Override - public Integer getKey(Tuple2<Integer, Long> value) throws Exception { - return value.f0; - } - }).asQueryableState("jungle", reducingState); - - // Submit the job graph - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - jobId = jobGraph.getJobID(); - - cluster.submitJobDetached(jobGraph); - - // Wait until job is running - - // Now query - long expected = numElements * (numElements + 1) / 2; - - executeQuery(deadline, client, jobId, "jungle", reducingState, expected); - } finally { - // Free cluster resources - if (jobId != null) { - Future<CancellationSuccess> cancellation = cluster - .getLeaderGateway(deadline.timeLeft()) - .ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft()) - .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class)); - - Await.ready(cancellation, deadline.timeLeft()); - } - - client.shutDown(); - } - } - - private static <K, V> Future<V> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final TypeSerializer<V> valueTypeSerializer, - final FiniteDuration retryDelay, - final boolean failForUnknownKeyOrNamespace) { - - return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer) - .recoverWith(new Recover<Future<V>>() { - @Override - public Future<V> recover(Throwable failure) throws Throwable { - if (failure instanceof AssertionError) { - return Futures.failed(failure); - } else if (failForUnknownKeyOrNamespace && - (failure instanceof UnknownKeyOrNamespace)) { - return Futures.failed(failure); - } else { - // At startup some failures are expected - // due to races. Make sure that they don't - // fail this test. - return Patterns.after( - retryDelay, - testActorSystem.scheduler(), - testActorSystem.dispatcher(), - new Callable<Future<V>>() { - @Override - public Future<V> call() throws Exception { - return getKvStateWithRetries( - client, - jobId, - queryName, - key, - keyTypeInfo, - valueTypeSerializer, - retryDelay, - failForUnknownKeyOrNamespace); - } - }); - } - } - }, testActorSystem.dispatcher()); - - } - - private static <K, V> Future<V> getKvStateWithRetries( - final QueryableStateClient client, - final JobID jobId, - final String queryName, - final K key, - final TypeInformation<K> keyTypeInfo, - final StateDescriptor<?, V> stateDescriptor, - final FiniteDuration retryDelay, - final boolean failForUnknownKeyOrNamespace) { - - return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor) - .recoverWith(new Recover<Future<V>>() { - @Override - public Future<V> recover(Throwable failure) throws Throwable { - if (failure instanceof AssertionError) { - return Futures.failed(failure); - } else if (failForUnknownKeyOrNamespace && - (failure instanceof UnknownKeyOrNamespace)) { - return Futures.failed(failure); - } else { - // At startup some failures are expected - // due to races. Make sure that they don't - // fail this test. - return Patterns.after( - retryDelay, - testActorSystem.scheduler(), - testActorSystem.dispatcher(), - new Callable<Future<V>>() { - @Override - public Future<V> call() throws Exception { - return getKvStateWithRetries( - client, - jobId, - queryName, - key, - keyTypeInfo, - stateDescriptor, - retryDelay, - failForUnknownKeyOrNamespace); - } - }); - } - } - }, testActorSystem.dispatcher()); - } - - /** - * Test source producing (key, 0)..(key, maxValue) with key being the sub - * task index. - * - * <p>After all tuples have been emitted, the source waits to be cancelled - * and does not immediately finish. - */ - private static class TestAscendingValueSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> { - - private static final long serialVersionUID = 1459935229498173245L; - - private final long maxValue; - private volatile boolean isRunning = true; - - TestAscendingValueSource(long maxValue) { - Preconditions.checkArgument(maxValue >= 0); - this.maxValue = maxValue; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - } - - @Override - public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception { - // f0 => key - int key = getRuntimeContext().getIndexOfThisSubtask(); - Tuple2<Integer, Long> record = new Tuple2<>(key, 0L); - - long currentValue = 0; - while (isRunning && currentValue <= maxValue) { - synchronized (ctx.getCheckpointLock()) { - record.f1 = currentValue; - ctx.collect(record); - } - - currentValue++; - } - - while (isRunning) { - synchronized (this) { - this.wait(); - } - } - } - - @Override - public void cancel() { - isRunning = false; - - synchronized (this) { - this.notifyAll(); - } - } - - } - - /** - * Test source producing (key, 1) tuples with random key in key range (numKeys). - */ - protected static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple2<Integer, Long>> - implements CheckpointListener { - private static final long serialVersionUID = -5744725196953582710L; - - private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong(); - private final int numKeys; - private final ThreadLocalRandom random = ThreadLocalRandom.current(); - private volatile boolean isRunning = true; - - TestKeyRangeSource(int numKeys) { - this.numKeys = numKeys; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - LATEST_CHECKPOINT_ID.set(0); - } - } - - @Override - public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception { - // f0 => key - Tuple2<Integer, Long> record = new Tuple2<>(0, 1L); - - while (isRunning) { - synchronized (ctx.getCheckpointLock()) { - record.f0 = random.nextInt(numKeys); - ctx.collect(record); - } - // mild slow down - Thread.sleep(1); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - if (getRuntimeContext().getIndexOfThisSubtask() == 0) { - LATEST_CHECKPOINT_ID.set(checkpointId); - } - } - } - - /** - * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. - */ - private static class SumFold implements FoldFunction<Tuple2<Integer, Long>, String> { - private static final long serialVersionUID = -6249227626701264599L; - - @Override - public String fold(String accumulator, Tuple2<Integer, Long> value) throws Exception { - long acc = Long.valueOf(accumulator); - acc += value.f1; - return Long.toString(acc); - } - } - - /** - * Test {@link ReduceFunction} summing up its two arguments. - */ - protected static class SumReduce implements ReduceFunction<Tuple2<Integer, Long>> { - private static final long serialVersionUID = -8651235077342052336L; - - @Override - public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { - value1.f1 += value2.f1; - return value1; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java deleted file mode 100644 index cd89e00..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.apache.curator.test.TestingServer; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.rules.TemporaryFolder; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the NON-HA mode. - */ -public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { - - private static final int NUM_JMS = 2; - private static final int NUM_TMS = 4; - private static final int NUM_SLOTS_PER_TM = 4; - - private static TestingServer zkServer; - private static TemporaryFolder temporaryFolder; - - @BeforeClass - public static void setup() { - try { - zkServer = new TestingServer(); - temporaryFolder = new TemporaryFolder(); - temporaryFolder.create(); - - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); - config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); - config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); - - cluster = new TestingCluster(config, false); - cluster.start(); - - testActorSystem = AkkaUtils.createDefaultActorSystem(); - - // verify that we are in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() { - if (cluster != null) { - cluster.stop(); - cluster.awaitTermination(); - } - - testActorSystem.shutdown(); - testActorSystem.awaitTermination(); - - try { - zkServer.stop(); - zkServer.close(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - temporaryFolder.delete(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java deleted file mode 100644 index 5d5b671..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link FsStateBackend}. - */ -public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java deleted file mode 100644 index 22570b5..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link RocksDBStateBackend}. - */ -public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java deleted file mode 100644 index 0c628e4..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.contrib.streaming.state.PredefinedOptions; -import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalListState; -import org.apache.flink.runtime.state.internal.InternalMapState; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; - -import java.io.File; - -import static org.mockito.Mockito.mock; - -/** - * Additional tests for the serialization and deserialization of {@link - * org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer} - * with a RocksDB state back-end. - */ -public final class KVStateRequestSerializerRocksDBTest { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - /** - * Extension of {@link RocksDBKeyedStateBackend} to make {@link - * #createListState(TypeSerializer, ListStateDescriptor)} public for use in - * the tests. - * - * @param <K> key type - */ - static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> { - - RocksDBKeyedStateBackend2( - final String operatorIdentifier, - final ClassLoader userCodeClassLoader, - final File instanceBasePath, - final DBOptions dbOptions, - final ColumnFamilyOptions columnFamilyOptions, - final TaskKvStateRegistry kvStateRegistry, - final TypeSerializer<K> keySerializer, - final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange, - final ExecutionConfig executionConfig) throws Exception { - - super(operatorIdentifier, userCodeClassLoader, - instanceBasePath, - dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange, executionConfig, false); - } - - @Override - public <N, T> InternalListState<N, T> createListState( - final TypeSerializer<N> namespaceSerializer, - final ListStateDescriptor<T> stateDesc) throws Exception { - - return super.createListState(namespaceSerializer, stateDesc); - } - } - - /** - * Tests list serialization and deserialization match. - * - * @see KvStateRequestSerializerTest#testListSerialization() - * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end - * test - */ - @Test - public void testListSerialization() throws Exception { - final long key = 0L; - - // objects for RocksDB state list serialisation - DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); - dbOptions.setCreateIfMissing(true); - ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); - final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend = - new RocksDBKeyedStateBackend2<>( - "no-op", - ClassLoader.getSystemClassLoader(), - temporaryFolder.getRoot(), - dbOptions, - columnFamilyOptions, - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), - new ExecutionConfig() - ); - longHeapKeyedStateBackend.restore(null); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend - .createListState(VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); - - KvStateRequestSerializerTest.testListSerialization(key, listState); - } - - /** - * Tests map serialization and deserialization match. - * - * @see KvStateRequestSerializerTest#testMapSerialization() - * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end - * test - */ - @Test - public void testMapSerialization() throws Exception { - final long key = 0L; - - // objects for RocksDB state list serialisation - DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); - dbOptions.setCreateIfMissing(true); - ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); - final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = - new RocksDBKeyedStateBackend<>( - "no-op", - ClassLoader.getSystemClassLoader(), - temporaryFolder.getRoot(), - dbOptions, - columnFamilyOptions, - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - 1, new KeyGroupRange(0, 0), - new ExecutionConfig(), - false); - longHeapKeyedStateBackend.restore(null); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) - longHeapKeyedStateBackend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); - - KvStateRequestSerializerTest.testMapSerialization(key, mapState); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java deleted file mode 100644 index 83f86e4..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.testingUtils.TestingCluster; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; - -import static org.junit.Assert.fail; - -/** - * Base class with the cluster configuration for the tests on the HA mode. - */ -public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { - - private static final int NUM_TMS = 2; - private static final int NUM_SLOTS_PER_TM = 4; - - @BeforeClass - public static void setup() { - try { - Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); - config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); - - cluster = new TestingCluster(config, false); - cluster.start(true); - - testActorSystem = AkkaUtils.createDefaultActorSystem(); - - // verify that we are not in HA mode - Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @AfterClass - public static void tearDown() { - try { - cluster.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - - if (testActorSystem != null) { - testActorSystem.shutdown(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java deleted file mode 100644 index d4dbe83..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link FsStateBackend}. - */ -public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java deleted file mode 100644 index a15e6a4..0000000 --- a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.query; - -import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; - -import org.junit.Rule; -import org.junit.rules.TemporaryFolder; - -/** - * Several integration tests for queryable state using the {@link RocksDBStateBackend}. - */ -public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase { - - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @Override - protected AbstractStateBackend createStateBackend() throws Exception { - return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 524e718..1bb3732 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,7 @@ under the License. <module>flink-yarn</module> <module>flink-yarn-tests</module> <module>flink-fs-tests</module> + <module>flink-queryable-state</module> </modules> <properties>
