http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java deleted file mode 100644 index afd9e46..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ /dev/null @@ -1,449 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateClient; -import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.query.netty.UnknownKvStateID; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.heap.HeapValueState; -import org.apache.flink.runtime.state.heap.NestedMapsStateTable; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.MathUtils; - -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import org.junit.AfterClass; -import org.junit.Test; - -import java.net.ConnectException; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static 
org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link QueryableStateClient}.
 *
 * <p>The tests share one local actor system whose dispatcher is handed to
 * every client under test; it is shut down once after all tests have run.
 */
public class QueryableStateClientTest {

    private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());

    private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS);

    /**
     * Shuts down the actor system shared by all tests of this class.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        if (testActorSystem != null) {
            testActorSystem.shutdown();
        }
    }

    /**
     * All failures should lead to a retry with a forced location lookup.
     *
     * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation,
     * ConnectException are checked explicitly as these indicate out-of-sync
     * KvStateLocation. Each of them is expected to trigger exactly two
     * lookups (the initial one plus one retry), whereas an unrelated
     * exception is expected to trigger a single lookup only.
     */
    @Test
    public void testForceLookupOnOutdatedLocation() throws Exception {
        KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
        KvStateClient networkClient = mock(KvStateClient.class);

        QueryableStateClient client = new QueryableStateClient(
                lookupService,
                networkClient,
                testActorSystem.dispatcher());

        try {
            JobID jobId = new JobID();
            int numKeyGroups = 4;

            //
            // UnknownKvStateLocation
            //
            String query1 = "lucky";

            Future<KvStateLocation> unknownKvStateLocation = Futures.failed(
                    new UnknownKvStateLocation(query1));

            when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1)))
                    .thenReturn(unknownKvStateLocation);

            Future<Integer> result = queryIntState(client, jobId, query1);

            try {
                Await.result(result, timeout);
                fail("Did not throw expected UnknownKvStateLocation exception");
            } catch (UnknownKvStateLocation ignored) {
                // Expected
            }

            verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));

            //
            // UnknownKvStateKeyGroupLocation
            //
            String query2 = "unlucky";

            // The location is known, but no key group has been registered for it.
            Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful(
                    new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2));

            when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2)))
                    .thenReturn(unknownKeyGroupLocation);

            result = queryIntState(client, jobId, query2);

            try {
                Await.result(result, timeout);
                fail("Did not throw expected UnknownKvStateKeyGroupLocation exception");
            } catch (UnknownKvStateKeyGroupLocation ignored) {
                // Expected
            }

            verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));

            //
            // UnknownKvStateID
            //
            String query3 = "water";
            KvStateID kvStateId = new KvStateID();
            Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId));

            KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323);
            KvStateLocation location = createLocationOnSingleServer(
                    jobId, numKeyGroups, query3, kvStateId, serverAddress);

            when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3)))
                    .thenReturn(Futures.successful(location));

            when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
                    .thenReturn(unknownKvStateId);

            result = queryIntState(client, jobId, query3);

            try {
                Await.result(result, timeout);
                fail("Did not throw expected UnknownKvStateID exception");
            } catch (UnknownKvStateID ignored) {
                // Expected
            }

            verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));

            //
            // ConnectException
            //
            String query4 = "space";
            Future<byte[]> connectException = Futures.failed(new ConnectException());
            kvStateId = new KvStateID();

            serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123);
            location = createLocationOnSingleServer(
                    jobId, numKeyGroups, query4, kvStateId, serverAddress);

            when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4)))
                    .thenReturn(Futures.successful(location));

            when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class)))
                    .thenReturn(connectException);

            result = queryIntState(client, jobId, query4);

            try {
                Await.result(result, timeout);
                fail("Did not throw expected ConnectException exception");
            } catch (ConnectException ignored) {
                // Expected
            }

            verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));

            //
            // Other exceptions don't lead to a retry
            //
            String query5 = "universe";
            Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception"));
            when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5)))
                    .thenReturn(exception);

            result = queryIntState(client, jobId, query5);

            // Wait for the query to finish before counting lookups; otherwise a
            // retry that runs asynchronously could happen after the
            // verification and go unnoticed.
            try {
                Await.result(result, timeout);
                fail("Did not throw expected RuntimeException");
            } catch (RuntimeException ignored) {
                // Expected
            }

            verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
        } finally {
            client.shutDown();
        }
    }

    /**
     * Tests queries against multiple servers.
     *
     * <p>The servers are populated with different keys and the client queries
     * all available keys from all servers. Afterwards the per-server request
     * statistics are compared against the number of keys that were routed to
     * each server.
     */
    @Test
    public void testIntegrationWithKvStateServer() throws Exception {
        // Config
        int numServers = 2;
        int numKeys = 1024;
        int numKeyGroups = 1;

        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();

        KvStateServer[] servers = new KvStateServer[numServers];
        AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];

        QueryableStateClient client = null;
        KvStateClient networkClient = null;
        AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats();

        MemoryStateBackend backend = new MemoryStateBackend();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);

        AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv,
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                numKeyGroups,
                new KeyGroupRange(0, 0),
                new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));

        try {
            KvStateRegistry[] registries = new KvStateRegistry[numServers];
            KvStateID[] kvStateIds = new KvStateID[numServers];
            List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>();

            // Start the servers; server i serves key group i
            for (int i = 0; i < numServers; i++) {
                registries[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
                servers[i].start();
                ValueStateDescriptor<Integer> descriptor =
                        new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);

                RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo =
                        new RegisteredKeyedBackendStateMetaInfo<>(
                                descriptor.getType(),
                                descriptor.getName(),
                                VoidNamespaceSerializer.INSTANCE,
                                IntSerializer.INSTANCE);

                // Register state
                HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
                        descriptor,
                        new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
                        IntSerializer.INSTANCE,
                        VoidNamespaceSerializer.INSTANCE);

                kvStates.add(kvState);

                kvStateIds[i] = registries[i].registerKvState(
                        jobId,
                        new JobVertexID(),
                        new KeyGroupRange(i, i),
                        "choco",
                        kvState);
            }

            // Number of queries each server is expected to receive
            int[] expectedRequests = new int[numServers];

            for (int key = 0; key < numKeys; key++) {
                int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers;
                expectedRequests[targetKeyGroupIndex]++;

                HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex);

                keyedStateBackend.setCurrentKey(key);
                kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
                kvState.update(1337 + key);
            }

            // Location lookup service
            KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco");
            for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) {
                location.registerKvState(
                        new KeyGroupRange(keyGroupIndex, keyGroupIndex),
                        kvStateIds[keyGroupIndex],
                        servers[keyGroupIndex].getAddress());
            }

            KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);
            when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco")))
                    .thenReturn(Futures.successful(location));

            // The client
            networkClient = new KvStateClient(1, networkClientStats);

            client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher());

            // Send all queries
            List<Future<Integer>> futures = new ArrayList<>(numKeys);
            for (int key = 0; key < numKeys; key++) {
                ValueStateDescriptor<Integer> descriptor =
                        new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
                futures.add(client.getKvState(
                        jobId,
                        "choco",
                        key,
                        BasicTypeInfo.INT_TYPE_INFO,
                        descriptor));
            }

            // Verify results
            Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher());
            Iterable<Integer> results = Await.result(future, timeout);

            int index = 0;
            for (int buffer : results) {
                assertEquals(1337 + index, buffer);
                index++;
            }

            // Guard against the loop above passing vacuously on a short result
            assertEquals("Unexpected number of results", numKeys, index);

            // Verify requests; the stats are polled a few times before giving up
            for (int i = 0; i < numServers; i++) {
                int numRetries = 10;
                for (int retry = 0; retry < numRetries; retry++) {
                    try {
                        assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests());
                        assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
                        assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed());
                        break;
                    } catch (Throwable t) {
                        // Retry
                        if (retry == numRetries - 1) {
                            throw t;
                        } else {
                            Thread.sleep(100);
                        }
                    }
                }
            }
        } finally {
            if (client != null) {
                client.shutDown();
            }

            if (networkClient != null) {
                networkClient.shutDown();
            }

            for (KvStateServer server : servers) {
                if (server != null) {
                    server.shutDown();
                }
            }
        }
    }

    /**
     * Tests that the QueryableState client correctly caches location lookups
     * keyed by both job and name. This test is mainly due to a previous bug due
     * to which cache entries were by name only. This is a problem, because the
     * same client can be used to query multiple jobs.
     */
    @Test
    public void testLookupMultipleJobIds() throws Exception {
        String name = "unique-per-job";

        // Exact contents don't matter here
        KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name);
        location.registerKvState(
                new KeyGroupRange(0, 0),
                new KvStateID(),
                new KvStateServerAddress(InetAddress.getLocalHost(), 892));

        JobID jobId1 = new JobID();
        JobID jobId2 = new JobID();

        KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class);

        when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString()))
                .thenReturn(Futures.successful(location));

        KvStateClient networkClient = mock(KvStateClient.class);
        when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class)))
                .thenReturn(Futures.successful(new byte[0]));

        QueryableStateClient client = new QueryableStateClient(
                lookupService,
                networkClient,
                testActorSystem.dispatcher());

        try {
            ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);

            // Queries with the same name, but different job IDs should lead to a
            // single lookup per query and job ID.
            client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);
            client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc);

            verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
            verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
        } finally {
            // Release the client like the other tests of this class do
            client.shutDown();
        }
    }

    /**
     * Queries the integer key {@code 0} of the given queryable state, using an
     * integer value state descriptor named "test".
     *
     * @param client client to send the query with
     * @param jobId job to query
     * @param name name under which the state is queryable
     * @return future of the queried value
     */
    private static Future<Integer> queryIntState(
            QueryableStateClient client,
            JobID jobId,
            String name) throws Exception {

        return client.getKvState(
                jobId,
                name,
                0,
                BasicTypeInfo.INT_TYPE_INFO,
                new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
    }

    /**
     * Creates a location for a fresh job vertex in which every single key
     * group is registered with the same state ID and server address.
     *
     * @param jobId job the location belongs to
     * @param numKeyGroups number of key groups to register
     * @param name name under which the state is queryable
     * @param kvStateId state ID registered for every key group
     * @param serverAddress server address registered for every key group
     * @return the fully registered location
     */
    private static KvStateLocation createLocationOnSingleServer(
            JobID jobId,
            int numKeyGroups,
            String name,
            KvStateID kvStateId,
            KvStateServerAddress serverAddress) {

        KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, name);
        for (int keyGroup = 0; keyGroup < numKeyGroups; keyGroup++) {
            location.registerKvState(new KeyGroupRange(keyGroup, keyGroup), kvStateId, serverAddress);
        }
        return location;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java deleted file mode 100644 index 1e41236..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.flink.runtime.query.netty;

import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;

import org.junit.Test;

import java.nio.channels.ClosedChannelException;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * Tests for {@link KvStateClientHandler}.
 */
public class KvStateClientHandlerTest {

    /**
     * Tests that on reads the expected callback methods are called and read
     * buffers are recycled.
     *
     * <p>The handler is driven through an {@link EmbeddedChannel} that contains
     * nothing but the handler under test. The expected {@code onFailure}
     * invocation counts are cumulative across the sections below.
     */
    @Test
    public void testReadCallbacksAndBufferRecycling() throws Exception {
        KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class);

        EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback));

        //
        // Request success
        //
        ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult(
                channel.alloc(),
                1222112277,
                new byte[0]);
        buf.skipBytes(4); // skip frame length

        // Verify callback
        channel.writeInbound(buf);
        verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class));
        assertEquals("Buffer not recycled", 0, buf.refCnt());

        //
        // Request failure
        //
        buf = KvStateRequestSerializer.serializeKvStateRequestFailure(
                channel.alloc(),
                1222112278,
                new RuntimeException("Expected test Exception"));
        buf.skipBytes(4); // skip frame length

        // Verify callback
        channel.writeInbound(buf);
        verify(callback, times(1)).onRequestFailure(eq(1222112278L), any(RuntimeException.class));
        assertEquals("Buffer not recycled", 0, buf.refCnt());

        //
        // Server failure
        //
        buf = KvStateRequestSerializer.serializeServerFailure(
                channel.alloc(),
                new RuntimeException("Expected test Exception"));
        buf.skipBytes(4); // skip frame length

        // Verify callback
        channel.writeInbound(buf);
        verify(callback, times(1)).onFailure(any(RuntimeException.class));
        // Same recycling check as for every other inbound message of this test
        assertEquals("Buffer not recycled", 0, buf.refCnt());

        //
        // Unexpected messages
        //
        buf = channel.alloc().buffer(4).writeInt(1223823);

        // Verify callback
        channel.writeInbound(buf);
        verify(callback, times(2)).onFailure(any(IllegalStateException.class));
        assertEquals("Buffer not recycled", 0, buf.refCnt());

        //
        // Exception caught
        //
        channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
        verify(callback, times(3)).onFailure(any(RuntimeException.class));

        //
        // Channel inactive
        //
        channel.pipeline().fireChannelInactive();
        verify(callback, times(4)).onFailure(any(ClosedChannelException.class));
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java deleted file mode 100644 index 6b21487..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ /dev/null @@ -1,747 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.message.KvStateRequest; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestType; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.NetUtils; - -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link KvStateClient}. 
- */ -public class KvStateClientTest { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class); - - // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); - - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); - - @AfterClass - public static void tearDown() throws Exception { - if (NIO_GROUP != null) { - NIO_GROUP.shutdownGracefully(); - } - } - - /** - * Tests simple queries, of which half succeed and half fail. - */ - @Test - public void testSimpleRequests() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - // Random result - final byte[] expected = new byte[1024]; - ThreadLocalRandom.current().nextBytes(expected); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - List<Future<byte[]>> futures = new ArrayList<>(); - - int numQueries = 1024; - - for (int i = 0; i < numQueries; i++) { - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - } - - // Respond to messages - Exception testException = new RuntimeException("Expected test Exception"); - - for (int i = 0; i < numQueries; i++) { - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - 
assertNotNull("Receive timed out", buf); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf); - - buf.release(); - - if (i % 2 == 0) { - ByteBuf response = KvStateRequestSerializer.serializeKvStateRequestResult( - serverChannel.alloc(), - request.getRequestId(), - expected); - - ch.writeAndFlush(response); - } else { - ByteBuf response = KvStateRequestSerializer.serializeKvStateRequestFailure( - serverChannel.alloc(), - request.getRequestId(), - testException); - - ch.writeAndFlush(response); - } - } - - for (int i = 0; i < numQueries; i++) { - if (i % 2 == 0) { - byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft()); - assertArrayEquals(expected, serializedResult); - } else { - try { - Await.result(futures.get(i), deadline.timeLeft()); - fail("Did not throw expected Exception"); - } catch (RuntimeException ignored) { - // Expected - } - } - } - - assertEquals(numQueries, stats.getNumRequests()); - int expectedRequests = numQueries / 2; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || - stats.getNumFailed() != expectedRequests)) { - Thread.sleep(100); - } - - assertEquals(expectedRequests, stats.getNumSuccessful()); - assertEquals(expectedRequests, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a request to an unavailable host is failed with ConnectException. 
- */ - @Test - public void testRequestUnavailableHost() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateClient client = null; - - try { - client = new KvStateClient(1, stats); - - int availablePort = NetUtils.getAvailablePort(); - - KvStateServerAddress serverAddress = new KvStateServerAddress( - InetAddress.getLocalHost(), - availablePort); - - Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); - - try { - Await.result(future, deadline.timeLeft()); - fail("Did not throw expected ConnectException"); - } catch (ConnectException ignored) { - // Expected - } - } finally { - if (client != null) { - client.shutDown(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Multiple threads concurrently fire queries. - */ - @Test - public void testConcurrentQueries() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - ExecutorService executor = null; - KvStateClient client = null; - Channel serverChannel = null; - - final byte[] serializedResult = new byte[1024]; - ThreadLocalRandom.current().nextBytes(serializedResult); - - try { - int numQueryTasks = 4; - final int numQueriesPerTask = 1024; - - executor = Executors.newFixedThreadPool(numQueryTasks); - - client = new KvStateClient(1, stats); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - assertEquals(KvStateRequestType.REQUEST, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequest request = KvStateRequestSerializer.deserializeKvStateRequest(buf); - - buf.release(); - - ByteBuf response = KvStateRequestSerializer.serializeKvStateRequestResult( - ctx.alloc(), - request.getRequestId(), - serializedResult); - 
- ctx.channel().writeAndFlush(response); - } - }); - - final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - final KvStateClient finalClient = client; - Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() { - @Override - public List<Future<byte[]>> call() throws Exception { - List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask); - - for (int i = 0; i < numQueriesPerTask; i++) { - results.add(finalClient.getKvState( - serverAddress, - new KvStateID(), - new byte[0])); - } - - return results; - } - }; - - // Submit query tasks - List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>(); - for (int i = 0; i < numQueryTasks; i++) { - futures.add(executor.submit(queryTask)); - } - - // Verify results - for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) { - List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - for (Future<byte[]> result : results) { - byte[] actual = Await.result(result, deadline.timeLeft()); - assertArrayEquals(serializedResult, actual); - } - } - - int totalQueries = numQueryTasks * numQueriesPerTask; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { - Thread.sleep(100); - } - - assertEquals(totalQueries, stats.getNumRequests()); - assertEquals(totalQueries, stats.getNumSuccessful()); - } finally { - if (executor != null) { - executor.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - if (client != null) { - client.shutDown(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a server failure closes the connection and removes it from - * the established connections. 
- */ - @Test - public void testFailureClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - List<Future<byte[]>> futures = new ArrayList<>(); - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - assertEquals(1, stats.getNumConnections()); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - // Respond with failure - ch.writeAndFlush(KvStateRequestSerializer.serializeServerFailure( - serverChannel.alloc(), - new RuntimeException("Expected test server failure"))); - - try { - Await.result(futures.remove(0), deadline.timeLeft()); - fail("Did not throw expected server failure"); - } catch (RuntimeException ignored) { - // Expected - } - - try { - Await.result(futures.remove(0), deadline.timeLeft()); - fail("Did not throw expected server failure"); - } catch (RuntimeException 
ignored) { - // Expected - } - - assertEquals(0, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || - stats.getNumFailed() != 2)) { - Thread.sleep(100); - } - - assertEquals(2, stats.getNumRequests()); - assertEquals(0, stats.getNumSuccessful()); - assertEquals(2, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a server channel close, closes the connection and removes it - * from the established connections. - */ - @Test - public void testServerClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - final AtomicBoolean received = new AtomicBoolean(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.set(true); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); - - while (!received.get() && deadline.hasTimeLeft()) { - Thread.sleep(50); - } - assertTrue("Receive timed out", received.get()); - - assertEquals(1, stats.getNumConnections()); - - channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - try { - Await.result(future, deadline.timeLeft()); - fail("Did not throw expected server 
failure"); - } catch (ClosedChannelException ignored) { - // Expected - } - - assertEquals(0, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || - stats.getNumFailed() != 1)) { - Thread.sleep(100); - } - - assertEquals(1, stats.getNumRequests()); - assertEquals(0, stats.getNumSuccessful()); - assertEquals(1, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests multiple clients querying multiple servers until 100k queries have - * been processed. At this point, the client is shut down and its verified - * that all ongoing requests are failed. - */ - @Test - public void testClientServerIntegration() throws Exception { - // Config - final int numServers = 2; - final int numServerEventLoopThreads = 2; - final int numServerQueryThreads = 2; - - final int numClientEventLoopThreads = 4; - final int numClientsTasks = 8; - - final int batchSize = 16; - - final int numKeyGroups = 1; - - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - KvStateRegistry dummyRegistry = new KvStateRegistry(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(dummyRegistry); - - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); - - final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - ExecutorService clientTaskExecutor = null; - final KvStateServer[] server = new KvStateServer[numServers]; - - try { - client = new 
KvStateClient(numClientEventLoopThreads, clientStats); - clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); - - // Create state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("any"); - - // Create servers - KvStateRegistry[] registry = new KvStateRegistry[numServers]; - AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; - final KvStateID[] ids = new KvStateID[numServers]; - - for (int i = 0; i < numServers; i++) { - registry[i] = new KvStateRegistry(); - serverStats[i] = new AtomicKvStateRequestStats(); - server[i] = new KvStateServer( - InetAddress.getLocalHost(), - 0, - numServerEventLoopThreads, - numServerQueryThreads, - registry[i], - serverStats[i]); - - server[i].start(); - - backend.setCurrentKey(1010 + i); - - // Value per server - ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - state.update(201 + i); - - // we know it must be a KvStat but this is not exposed to the user via State - InternalKvState<?> kvState = (InternalKvState<?>) state; - - // Register KvState (one state instance for all server) - ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); - } - - final KvStateClient finalClient = client; - Callable<Void> queryTask = new Callable<Void>() { - @Override - public Void call() throws Exception { - while (true) { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - // Random server permutation - List<Integer> random = new ArrayList<>(); - for (int j = 0; j < batchSize; j++) { - random.add(j); - } - Collections.shuffle(random); - - // Dispatch queries - List<Future<byte[]>> futures = new ArrayList<>(batchSize); - - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace( - 1010 + targetServer, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - futures.add(finalClient.getKvState( - server[targetServer].getAddress(), - ids[targetServer], - serializedKeyAndNamespace)); - } - - // Verify results - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - Future<byte[]> future = futures.get(j); - byte[] buf = Await.result(future, timeout); - int value = KvStateRequestSerializer.deserializeValue(buf, IntSerializer.INSTANCE); - assertEquals(201 + targetServer, value); - } - } - } - }; - - // Submit tasks - List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>(); - for (int i = 0; i < numClientsTasks; i++) { - taskFutures.add(clientTaskExecutor.submit(queryTask)); - } - - long numRequests; - while ((numRequests = clientStats.getNumRequests()) < 100_000) { - Thread.sleep(100); - LOG.info("Number of requests {}/100_000", numRequests); - } - - // Shut down - client.shutDown(); - - for (java.util.concurrent.Future<Void> future : taskFutures) { - try { - future.get(); - fail("Did not throw expected Exception after shut down"); - } catch (ExecutionException t) { - if (t.getCause() instanceof ClosedChannelException || - t.getCause() instanceof IllegalStateException) { - // Expected - } else { - t.printStackTrace(); - fail("Failed with unexpected Exception type: " + t.getClass().getName()); - } - } - } - - assertEquals("Connection leak (client)", 0, clientStats.getNumConnections()); - for (int i = 0; i < numServers; i++) { - boolean success = false; - int numRetries = 0; - while (!success) { - try { - assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections()); - success = true; - } catch (Throwable t) { - if (numRetries < 10) { - LOG.info("Retrying connection leak check (server)"); - Thread.sleep((numRetries + 1) * 50); - numRetries++; - } else { - throw t; - } - } - } - } - } finally { 
- if (client != null) { - client.shutDown(); - } - - for (int i = 0; i < numServers; i++) { - if (server[i] != null) { - server[i].shutDown(); - } - } - - if (clientTaskExecutor != null) { - clientTaskExecutor.shutdown(); - } - } - } - - // ------------------------------------------------------------------------ - - private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException { - ServerBootstrap bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(InetAddress.getLocalHost(), 0) - // NIO server channels - .group(NIO_GROUP) - .channel(NioServerSocketChannel.class) - // See initializer for pipeline details - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(handlers); - } - }); - - return bootstrap.bind().sync().channel(); - } - - private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) { - InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress(); - - return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java deleted file mode 100644 index 4914ff7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ /dev/null @@ -1,721 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure; -import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestType; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyedStateBackend; -import 
org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.TestLogger; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.Test; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for {@link KvStateServerHandler}. - */ -public class KvStateServerHandlerTest extends TestLogger { - - /** Shared Thread pool for query execution. */ - private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor(); - - private static final int READ_TIMEOUT_MILLIS = 10000; - - @AfterClass - public static void tearDown() throws Exception { - if (TEST_THREAD_POOL != null) { - TEST_THREAD_POOL.shutdown(); - } - } - - /** - * Tests a simple successful query via an EmbeddedChannel. 
- */ - @Test - public void testSimpleQuery() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Update the KvState and request it - int expectedValue = 712828289; - - int key = 99812822; - backend.setCurrentKey(key); - ValueState<Integer> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - state.update(expectedValue); - - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 182828L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - serializedKeyAndNamespace); - - // Write the request and wait for the response - channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) 
readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestResult response = KvStateRequestSerializer.deserializeKvStateRequestResult(buf); - - assertEquals(requestId, response.getRequestId()); - - int actualValue = KvStateRequestSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE); - assertEquals(expectedValue, actualValue); - - assertEquals(stats.toString(), 1, stats.getNumRequests()); - - // Wait for async successful request report - long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) { - Thread.sleep(10); - } - - assertEquals(stats.toString(), 1, stats.getNumSuccessful()); - } - - /** - * Tests the failure response with {@link UnknownKvStateID} as cause on - * queries for unregistered KvStateIDs. 
- */ - @Test - public void testQueryUnknownKvStateID() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - long requestId = Integer.MAX_VALUE + 182828L; - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - new KvStateID(), - new byte[0]); - - // Write the request and wait for the response - channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - - assertEquals(requestId, response.getRequestId()); - - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID); - - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); - } - - /** - * Tests the failure response with {@link UnknownKeyOrNamespace} as cause - * on queries for non-existing keys. 
- */ - @Test - public void testQueryUnknownKey() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - 1238283, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 22982L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - serializedKeyAndNamespace); - - // Write the request and wait for the response - channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - 
KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - - assertEquals(requestId, response.getRequestId()); - - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace); - - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); - } - - /** - * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} - * call. - */ - @Test - public void testFailureOnGetSerializedValue() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Failing KvState - InternalKvState<?> kvState = mock(InternalKvState.class); - when(kvState.getSerializedValue(any(byte[].class))) - .thenThrow(new RuntimeException("Expected test Exception")); - - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - new KeyGroupRange(0, 0), - "vanilla", - kvState); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - kvStateId, - new byte[0]); - - // Write the request and wait for the response - channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - - assertTrue(response.getCause().getMessage().contains("Expected test Exception")); - - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); - } - - /** - * Tests that the channel is closed if an Exception reaches the channel - * 
handler. - */ - @Test - public void testCloseChannelOnExceptionCaught() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(handler); - - channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.SERVER_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - Throwable response = KvStateRequestSerializer.deserializeServerFailure(buf); - - assertTrue(response.getMessage().contains("Expected test Exception")); - - channel.closeFuture().await(READ_TIMEOUT_MILLIS); - assertFalse(channel.isActive()); - } - - /** - * Tests the failure response on a rejected execution, because the query - * executor has been closed. 
- */ - @Test - public void testQueryExecutorShutDown() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - ExecutorService closedExecutor = Executors.newSingleThreadExecutor(); - closedExecutor.shutdown(); - assertTrue(closedExecutor.isShutdown()); - - KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - - assertTrue(registryListener.registrationName.equals("vanilla")); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - registryListener.kvStateId, - new byte[0]); - - // Write the request and wait for the response - channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - - 
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException")); - - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); - } - - /** - * Tests response on unexpected messages. - */ - @Test - public void testUnexpectedMessage() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Write the request and wait for the response - ByteBuf unexpectedMessage = Unpooled.buffer(8); - unexpectedMessage.writeInt(4); - unexpectedMessage.writeInt(123238213); - - channel.writeInbound(unexpectedMessage); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.SERVER_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - Throwable response = KvStateRequestSerializer.deserializeServerFailure(buf); - - assertEquals(0, stats.getNumRequests()); - assertEquals(0, stats.getNumFailed()); - - unexpectedMessage = KvStateRequestSerializer.serializeKvStateRequestResult( - channel.alloc(), - 192, - new byte[0]); - - channel.writeInbound(unexpectedMessage); - - buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.SERVER_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - response = KvStateRequestSerializer.deserializeServerFailure(buf); - - assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException); - - assertEquals(0, stats.getNumRequests()); - assertEquals(0, stats.getNumFailed()); - } - - /** - * Tests that incoming buffer instances are recycled. 
- */ - @Test - public void testIncomingBufferIsRecycled() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - new KvStateID(), - new byte[0]); - - assertEquals(1, request.refCnt()); - - // Write regular request - channel.writeInbound(request); - assertEquals("Buffer not recycled", 0, request.refCnt()); - - // Write unexpected msg - ByteBuf unexpected = channel.alloc().buffer(8); - unexpected.writeInt(4); - unexpected.writeInt(4); - - assertEquals(1, unexpected.refCnt()); - - channel.writeInbound(unexpected); - assertEquals("Buffer not recycled", 0, unexpected.refCnt()); - } - - /** - * Tests the failure response if the serializers don't match. 
- */ - @Test - public void testSerializerMismatch() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - ValueState<Integer> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - int key = 99812822; - - // Update the KvState - backend.setCurrentKey(key); - state.update(712828289); - - byte[] wrongKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - "wrong-key-type", - StringSerializer.INSTANCE, - "wrong-namespace-type", - StringSerializer.INSTANCE); - - byte[] wrongNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - "wrong-namespace-type", - StringSerializer.INSTANCE); - - assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - 182828, - registryListener.kvStateId, - wrongKeyAndNamespace); - - // Write the request and wait for the response - 
channel.writeInbound(request); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(182828, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IOException")); - - // Repeat with wrong namespace only - request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - 182829, - registryListener.kvStateId, - wrongNamespace); - - // Write the request and wait for the response - channel.writeInbound(request); - - buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(KvStateRequestType.REQUEST_FAILURE, KvStateRequestSerializer.deserializeHeader(buf)); - response = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(182829, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IOException")); - - assertEquals(2, stats.getNumRequests()); - assertEquals(2, stats.getNumFailed()); - } - - /** - * Tests that large responses are chunked. 
- */ - @Test - public void testChunkedResponse() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - KvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE); - desc.setQueryable("vanilla"); - - ValueState<byte[]> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - // Update KvState - byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()]; - - byte current = 0; - for (int i = 0; i < bytes.length; i++) { - bytes[i] = current++; - } - - int key = 99812822; - backend.setCurrentKey(key); - state.update(bytes); - - // Request - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 182828L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - 
serializedKeyAndNamespace); - - // Write the request and wait for the response - channel.writeInbound(request); - - Object msg = readInboundBlocking(channel); - assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf); - } - - // ------------------------------------------------------------------------ - - /** - * Queries the embedded channel for data. - */ - private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException { - final int sleepMillis = 50; - - int sleptMillis = 0; - - Object msg = null; - while (sleptMillis < READ_TIMEOUT_MILLIS && - (msg = channel.readOutbound()) == null) { - - Thread.sleep(sleepMillis); - sleptMillis += sleepMillis; - } - - if (msg == null) { - throw new TimeoutException(); - } else { - return msg; - } - } - - /** - * Frame length decoder (expected by the serialized messages). - */ - private ChannelHandler getFrameDecoder() { - return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4); - } - - /** - * A listener that keeps the last updated KvState information so that a test - * can retrieve it. 
- */
    static class TestRegistryListener implements KvStateRegistryListener {

        // Values of the most recent registration notification. Volatile, so a
        // notification delivered on another thread is visible to the test thread.
        volatile JobVertexID jobVertexID;
        volatile KeyGroupRange keyGroupIndex;
        volatile String registrationName;
        volatile KvStateID kvStateId;

        /**
         * Remembers everything but the job ID of the notification, overwriting
         * the values of any earlier notification.
         */
        @Override
        public void notifyKvStateRegistered(
                JobID jobId,
                JobVertexID jobVertexId,
                KeyGroupRange keyGroupRange,
                String registrationName,
                KvStateID kvStateId) {

            this.jobVertexID = jobVertexId;
            this.keyGroupIndex = keyGroupRange;
            this.registrationName = registrationName;
            this.kvStateId = kvStateId;
        }

        /**
         * Ignores the notification; the recorded values stay untouched.
         */
        @Override
        public void notifyKvStateUnregistered(
                JobID jobId,
                JobVertexID jobVertexId,
                KeyGroupRange keyGroupRange,
                String registrationName) {
            // intentionally empty
        }
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java deleted file mode 100644 index f8213e1..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestType; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; - -import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import org.junit.AfterClass;
import org.junit.Test;

import java.net.InetAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link KvStateServer}.
 */
public class KvStateServerTest {

    // Thread pool for client bootstrap (shared between tests)
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();

    // Maximum time to wait for the server's response
    private static final int TIMEOUT_MILLIS = 10000;

    /**
     * Shuts down the shared client event loop group after all tests have run.
     */
    @AfterClass
    public static void tearDown() throws Exception {
        // NIO_GROUP is a static final initialized at class load, so it cannot be null here.
        NIO_GROUP.shutdownGracefully();
    }

    /**
     * Tests a simple successful query via a SocketChannel.
     */
    @Test
    public void testSimpleRequest() throws Exception {
        KvStateServer server = null;
        Bootstrap bootstrap = null;
        try {
            KvStateRegistry registry = new KvStateRegistry();
            KvStateRequestStats stats = new AtomicKvStateRequestStats();

            server = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
            server.start();

            KvStateServerAddress serverAddress = server.getAddress();
            int numKeyGroups = 1;
            AbstractStateBackend abstractBackend = new MemoryStateBackend();
            DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
            dummyEnv.setKvStateRegistry(registry);
            AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
                    dummyEnv,
                    new JobID(),
                    "test_op",
                    IntSerializer.INSTANCE,
                    numKeyGroups,
                    new KeyGroupRange(0, 0),
                    registry.createTaskRegistry(new JobID(), new JobVertexID()));

            // The listener records the name and KvStateID of the state registered below.
            final KvStateServerHandlerTest.TestRegistryListener registryListener =
                    new KvStateServerHandlerTest.TestRegistryListener();

            registry.registerListener(registryListener);

            ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
            desc.setQueryable("vanilla");

            ValueState<Integer> state = backend.getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    desc);

            // Update KvState
            int expectedValue = 712828289;

            int key = 99812822;
            backend.setCurrentKey(key);
            state.update(expectedValue);

            // Request
            byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(
                    key,
                    IntSerializer.INSTANCE,
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE);

            // Connect to the server; every decoded frame is handed to the test
            // thread through the queue.
            final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>();
            bootstrap = createBootstrap(
                    new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
                    new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            responses.add((ByteBuf) msg);
                        }
                    });

            Channel channel = bootstrap
                    .connect(serverAddress.getHost(), serverAddress.getPort())
                    .sync().channel();

            long requestId = Integer.MAX_VALUE + 182828L;

            // assertEquals reports the actual name on failure and does not throw a
            // NullPointerException if the listener was never notified.
            assertEquals("vanilla", registryListener.registrationName);
            ByteBuf request = KvStateRequestSerializer.serializeKvStateRequest(
                    channel.alloc(),
                    requestId,
                    registryListener.kvStateId,
                    serializedKeyAndNamespace);

            channel.writeAndFlush(request);

            // poll() returns null when the timeout elapses; fail with a clear
            // message instead of a NullPointerException during deserialization.
            ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            assertTrue("No response received within " + TIMEOUT_MILLIS + " ms", buf != null);

            final KvStateRequestResult response;
            try {
                assertEquals(KvStateRequestType.REQUEST_RESULT, KvStateRequestSerializer.deserializeHeader(buf));
                response = KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
            } finally {
                // The inbound handler passed ownership of the buffer to the test,
                // so the test has to release it once it has been consumed.
                buf.release();
            }

            assertEquals(requestId, response.getRequestId());
            int actualValue = KvStateRequestSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE);
            assertEquals(expectedValue, actualValue);
        } finally {
            if (server != null) {
                server.shutDown();
            }

            if (bootstrap != null) {
                EventLoopGroup group = bootstrap.group();
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
        }
    }

    /**
     * Creates a client bootstrap on the shared event loop group.
     *
     * @param handlers handlers appended, in the given order, to the pipeline of each new channel
     * @return bootstrap for NIO socket channels
     */
    private Bootstrap createBootstrap(final ChannelHandler... handlers) {
        return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(handlers);
                    }
                });
    }

}
