http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java new file mode 100644 index 0000000..15a5ff6 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.testingUtils.TestingCluster; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.fail; + +/** + * Base class with the cluster configuration for the tests on the NON-HA mode. + */ +public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { + + private static final int NUM_JMS = 2; + private static final int NUM_TMS = 4; + private static final int NUM_SLOTS_PER_TM = 4; + + private static TestingServer zkServer; + private static TemporaryFolder temporaryFolder; + + @BeforeClass + public static void setup() { + try { + zkServer = new TestingServer(); + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + cluster = new TestingCluster(config, false); + cluster.start(); + + testActorSystem = AkkaUtils.createDefaultActorSystem(); + + // verify that we are in HA mode + Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stop(); + cluster.awaitTermination(); + } + + testActorSystem.shutdown(); + testActorSystem.awaitTermination(); + + try { + zkServer.stop(); + zkServer.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + temporaryFolder.delete(); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java new file mode 100644 index 0000000..a2d3ad0 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link FsStateBackend}. + */ +public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java new file mode 100644 index 0000000..fda1171 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link RocksDBStateBackend}. + */ +public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java new file mode 100644 index 0000000..907e8a3 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.contrib.streaming.state.PredefinedOptions; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; + +import java.io.File; + +import static org.mockito.Mockito.mock; + +/** + * Additional tests for the serialization and deserialization using + * the KvStateSerializer with a RocksDB state back-end. + */ +public final class KVStateRequestSerializerRocksDBTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Extension of {@link RocksDBKeyedStateBackend} to make {@link + * #createListState(TypeSerializer, ListStateDescriptor)} public for use in + * the tests. + * + * @param <K> key type + */ + static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> { + + RocksDBKeyedStateBackend2( + final String operatorIdentifier, + final ClassLoader userCodeClassLoader, + final File instanceBasePath, + final DBOptions dbOptions, + final ColumnFamilyOptions columnFamilyOptions, + final TaskKvStateRegistry kvStateRegistry, + final TypeSerializer<K> keySerializer, + final int numberOfKeyGroups, + final KeyGroupRange keyGroupRange, + final ExecutionConfig executionConfig) throws Exception { + + super(operatorIdentifier, userCodeClassLoader, + instanceBasePath, + dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, + numberOfKeyGroups, keyGroupRange, executionConfig, false); + } + + @Override + public <N, T> InternalListState<N, T> createListState( + final TypeSerializer<N> namespaceSerializer, + final ListStateDescriptor<T> stateDesc) throws Exception { + + return super.createListState(namespaceSerializer, stateDesc); + } + } + + /** + * Tests list serialization and deserialization match. + * + * @see KvStateRequestSerializerTest#testListSerialization() + * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end + * test + */ + @Test + public void testListSerialization() throws Exception { + final long key = 0L; + + // objects for RocksDB state list serialisation + DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); + dbOptions.setCreateIfMissing(true); + ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); + final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend = + new RocksDBKeyedStateBackend2<>( + "no-op", + ClassLoader.getSystemClassLoader(), + temporaryFolder.getRoot(), + dbOptions, + columnFamilyOptions, + mock(TaskKvStateRegistry.class), + LongSerializer.INSTANCE, + 1, new KeyGroupRange(0, 0), + new ExecutionConfig() + ); + longHeapKeyedStateBackend.restore(null); + longHeapKeyedStateBackend.setCurrentKey(key); + + final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend + .createListState(VoidNamespaceSerializer.INSTANCE, + new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + + KvStateRequestSerializerTest.testListSerialization(key, listState); + } + + /** + * Tests map serialization and deserialization match. + * + * @see KvStateRequestSerializerTest#testMapSerialization() + * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end + * test + */ + @Test + public void testMapSerialization() throws Exception { + final long key = 0L; + + // objects for RocksDB state list serialisation + DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); + dbOptions.setCreateIfMissing(true); + ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); + final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = + new RocksDBKeyedStateBackend<>( + "no-op", + ClassLoader.getSystemClassLoader(), + temporaryFolder.getRoot(), + dbOptions, + columnFamilyOptions, + mock(TaskKvStateRegistry.class), + LongSerializer.INSTANCE, + 1, new KeyGroupRange(0, 0), + new ExecutionConfig(), + false); + longHeapKeyedStateBackend.restore(null); + longHeapKeyedStateBackend.setCurrentKey(key); + + final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) + longHeapKeyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + + KvStateRequestSerializerTest.testMapSerialization(key, mapState); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java new file mode 100644 index 0000000..c52acc8 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.testingUtils.TestingCluster; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + +import static org.junit.Assert.fail; + +/** + * Base class with the cluster configuration for the tests on the HA mode. + */ +public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase { + + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 4; + + @BeforeClass + public static void setup() { + try { + Configuration config = new Configuration(); + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); + config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); + config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); + + cluster = new TestingCluster(config, false); + cluster.start(true); + + testActorSystem = AkkaUtils.createDefaultActorSystem(); + + // verify that we are not in HA mode + Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE); + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @AfterClass + public static void tearDown() { + try { + cluster.shutdown(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + + if (testActorSystem != null) { + testActorSystem.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java new file mode 100644 index 0000000..caa315a --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link FsStateBackend}. + */ +public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java new file mode 100644 index 0000000..10e9b57 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.itcases; + +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link RocksDBStateBackend}. + */ +public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java new file mode 100644 index 0000000..d9a41a1 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.queryablestate.UnknownJobManager; +import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Status; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link AkkaKvStateLocationLookupService}. + */ +public class AkkaKvStateLocationLookupServiceTest extends TestLogger { + + /** The default timeout. */ + private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); + + /** Test actor system shared between the tests. */ + private static ActorSystem testActorSystem; + + @BeforeClass + public static void setUp() throws Exception { + testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void tearDown() throws Exception { + if (testActorSystem != null) { + testActorSystem.shutdown(); + } + } + + /** + * Tests responses if no leader notification has been reported or leadership + * has been lost (leaderAddress = <code>null</code>). + */ + @Test + public void testNoJobManagerRegistered() throws Exception { + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); + Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); + + AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( + leaderRetrievalService, + testActorSystem, + TIMEOUT, + new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); + + lookupService.start(); + + // + // No leader registered initially => fail with UnknownJobManager + // + try { + JobID jobId = new JobID(); + String name = "coffee"; + + Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name); + + Await.result(locationFuture, TIMEOUT); + fail("Did not throw expected Exception"); + } catch (UnknownJobManager ignored) { + // Expected + } + + assertEquals("Received unexpected lookup", 0, received.size()); + + // + // Leader registration => communicate with new leader + // + UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID; + KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea"); + + ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected); + + String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); + + // Notify the service about a leader + leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId); + + JobID jobId = new JobID(); + String name = "tea"; + + // Verify that the leader response is handled + KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT); + assertEquals(expected, location); + + // Verify that the correct message was sent to the leader + assertEquals(1, received.size()); + + verifyLookupMsg(received.poll(), jobId, name); + + // + // Leader loss => fail with UnknownJobManager + // + leaderRetrievalService.notifyListener(null, null); + + try { + Future<KvStateLocation> locationFuture = lookupService + .getKvStateLookupInfo(new JobID(), "coffee"); + + Await.result(locationFuture, TIMEOUT); + fail("Did not throw expected Exception"); + } catch (UnknownJobManager ignored) { + // Expected + } + + // No new messages received + assertEquals(0, received.size()); + } + + /** + * Tests that messages are properly decorated with the leader session ID. + */ + @Test + public void testLeaderSessionIdChange() throws Exception { + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); + Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); + + AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( + leaderRetrievalService, + testActorSystem, + TIMEOUT, + new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); + + lookupService.start(); + + // Create test actors with random leader session IDs + KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt"); + UUID leaderSessionId1 = UUID.randomUUID(); + ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1); + String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1); + + KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper"); + UUID leaderSessionId2 = UUID.randomUUID(); + ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2); + String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2); + + JobID jobId = new JobID(); + + // + // Notify about first leader + // + leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1); + + KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT); + assertEquals(expected1, location); + + assertEquals(1, received.size()); + verifyLookupMsg(received.poll(), jobId, "rock"); + + // + // Notify about second leader + // + leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2); + + location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT); + assertEquals(expected2, location); + + assertEquals(1, received.size()); + verifyLookupMsg(received.poll(), jobId, "roll"); + } + + /** + * Tests that lookups are retried when no leader notification is available. + */ + @Test + public void testRetryOnUnknownJobManager() throws Exception { + final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>(); + + AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy = + new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() { + @Override + public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() { + return retryStrategies.poll(); + } + }; + + final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); + + AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( + leaderRetrievalService, + testActorSystem, + TIMEOUT, + retryStrategy); + + lookupService.start(); + + // + // Test call to retry + // + final AtomicBoolean hasRetried = new AtomicBoolean(); + retryStrategies.add( + new AkkaKvStateLocationLookupService.LookupRetryStrategy() { + @Override + public FiniteDuration getRetryDelay() { + return FiniteDuration.Zero(); + } + + @Override + public boolean tryRetry() { + if (hasRetried.compareAndSet(false, true)) { + return true; + } + return false; + } + }); + + Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir"); + + Await.ready(locationFuture, TIMEOUT); + assertTrue("Did not retry ", hasRetried.get()); + + // + // Test leader notification after retry + // + Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); + + KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic"); + ActorRef testActor = LookupResponseActor.create(received, null, expected); + final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); + + retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() { + @Override + public FiniteDuration getRetryDelay() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + + @Override + public boolean tryRetry() { + leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID); + return true; + } + }); + + KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT); + assertEquals(expected, location); + } + + @Test + public void testUnexpectedResponseType() throws Exception { + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); + Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); + + AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( + leaderRetrievalService, + testActorSystem, + TIMEOUT, + new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); + + lookupService.start(); + + // Create test actors with random leader session IDs + String expected = "unexpected-response-type"; + ActorRef testActor = LookupResponseActor.create(received, null, expected); + String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); + + leaderRetrievalService.notifyListener(testActorAddress, null); + + try { + Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT); + fail("Did not throw expected Exception"); + } catch (Throwable ignored) { + // Expected + } + } + + private static final class LookupResponseActor extends FlinkUntypedActor { + + /** Received lookup messages. */ + private final Queue<LookupKvStateLocation> receivedLookups; + + /** Responses on KvStateMessage.LookupKvStateLocation messages. */ + private final Queue<Object> lookupResponses; + + /** The leader session ID. */ + private UUID leaderSessionId; + + public LookupResponseActor( + Queue<LookupKvStateLocation> receivedLookups, + UUID leaderSessionId, Object... lookupResponses) { + + this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups"); + this.leaderSessionId = leaderSessionId; + this.lookupResponses = new ArrayDeque<>(); + + if (lookupResponses != null) { + for (Object resp : lookupResponses) { + this.lookupResponses.add(resp); + } + } + } + + @Override + public void handleMessage(Object message) throws Exception { + if (message instanceof LookupKvStateLocation) { + // Add to received lookups queue + receivedLookups.add((LookupKvStateLocation) message); + + Object msg = lookupResponses.poll(); + if (msg != null) { + if (msg instanceof Throwable) { + sender().tell(new Status.Failure((Throwable) msg), self()); + } else { + sender().tell(new Status.Success(msg), self()); + } + } + } else if (message instanceof UUID) { + this.leaderSessionId = (UUID) message; + } else { + LOG.debug("Received unhandled message: {}", message); + } + } + + @Override + protected UUID getLeaderSessionID() { + return leaderSessionId; + } + + private static ActorRef create( + Queue<LookupKvStateLocation> receivedLookups, + UUID leaderSessionId, + Object... lookupResponses) { + + return testActorSystem.actorOf(Props.create( + LookupResponseActor.class, + receivedLookups, + leaderSessionId, + lookupResponses)); + } + } + + private static void verifyLookupMsg( + LookupKvStateLocation lookUpMsg, + JobID expectedJobId, + String expectedName) { + + assertNotNull(lookUpMsg); + assertEquals(expectedJobId, lookUpMsg.getJobId()); + assertEquals(expectedName, lookUpMsg.getRegistrationName()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java new file mode 100644 index 0000000..0b97bda --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.queryablestate.client.KvStateClientHandler; +import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + +import org.junit.Test; + +import java.nio.channels.ClosedChannelException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for {@link KvStateClientHandler}. + */ +public class KvStateClientHandlerTest { + + /** + * Tests that on reads the expected callback methods are called and read + * buffers are recycled. + */ + @Test + public void testReadCallbacksAndBufferRecycling() throws Exception { + KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class); + + EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback)); + + // + // Request success + // + ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( + channel.alloc(), + 1222112277, + new byte[0]); + buf.skipBytes(4); // skip frame length + + // Verify callback + channel.writeInbound(buf); + verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class)); + assertEquals("Buffer not recycled", 0, buf.refCnt()); + + // + // Request failure + // + buf = MessageSerializer.serializeKvStateRequestFailure( + channel.alloc(), + 1222112278, + new RuntimeException("Expected test Exception")); + buf.skipBytes(4); // skip frame length + + // Verify callback + channel.writeInbound(buf); + verify(callback, times(1)).onRequestFailure(eq(1222112278L), any(RuntimeException.class)); + assertEquals("Buffer not recycled", 0, buf.refCnt()); + + // + // Server failure + // + buf = MessageSerializer.serializeServerFailure( + channel.alloc(), + new RuntimeException("Expected test Exception")); + buf.skipBytes(4); // skip frame length + + // Verify callback + channel.writeInbound(buf); + verify(callback, times(1)).onFailure(any(RuntimeException.class)); + + // + // Unexpected messages + // + buf = channel.alloc().buffer(4).writeInt(1223823); + + // Verify callback + channel.writeInbound(buf); + verify(callback, times(2)).onFailure(any(IllegalStateException.class)); + assertEquals("Buffer not recycled", 0, buf.refCnt()); + + // + // Exception caught + // + channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); + verify(callback, times(3)).onFailure(any(RuntimeException.class)); + + // + // Channel inactive + // + channel.pipeline().fireChannelInactive(); + verify(callback, times(4)).onFailure(any(ClosedChannelException.class)); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java new file mode 100644 index 0000000..a2850b3 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java @@ -0,0 +1,752 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.queryablestate.client.KvStateClient; +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.NetUtils; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.junit.AfterClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.ConnectException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link KvStateClient}. + */ +public class KvStateClientTest { + + private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class); + + // Thread pool for client bootstrap (shared between tests) + private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); + + @AfterClass + public static void tearDown() throws Exception { + if (NIO_GROUP != null) { + NIO_GROUP.shutdownGracefully(); + } + } + + /** + * Tests simple queries, of which half succeed and half fail. + */ + @Test + public void testSimpleRequests() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateClient client = null; + Channel serverChannel = null; + + try { + client = new KvStateClient(1, stats); + + // Random result + final byte[] expected = new byte[1024]; + ThreadLocalRandom.current().nextBytes(expected); + + final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add((ByteBuf) msg); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + List<Future<byte[]>> futures = new ArrayList<>(); + + int numQueries = 1024; + + for (int i = 0; i < numQueries; i++) { + futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); + } + + // Respond to messages + Exception testException = new RuntimeException("Expected test Exception"); + + for (int i = 0; i < numQueries; i++) { + ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + + Channel ch = channel.get(); + assertNotNull("Channel not active", ch); + + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); + + buf.release(); + + if (i % 2 == 0) { + ByteBuf response = MessageSerializer.serializeKvStateRequestResult( + serverChannel.alloc(), + request.getRequestId(), + expected); + + ch.writeAndFlush(response); + } else { + ByteBuf response = MessageSerializer.serializeKvStateRequestFailure( + serverChannel.alloc(), + request.getRequestId(), + testException); + + ch.writeAndFlush(response); + } + } + + for (int i = 0; i < numQueries; i++) { + if (i % 2 == 0) { + byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft()); + assertArrayEquals(expected, serializedResult); + } else { + try { + Await.result(futures.get(i), deadline.timeLeft()); + fail("Did not throw expected Exception"); + } catch (RuntimeException ignored) { + // Expected + } + } + } + + assertEquals(numQueries, stats.getNumRequests()); + int expectedRequests = numQueries / 2; + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || + stats.getNumFailed() != expectedRequests)) { + Thread.sleep(100); + } + + assertEquals(expectedRequests, stats.getNumSuccessful()); + assertEquals(expectedRequests, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutDown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0, stats.getNumConnections()); + } + } + + /** + * Tests that a request to an unavailable host is failed with ConnectException. + */ + @Test + public void testRequestUnavailableHost() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + KvStateClient client = null; + + try { + client = new KvStateClient(1, stats); + + int availablePort = NetUtils.getAvailablePort(); + + KvStateServerAddress serverAddress = new KvStateServerAddress( + InetAddress.getLocalHost(), + availablePort); + + Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); + + try { + Await.result(future, deadline.timeLeft()); + fail("Did not throw expected ConnectException"); + } catch (ConnectException ignored) { + // Expected + } + } finally { + if (client != null) { + client.shutDown(); + } + + assertEquals("Channel leak", 0, stats.getNumConnections()); + } + } + + /** + * Multiple threads concurrently fire queries. + */ + @Test + public void testConcurrentQueries() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + ExecutorService executor = null; + KvStateClient client = null; + Channel serverChannel = null; + + final byte[] serializedResult = new byte[1024]; + ThreadLocalRandom.current().nextBytes(serializedResult); + + try { + int numQueryTasks = 4; + final int numQueriesPerTask = 1024; + + executor = Executors.newFixedThreadPool(numQueryTasks); + + client = new KvStateClient(1, stats); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf buf = (ByteBuf) msg; + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); + + buf.release(); + + ByteBuf response = MessageSerializer.serializeKvStateRequestResult( + ctx.alloc(), + request.getRequestId(), + serializedResult); + + ctx.channel().writeAndFlush(response); + } + }); + + final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + final KvStateClient finalClient = client; + Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() { + @Override + public List<Future<byte[]>> call() throws Exception { + List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask); + + for (int i = 0; i < numQueriesPerTask; i++) { + results.add(finalClient.getKvState( + serverAddress, + new KvStateID(), + new byte[0])); + } + + return results; + } + }; + + // Submit query tasks + List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>(); + for (int i = 0; i < numQueryTasks; i++) { + futures.add(executor.submit(queryTask)); + } + + // Verify results + for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) { + List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + for (Future<byte[]> result : results) { + byte[] actual = Await.result(result, deadline.timeLeft()); + assertArrayEquals(serializedResult, actual); + } + } + + int totalQueries = numQueryTasks * numQueriesPerTask; + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { + Thread.sleep(100); + } + + assertEquals(totalQueries, stats.getNumRequests()); + assertEquals(totalQueries, stats.getNumSuccessful()); + } finally { + if (executor != null) { + executor.shutdown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + if (client != null) { + client.shutDown(); + } + + assertEquals("Channel leak", 0, stats.getNumConnections()); + } + } + + /** + * Tests that a server failure closes the connection and removes it from + * the established connections. + */ + @Test + public void testFailureClosesChannel() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateClient client = null; + Channel serverChannel = null; + + try { + client = new KvStateClient(1, stats); + + final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.add((ByteBuf) msg); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + // Requests + List<Future<byte[]>> futures = new ArrayList<>(); + futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); + futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); + + ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + buf.release(); + + buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + assertNotNull("Receive timed out", buf); + buf.release(); + + assertEquals(1, stats.getNumConnections()); + + Channel ch = channel.get(); + assertNotNull("Channel not active", ch); + + // Respond with failure + ch.writeAndFlush(MessageSerializer.serializeServerFailure( + serverChannel.alloc(), + new RuntimeException("Expected test server failure"))); + + try { + Await.result(futures.remove(0), deadline.timeLeft()); + fail("Did not throw expected server failure"); + } catch (RuntimeException ignored) { + // Expected + } + + try { + Await.result(futures.remove(0), deadline.timeLeft()); + fail("Did not throw expected server failure"); + } catch (RuntimeException ignored) { + // Expected + } + + assertEquals(0, stats.getNumConnections()); + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || + stats.getNumFailed() != 2)) { + Thread.sleep(100); + } + + assertEquals(2, stats.getNumRequests()); + assertEquals(0, stats.getNumSuccessful()); + assertEquals(2, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutDown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0, stats.getNumConnections()); + } + } + + /** + * Tests that a server channel close, closes the connection and removes it + * from the established connections. + */ + @Test + public void testServerClosesChannel() throws Exception { + Deadline deadline = TEST_TIMEOUT.fromNow(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateClient client = null; + Channel serverChannel = null; + + try { + client = new KvStateClient(1, stats); + + final AtomicBoolean received = new AtomicBoolean(); + final AtomicReference<Channel> channel = new AtomicReference<>(); + + serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + received.set(true); + } + }); + + KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); + + // Requests + Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); + + while (!received.get() && deadline.hasTimeLeft()) { + Thread.sleep(50); + } + assertTrue("Receive timed out", received.get()); + + assertEquals(1, stats.getNumConnections()); + + channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + try { + Await.result(future, deadline.timeLeft()); + fail("Did not throw expected server failure"); + } catch (ClosedChannelException ignored) { + // Expected + } + + assertEquals(0, stats.getNumConnections()); + + // Counts can take some time to propagate + while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || + stats.getNumFailed() != 1)) { + Thread.sleep(100); + } + + assertEquals(1, stats.getNumRequests()); + assertEquals(0, stats.getNumSuccessful()); + assertEquals(1, stats.getNumFailed()); + } finally { + if (client != null) { + client.shutDown(); + } + + if (serverChannel != null) { + serverChannel.close(); + } + + assertEquals("Channel leak", 0, stats.getNumConnections()); + } + } + + /** + * Tests multiple clients querying multiple servers until 100k queries have + * been processed. At this point, the client is shut down and its verified + * that all ongoing requests are failed. + */ + @Test + public void testClientServerIntegration() throws Exception { + // Config + final int numServers = 2; + final int numServerEventLoopThreads = 2; + final int numServerQueryThreads = 2; + + final int numClientEventLoopThreads = 4; + final int numClientsTasks = 8; + + final int batchSize = 16; + + final int numKeyGroups = 1; + + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + KvStateRegistry dummyRegistry = new KvStateRegistry(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(dummyRegistry); + + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); + + final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + + AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); + + KvStateClient client = null; + ExecutorService clientTaskExecutor = null; + final KvStateServer[] server = new KvStateServer[numServers]; + + try { + client = new KvStateClient(numClientEventLoopThreads, clientStats); + clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); + + // Create state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("any"); + + // Create servers + KvStateRegistry[] registry = new KvStateRegistry[numServers]; + AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; + final KvStateID[] ids = new KvStateID[numServers]; + + for (int i = 0; i < numServers; i++) { + registry[i] = new KvStateRegistry(); + serverStats[i] = new AtomicKvStateRequestStats(); + server[i] = new KvStateServerImpl( + InetAddress.getLocalHost(), + 0, + numServerEventLoopThreads, + numServerQueryThreads, + registry[i], + serverStats[i]); + + server[i].start(); + + backend.setCurrentKey(1010 + i); + + // Value per server + ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + state.update(201 + i); + + // we know it must be a KvStat but this is not exposed to the user via State + InternalKvState<?> kvState = (InternalKvState<?>) state; + + // Register KvState (one state instance for all server) + ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); + } + + final KvStateClient finalClient = client; + Callable<Void> queryTask = new Callable<Void>() { + @Override + public Void call() throws Exception { + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // Random server permutation + List<Integer> random = new ArrayList<>(); + for (int j = 0; j < batchSize; j++) { + random.add(j); + } + Collections.shuffle(random); + + // Dispatch queries + List<Future<byte[]>> futures = new ArrayList<>(batchSize); + + for (int j = 0; j < batchSize; j++) { + int targetServer = random.get(j) % numServers; + + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + 1010 + targetServer, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + futures.add(finalClient.getKvState( + server[targetServer].getAddress(), + ids[targetServer], + serializedKeyAndNamespace)); + } + + // Verify results + for (int j = 0; j < batchSize; j++) { + int targetServer = random.get(j) % numServers; + + Future<byte[]> future = futures.get(j); + byte[] buf = Await.result(future, timeout); + int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE); + assertEquals(201 + targetServer, value); + } + } + } + }; + + // Submit tasks + List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>(); + for (int i = 0; i < numClientsTasks; i++) { + taskFutures.add(clientTaskExecutor.submit(queryTask)); + } + + long numRequests; + while ((numRequests = clientStats.getNumRequests()) < 100_000) { + Thread.sleep(100); + LOG.info("Number of requests {}/100_000", numRequests); + } + + // Shut down + client.shutDown(); + + for (java.util.concurrent.Future<Void> future : taskFutures) { + try { + future.get(); + fail("Did not throw expected Exception after shut down"); + } catch (ExecutionException t) { + if (t.getCause() instanceof ClosedChannelException || + t.getCause() instanceof IllegalStateException) { + // Expected + } else { + t.printStackTrace(); + fail("Failed with unexpected Exception type: " + t.getClass().getName()); + } + } + } + + assertEquals("Connection leak (client)", 0, clientStats.getNumConnections()); + for (int i = 0; i < numServers; i++) { + boolean success = false; + int numRetries = 0; + while (!success) { + try { + assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections()); + success = true; + } catch (Throwable t) { + if (numRetries < 10) { + LOG.info("Retrying connection leak check (server)"); + Thread.sleep((numRetries + 1) * 50); + numRetries++; + } else { + throw t; + } + } + } + } + } finally { + if (client != null) { + client.shutDown(); + } + + for (int i = 0; i < numServers; i++) { + if (server[i] != null) { + server[i].shutDown(); + } + } + + if (clientTaskExecutor != null) { + clientTaskExecutor.shutdown(); + } + } + } + + // ------------------------------------------------------------------------ + + private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException { + ServerBootstrap bootstrap = new ServerBootstrap() + // Bind address and port + .localAddress(InetAddress.getLocalHost(), 0) + // NIO server channels + .group(NIO_GROUP) + .channel(NioServerSocketChannel.class) + // See initializer for pipeline details + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline() + .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) + .addLast(handlers); + } + }); + + return bootstrap.bind().sync().channel(); + } + + private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) { + InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress(); + + return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java new file mode 100644 index 0000000..f28ca68 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.queryablestate.messages.KvStateRequest; +import org.apache.flink.queryablestate.messages.KvStateRequestFailure; +import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link KvStateSerializer}. + */ +@RunWith(Parameterized.class) +public class KvStateRequestSerializerTest { + + private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + + @Parameterized.Parameters + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean async; + + /** + * Tests KvState request serialization. + */ + @Test + public void testKvStateRequestSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1337L; + KvStateID kvStateId = new KvStateID(); + byte[] serializedKeyAndNamespace = randomByteArray(1024); + + ByteBuf buf = MessageSerializer.serializeKvStateRequest( + alloc, + requestId, + kvStateId, + serializedKeyAndNamespace); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(requestId, request.getRequestId()); + assertEquals(kvStateId, request.getKvStateId()); + assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); + } + + /** + * Tests KvState request serialization with zero-length serialized key and namespace. + */ + @Test + public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { + byte[] serializedKeyAndNamespace = new byte[0]; + + ByteBuf buf = MessageSerializer.serializeKvStateRequest( + alloc, + 1823, + new KvStateID(), + serializedKeyAndNamespace); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> key and namespace. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { + new KvStateRequest(0, new KvStateID(), null); + } + + /** + * Tests KvState request result serialization. + */ + @Test + public void testKvStateRequestResultSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 72727278L; + byte[] serializedResult = randomByteArray(1024); + + ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( + alloc, + requestId, + serializedResult); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(requestId, request.getRequestId()); + + assertArrayEquals(serializedResult, request.getSerializedResult()); + } + + /** + * Tests KvState request result serialization with zero-length serialized result. + */ + @Test + public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception { + byte[] serializedResult = new byte[0]; + + ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( + alloc, + 72727278, + serializedResult); + + int frameLength = buf.readInt(); + + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedResult, request.getSerializedResult()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> results. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedResult() throws Exception { + new KvStateRequestResult(0, null); + } + + /** + * Tests KvState request failure serialization. + */ + @Test + public void testKvStateRequestFailureSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1111222L; + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure( + alloc, + requestId, + cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(requestId, request.getRequestId()); + assertEquals(cause.getClass(), request.getCause().getClass()); + assertEquals(cause.getMessage(), request.getCause().getMessage()); + } + + /** + * Tests KvState server failure serialization. + */ + @Test + public void testServerFailureSerialization() throws Exception { + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + Throwable request = MessageSerializer.deserializeServerFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(cause.getClass(), request.getClass()); + assertEquals(cause.getMessage(), request.getMessage()); + } + + private byte[] randomByteArray(int capacity) { + byte[] bytes = new byte[capacity]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } +}
