http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java deleted file mode 100644 index 4023925..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ /dev/null @@ -1,784 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.NetUtils; - -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link Client}. - */ -public class ClientTest { - - private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); - - // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); - - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS); - - @AfterClass - public static void tearDown() throws Exception { - if (NIO_GROUP != null) { - NIO_GROUP.shutdownGracefully(); - } - } - - /** - * Tests simple queries, of which half succeed and half fail. - */ - @Test - public void testSimpleRequests() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - Client<KvStateInternalRequest, KvStateResponse> client = null; - Channel serverChannel = null; - - try { - client = new Client<>("Test Client", 1, serializer, stats); - - // Random result - final byte[] expected = new byte[1024]; - ThreadLocalRandom.current().nextBytes(expected); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - long numQueries = 1024L; - - List<CompletableFuture<KvStateResponse>> futures = new ArrayList<>(); - for (long i = 0L; i < numQueries; i++) { - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - futures.add(client.sendRequest(serverAddress, request)); - } - - // Respond to messages - Exception testException = new RuntimeException("Expected test Exception"); - - for (long i = 0L; i < numQueries; i++) { - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - long requestId = MessageSerializer.getRequestId(buf); - KvStateInternalRequest deserRequest = serializer.deserializeRequest(buf); - - buf.release(); - - if (i % 2L == 0L) { - ByteBuf response = MessageSerializer.serializeResponse( - serverChannel.alloc(), - requestId, - new KvStateResponse(expected)); - - ch.writeAndFlush(response); - } else { - ByteBuf response = MessageSerializer.serializeRequestFailure( - serverChannel.alloc(), - requestId, - testException); - - ch.writeAndFlush(response); - } - } - - for (long i = 0L; i < numQueries; i++) { - - if (i % 2L == 0L) { - KvStateResponse serializedResult = futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertArrayEquals(expected, serializedResult.getContent()); - } else { - try { - futures.get((int) i).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail("Did not throw expected Exception"); - } catch (ExecutionException e) { - - if (!(e.getCause() instanceof RuntimeException)) { - fail("Did not throw expected Exception"); - } - // else expected - } - } - } - - assertEquals(numQueries, stats.getNumRequests()); - long expectedRequests = numQueries / 2L; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || - stats.getNumFailed() != expectedRequests)) { - Thread.sleep(100L); - } - - assertEquals(expectedRequests, stats.getNumSuccessful()); - assertEquals(expectedRequests, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0L, stats.getNumConnections()); - } - } - - /** - * Tests that a request to an unavailable host is failed with ConnectException. - */ - @Test - public void testRequestUnavailableHost() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - Client<KvStateInternalRequest, KvStateResponse> client = null; - - try { - client = new Client<>("Test Client", 1, serializer, stats); - - int availablePort = NetUtils.getAvailablePort(); - - KvStateServerAddress serverAddress = new KvStateServerAddress( - InetAddress.getLocalHost(), - availablePort); - - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request); - - try { - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail("Did not throw expected ConnectException"); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof ConnectException)) { - fail("Did not throw expected ConnectException"); - } - // else expected - } - } finally { - if (client != null) { - client.shutdown(); - } - - assertEquals("Channel leak", 0L, stats.getNumConnections()); - } - } - - /** - * Multiple threads concurrently fire queries. - */ - @Test - public void testConcurrentQueries() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - ExecutorService executor = null; - Client<KvStateInternalRequest, KvStateResponse> client = null; - Channel serverChannel = null; - - final byte[] serializedResult = new byte[1024]; - ThreadLocalRandom.current().nextBytes(serializedResult); - - try { - int numQueryTasks = 4; - final int numQueriesPerTask = 1024; - - executor = Executors.newFixedThreadPool(numQueryTasks); - - client = new Client<>("Test Client", 1, serializer, stats); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - long requestId = MessageSerializer.getRequestId(buf); - KvStateInternalRequest request = serializer.deserializeRequest(buf); - - buf.release(); - - KvStateResponse response = new KvStateResponse(serializedResult); - ByteBuf serResponse = MessageSerializer.serializeResponse( - ctx.alloc(), - requestId, - response); - - ctx.channel().writeAndFlush(serResponse); - } - }); - - final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - final Client<KvStateInternalRequest, KvStateResponse> finalClient = client; - Callable<List<CompletableFuture<KvStateResponse>>> queryTask = () -> { - List<CompletableFuture<KvStateResponse>> results = new ArrayList<>(numQueriesPerTask); - - for (int i = 0; i < numQueriesPerTask; i++) { - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - results.add(finalClient.sendRequest(serverAddress, request)); - } - - return results; - }; - - // Submit query tasks - List<Future<List<CompletableFuture<KvStateResponse>>>> futures = new ArrayList<>(); - for (int i = 0; i < numQueryTasks; i++) { - futures.add(executor.submit(queryTask)); - } - - // Verify results - for (Future<List<CompletableFuture<KvStateResponse>>> future : futures) { - List<CompletableFuture<KvStateResponse>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - for (CompletableFuture<KvStateResponse> result : results) { - KvStateResponse actual = result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertArrayEquals(serializedResult, actual.getContent()); - } - } - - int totalQueries = numQueryTasks * numQueriesPerTask; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { - Thread.sleep(100L); - } - - assertEquals(totalQueries, stats.getNumRequests()); - assertEquals(totalQueries, stats.getNumSuccessful()); - } finally { - if (executor != null) { - executor.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - if (client != null) { - client.shutdown(); - } - - assertEquals("Channel leak", 0L, stats.getNumConnections()); - } - } - - /** - * Tests that a server failure closes the connection and removes it from - * the established connections. - */ - @Test - public void testFailureClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - Client<KvStateInternalRequest, KvStateResponse> client = null; - Channel serverChannel = null; - - try { - client = new Client<>("Test Client", 1, serializer, stats); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - List<Future<KvStateResponse>> futures = new ArrayList<>(); - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - - futures.add(client.sendRequest(serverAddress, request)); - futures.add(client.sendRequest(serverAddress, request)); - - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - assertEquals(1L, stats.getNumConnections()); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - // Respond with failure - ch.writeAndFlush(MessageSerializer.serializeServerFailure( - serverChannel.alloc(), - new RuntimeException("Expected test server failure"))); - - try { - futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail("Did not throw expected server failure"); - } catch (ExecutionException e) { - - if (!(e.getCause() instanceof RuntimeException)) { - fail("Did not throw expected Exception"); - } - // Expected - } - - try { - futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail("Did not throw expected server failure"); - } catch (ExecutionException e) { - - if (!(e.getCause() instanceof RuntimeException)) { - fail("Did not throw expected Exception"); - } - // Expected - } - - assertEquals(0L, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) { - Thread.sleep(100L); - } - - assertEquals(2L, stats.getNumRequests()); - assertEquals(0L, stats.getNumSuccessful()); - assertEquals(2L, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0L, stats.getNumConnections()); - } - } - - /** - * Tests that a server channel close, closes the connection and removes it - * from the established connections. - */ - @Test - public void testServerClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - Client<KvStateInternalRequest, KvStateResponse> client = null; - Channel serverChannel = null; - - try { - client = new Client<>("Test Client", 1, serializer, stats); - - final AtomicBoolean received = new AtomicBoolean(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.set(true); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - Future<KvStateResponse> future = client.sendRequest(serverAddress, request); - - while (!received.get() && deadline.hasTimeLeft()) { - Thread.sleep(50L); - } - assertTrue("Receive timed out", received.get()); - - assertEquals(1, stats.getNumConnections()); - - channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - try { - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - fail("Did not throw expected server failure"); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof ClosedChannelException)) { - fail("Did not throw expected Exception"); - } - // Expected - } - - assertEquals(0L, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) { - Thread.sleep(100L); - } - - assertEquals(1L, stats.getNumRequests()); - assertEquals(0L, stats.getNumSuccessful()); - assertEquals(1L, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0L, stats.getNumConnections()); - } - } - - /** - * Tests multiple clients querying multiple servers until 100k queries have - * been processed. At this point, the client is shut down and its verified - * that all ongoing requests are failed. - */ - @Test - public void testClientServerIntegration() throws Throwable { - // Config - final int numServers = 2; - final int numServerEventLoopThreads = 2; - final int numServerQueryThreads = 2; - - final int numClientEventLoopThreads = 4; - final int numClientsTasks = 8; - - final int batchSize = 16; - - final int numKeyGroups = 1; - - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - KvStateRegistry dummyRegistry = new KvStateRegistry(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(dummyRegistry); - - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); - - final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); - - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - Client<KvStateInternalRequest, KvStateResponse> client = null; - ExecutorService clientTaskExecutor = null; - final KvStateServerImpl[] server = new KvStateServerImpl[numServers]; - - try { - client = new Client<>("Test Client", numClientEventLoopThreads, serializer, clientStats); - clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); - - // Create state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("any"); - - // Create servers - KvStateRegistry[] registry = new KvStateRegistry[numServers]; - AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; - final KvStateID[] ids = new KvStateID[numServers]; - - for (int i = 0; i < numServers; i++) { - registry[i] = new KvStateRegistry(); - serverStats[i] = new AtomicKvStateRequestStats(); - server[i] = new KvStateServerImpl( - InetAddress.getLocalHost(), - Collections.singletonList(0).iterator(), - numServerEventLoopThreads, - numServerQueryThreads, - registry[i], - serverStats[i]); - - server[i].start(); - - backend.setCurrentKey(1010 + i); - - // Value per server - ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - state.update(201 + i); - - // we know it must be a KvStat but this is not exposed to the user via State - InternalKvState<?> kvState = (InternalKvState<?>) state; - - // Register KvState (one state instance for all server) - ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); - } - - final Client<KvStateInternalRequest, KvStateResponse> finalClient = client; - Callable<Void> queryTask = () -> { - while (true) { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - // Random server permutation - List<Integer> random = new ArrayList<>(); - for (int j = 0; j < batchSize; j++) { - random.add(j); - } - Collections.shuffle(random); - - // Dispatch queries - List<Future<KvStateResponse>> futures = new ArrayList<>(batchSize); - - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - 1010 + targetServer, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace); - futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), request)); - } - - // Verify results - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - Future<KvStateResponse> future = futures.get(j); - byte[] buf = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS).getContent(); - int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE); - assertEquals(201L + targetServer, value); - } - } - }; - - // Submit tasks - List<Future<Void>> taskFutures = new ArrayList<>(); - for (int i = 0; i < numClientsTasks; i++) { - taskFutures.add(clientTaskExecutor.submit(queryTask)); - } - - long numRequests; - while ((numRequests = clientStats.getNumRequests()) < 100_000L) { - Thread.sleep(100L); - LOG.info("Number of requests {}/100_000", numRequests); - } - - // Shut down - client.shutdown(); - - for (Future<Void> future : taskFutures) { - try { - future.get(); - fail("Did not throw expected Exception after shut down"); - } catch (ExecutionException t) { - if (t.getCause().getCause() instanceof ClosedChannelException || - t.getCause().getCause() instanceof IllegalStateException) { - // Expected - } else { - t.printStackTrace(); - fail("Failed with unexpected Exception type: " + t.getClass().getName()); - } - } - } - - assertEquals("Connection leak (client)", 0L, clientStats.getNumConnections()); - for (int i = 0; i < numServers; i++) { - boolean success = false; - int numRetries = 0; - while (!success) { - try { - assertEquals("Connection leak (server)", 0L, serverStats[i].getNumConnections()); - success = true; - } catch (Throwable t) { - if (numRetries < 10) { - LOG.info("Retrying connection leak check (server)"); - Thread.sleep((numRetries + 1) * 50L); - numRetries++; - } else { - throw t; - } - } - } - } - } finally { - if (client != null) { - client.shutdown(); - } - - for (int i = 0; i < numServers; i++) { - if (server[i] != null) { - server[i].shutdown(); - } - } - - if (clientTaskExecutor != null) { - clientTaskExecutor.shutdown(); - } - } - } - - // ------------------------------------------------------------------------ - - private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException { - ServerBootstrap bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(InetAddress.getLocalHost(), 0) - // NIO server channels - .group(NIO_GROUP) - .channel(NioServerSocketChannel.class) - // See initializer for pipeline details - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(handlers); - } - }); - - return bootstrap.bind().sync().channel(); - } - - private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) { - InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress(); - - return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java deleted file mode 100644 index cb490aa..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; - -import org.junit.Test; - -import java.nio.channels.ClosedChannelException; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -/** - * Tests for {@link ClientHandler}. - */ -public class KvStateClientHandlerTest { - - /** - * Tests that on reads the expected callback methods are called and read - * buffers are recycled. - */ - @Test - public void testReadCallbacksAndBufferRecycling() throws Exception { - final ClientHandlerCallback<KvStateResponse> callback = mock(ClientHandlerCallback.class); - - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - final EmbeddedChannel channel = new EmbeddedChannel(new ClientHandler<>("Test Client", serializer, callback)); - - final byte[] content = new byte[0]; - final KvStateResponse response = new KvStateResponse(content); - - // - // Request success - // - ByteBuf buf = MessageSerializer.serializeResponse(channel.alloc(), 1222112277L, response); - buf.skipBytes(4); // skip frame length - - // Verify callback - channel.writeInbound(buf); - verify(callback, times(1)).onRequestResult(eq(1222112277L), any(KvStateResponse.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); - - // - // Request failure - // - buf = MessageSerializer.serializeRequestFailure( - channel.alloc(), - 1222112278, - new RuntimeException("Expected test Exception")); - buf.skipBytes(4); // skip frame length - - // Verify callback - channel.writeInbound(buf); - verify(callback, times(1)).onRequestFailure(eq(1222112278L), any(RuntimeException.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); - - // - // Server failure - // - buf = MessageSerializer.serializeServerFailure( - channel.alloc(), - new RuntimeException("Expected test Exception")); - buf.skipBytes(4); // skip frame length - - // Verify callback - channel.writeInbound(buf); - verify(callback, times(1)).onFailure(any(RuntimeException.class)); - - // - // Unexpected messages - // - buf = channel.alloc().buffer(4).writeInt(1223823); - - // Verify callback - channel.writeInbound(buf); - verify(callback, times(2)).onFailure(any(IllegalStateException.class)); - assertEquals("Buffer not recycled", 0, buf.refCnt()); - - // - // Exception caught - // - channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - verify(callback, times(3)).onFailure(any(RuntimeException.class)); - - // - // Channel inactive - // - channel.pipeline().fireChannelInactive(); - verify(callback, times(4)).onFailure(any(ClosedChannelException.class)); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java deleted file mode 100644 index 217d0b5..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ /dev/null @@ -1,758 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; -import org.apache.flink.queryablestate.UnknownKvStateIdException; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.network.messages.RequestFailure; -import org.apache.flink.queryablestate.server.KvStateServerHandler; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyedStateBackend; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.TestLogger; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.net.InetAddress; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests for {@link KvStateServerHandler}. - */ -public class KvStateServerHandlerTest extends TestLogger { - - private static KvStateServerImpl testServer; - - private static final long READ_TIMEOUT_MILLIS = 10000L; - - @BeforeClass - public static void setup() { - try { - testServer = new KvStateServerImpl( - InetAddress.getLocalHost(), - Collections.singletonList(0).iterator(), - 1, - 1, - new KvStateRegistry(), - new DisabledKvStateRequestStats()); - testServer.start(); - } catch (Throwable e) { - e.printStackTrace(); - } - } - - @AfterClass - public static void tearDown() throws Exception { - testServer.shutdown(); - } - - /** - * Tests a simple successful query via an EmbeddedChannel. - */ - @Test - public void testSimpleQuery() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Update the KvState and request it - int expectedValue = 712828289; - - int key = 99812822; - backend.setCurrentKey(key); - ValueState<Integer> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - state.update(expectedValue); - - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 182828L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - KvStateInternalRequest request = new KvStateInternalRequest( - registryListener.kvStateId, serializedKeyAndNamespace); - - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - long deserRequestId = MessageSerializer.getRequestId(buf); - KvStateResponse response = serializer.deserializeResponse(buf); - - assertEquals(requestId, deserRequestId); - - int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE); - assertEquals(expectedValue, actualValue); - - assertEquals(stats.toString(), 1, stats.getNumRequests()); - - // Wait for async successful request report - long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) { - Thread.sleep(10L); - } - - assertEquals(stats.toString(), 1L, stats.getNumSuccessful()); - } - - /** - * Tests the failure response with {@link UnknownKvStateIdException} as cause on - * queries for unregistered KvStateIDs. - */ - @Test - public void testQueryUnknownKvStateID() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - long requestId = Integer.MAX_VALUE + 182828L; - - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); - - assertEquals(requestId, response.getRequestId()); - - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateIdException); - - assertEquals(1L, stats.getNumRequests()); - assertEquals(1L, stats.getNumFailed()); - } - - /** - * Tests the failure response with {@link UnknownKeyOrNamespaceException} as cause - * on queries for non-existing keys. - */ - @Test - public void testQueryUnknownKey() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - 1238283, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 22982L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); - - assertEquals(requestId, response.getRequestId()); - - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespaceException); - - assertEquals(1L, stats.getNumRequests()); - assertEquals(1L, stats.getNumFailed()); - } - - /** - * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call. - */ - @Test - public void testFailureOnGetSerializedValue() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Failing KvState - InternalKvState<?> kvState = mock(InternalKvState.class); - when(kvState.getSerializedValue(any(byte[].class))) - .thenThrow(new RuntimeException("Expected test Exception")); - - KvStateID kvStateId = registry.registerKvState( - new JobID(), - new JobVertexID(), - new KeyGroupRange(0, 0), - "vanilla", - kvState); - - KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); - - assertTrue(response.getCause().getMessage().contains("Expected test Exception")); - - assertEquals(1L, stats.getNumRequests()); - assertEquals(1L, stats.getNumFailed()); - } - - /** - * Tests that the channel is closed if an Exception reaches the channel handler. - */ - @Test - public void testCloseChannelOnExceptionCaught() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(handler); - - channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); - Throwable response = MessageSerializer.deserializeServerFailure(buf); - - assertTrue(response.getMessage().contains("Expected test Exception")); - - channel.closeFuture().await(READ_TIMEOUT_MILLIS); - assertFalse(channel.isActive()); - } - - /** - * Tests the failure response on a rejected execution, because the query executor has been closed. - */ - @Test - public void testQueryExecutorShutDown() throws Throwable { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateServerImpl localTestServer = new KvStateServerImpl( - InetAddress.getLocalHost(), - Collections.singletonList(0).iterator(), - 1, - 1, - new KvStateRegistry(), - new DisabledKvStateRequestStats()); - - localTestServer.start(); - localTestServer.shutdown(); - assertTrue(localTestServer.isExecutorShutdown()); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(localTestServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); - - assertTrue(registryListener.registrationName.equals("vanilla")); - - KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); - - assertTrue(response.getCause().getMessage().contains("RejectedExecutionException")); - - assertEquals(1L, stats.getNumRequests()); - assertEquals(1L, stats.getNumFailed()); - - localTestServer.shutdown(); - } - - /** - * Tests response on unexpected messages. - */ - @Test - public void testUnexpectedMessage() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - // Write the request and wait for the response - ByteBuf unexpectedMessage = Unpooled.buffer(8); - unexpectedMessage.writeInt(4); - unexpectedMessage.writeInt(123238213); - - channel.writeInbound(unexpectedMessage); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); - Throwable response = MessageSerializer.deserializeServerFailure(buf); - - assertEquals(0L, stats.getNumRequests()); - assertEquals(0L, stats.getNumFailed()); - - KvStateResponse stateResponse = new KvStateResponse(new byte[0]); - unexpectedMessage = MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse); - - channel.writeInbound(unexpectedMessage); - - buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); - response = MessageSerializer.deserializeServerFailure(buf); - - assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException); - - assertEquals(0L, stats.getNumRequests()); - assertEquals(0L, stats.getNumFailed()); - } - - /** - * Tests that incoming buffer instances are recycled. - */ - @Test - public void testIncomingBufferIsRecycled() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); - - assertEquals(1L, serRequest.refCnt()); - - // Write regular request - channel.writeInbound(serRequest); - assertEquals("Buffer not recycled", 0L, serRequest.refCnt()); - - // Write unexpected msg - ByteBuf unexpected = channel.alloc().buffer(8); - unexpected.writeInt(4); - unexpected.writeInt(4); - - assertEquals(1L, unexpected.refCnt()); - - channel.writeInbound(unexpected); - assertEquals("Buffer not recycled", 0L, unexpected.refCnt()); - } - - /** - * Tests the failure response if the serializers don't match. - */ - @Test - public void testSerializerMismatch() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - ValueState<Integer> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - int key = 99812822; - - // Update the KvState - backend.setCurrentKey(key); - state.update(712828289); - - byte[] wrongKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - "wrong-key-type", - StringSerializer.INSTANCE, - "wrong-namespace-type", - StringSerializer.INSTANCE); - - byte[] wrongNamespace = KvStateSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - "wrong-namespace-type", - StringSerializer.INSTANCE); - - assertTrue(registryListener.registrationName.equals("vanilla")); - - KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182828L, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - ByteBuf buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); - assertEquals(182828L, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IOException")); - - // Repeat with wrong namespace only - request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace); - serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182829L, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - buf = (ByteBuf) readInboundBlocking(channel); - buf.skipBytes(4); // skip frame length - - // Verify the response - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - response = MessageSerializer.deserializeRequestFailure(buf); - assertEquals(182829L, response.getRequestId()); - assertTrue(response.getCause().getMessage().contains("IOException")); - - assertEquals(2L, stats.getNumRequests()); - assertEquals(2L, stats.getNumFailed()); - } - - /** - * Tests that large responses are chunked. - */ - @Test - public void testChunkedResponse() throws Exception { - KvStateRegistry registry = new KvStateRegistry(); - KvStateRequestStats stats = new AtomicKvStateRequestStats(); - - MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); - EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); - - final TestRegistryListener registryListener = new TestRegistryListener(); - registry.registerListener(registryListener); - - // Register state - ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE); - desc.setQueryable("vanilla"); - - ValueState<byte[]> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - // Update KvState - byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()]; - - byte current = 0; - for (int i = 0; i < bytes.length; i++) { - bytes[i] = current++; - } - - int key = 99812822; - backend.setCurrentKey(key); - state.update(bytes); - - // Request - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - long requestId = Integer.MAX_VALUE + 182828L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace); - ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); - - // Write the request and wait for the response - channel.writeInbound(serRequest); - - Object msg = readInboundBlocking(channel); - assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf); - } - - // ------------------------------------------------------------------------ - - /** - * Queries the embedded channel for data. - */ - private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException { - final long sleepMillis = 50L; - - long sleptMillis = 0L; - - Object msg = null; - while (sleptMillis < READ_TIMEOUT_MILLIS && - (msg = channel.readOutbound()) == null) { - - Thread.sleep(sleepMillis); - sleptMillis += sleepMillis; - } - - if (msg == null) { - throw new TimeoutException(); - } else { - return msg; - } - } - - /** - * Frame length decoder (expected by the serialized messages). - */ - private ChannelHandler getFrameDecoder() { - return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4); - } - - /** - * A listener that keeps the last updated KvState information so that a test - * can retrieve it. - */ - static class TestRegistryListener implements KvStateRegistryListener { - volatile JobVertexID jobVertexID; - volatile KeyGroupRange keyGroupIndex; - volatile String registrationName; - volatile KvStateID kvStateId; - - @Override - public void notifyKvStateRegistered(JobID jobId, - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName, - KvStateID kvStateId) { - this.jobVertexID = jobVertexId; - this.keyGroupIndex = keyGroupRange; - this.registrationName = registrationName; - this.kvStateId = kvStateId; - } - - @Override - public void notifyKvStateUnregistered(JobID jobId, - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName) { - - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java deleted file mode 100644 index 7abc84e..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; - -import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.Test; - -import java.net.InetAddress; -import java.util.Collections; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link KvStateServerImpl}. - */ -public class KvStateServerTest { - - // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); - - private static final int TIMEOUT_MILLIS = 10000; - - @AfterClass - public static void tearDown() throws Exception { - if (NIO_GROUP != null) { - NIO_GROUP.shutdownGracefully(); - } - } - - /** - * Tests a simple successful query via a SocketChannel. - */ - @Test - public void testSimpleRequest() throws Throwable { - KvStateServerImpl server = null; - Bootstrap bootstrap = null; - try { - KvStateRegistry registry = new KvStateRegistry(); - KvStateRequestStats stats = new AtomicKvStateRequestStats(); - - server = new KvStateServerImpl(InetAddress.getLocalHost(), - Collections.singletonList(0).iterator(), 1, 1, registry, stats); - server.start(); - - KvStateServerAddress serverAddress = server.getServerAddress(); - int numKeyGroups = 1; - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(registry); - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - registry.createTaskRegistry(new JobID(), new JobVertexID())); - - final KvStateServerHandlerTest.TestRegistryListener registryListener = - new KvStateServerHandlerTest.TestRegistryListener(); - - registry.registerListener(registryListener); - - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("vanilla"); - - ValueState<Integer> state = backend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - // Update KvState - int expectedValue = 712828289; - - int key = 99812822; - backend.setCurrentKey(key); - state.update(expectedValue); - - // Request - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - // Connect to the server - final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>(); - bootstrap = createBootstrap( - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - responses.add((ByteBuf) msg); - } - }); - - Channel channel = bootstrap - .connect(serverAddress.getHost(), serverAddress.getPort()) - .sync().channel(); - - long requestId = Integer.MAX_VALUE + 182828L; - - assertTrue(registryListener.registrationName.equals("vanilla")); - - final KvStateInternalRequest request = new KvStateInternalRequest( - registryListener.kvStateId, - serializedKeyAndNamespace); - - ByteBuf serializeRequest = MessageSerializer.serializeRequest( - channel.alloc(), - requestId, - request); - - channel.writeAndFlush(serializeRequest); - - ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - assertEquals(requestId, MessageSerializer.getRequestId(buf)); - KvStateResponse response = server.getSerializer().deserializeResponse(buf); - - int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE); - assertEquals(expectedValue, actualValue); - } finally { - if (server != null) { - server.shutdown(); - } - - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(); - } - } - } - } - - /** - * Creates a client bootstrap. - */ - private Bootstrap createBootstrap(final ChannelHandler... handlers) { - return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(handlers); - } - }); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java deleted file mode 100644 index 32a0c9b..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.queryablestate.messages.KvStateInternalRequest; -import org.apache.flink.queryablestate.messages.KvStateResponse; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.network.messages.RequestFailure; -import org.apache.flink.runtime.query.KvStateID; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * Tests for {@link MessageSerializer}. - */ -@RunWith(Parameterized.class) -public class MessageSerializerTest { - - private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; - - @Parameterized.Parameters - public static Collection<Boolean> parameters() { - return Arrays.asList(false, true); - } - - @Parameterized.Parameter - public boolean async; - - /** - * Tests request serialization. - */ - @Test - public void testRequestSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1337L; - KvStateID kvStateId = new KvStateID(); - byte[] serializedKeyAndNamespace = randomByteArray(1024); - - final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - assertEquals(requestId, MessageSerializer.getRequestId(buf)); - KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); - - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(kvStateId, requestDeser.getKvStateId()); - assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); - } - - /** - * Tests request serialization with zero-length serialized key and namespace. - */ - @Test - public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { - - long requestId = Integer.MAX_VALUE + 1337L; - KvStateID kvStateId = new KvStateID(); - byte[] serializedKeyAndNamespace = new byte[0]; - - final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - assertEquals(requestId, MessageSerializer.getRequestId(buf)); - KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); - - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(kvStateId, requestDeser.getKvStateId()); - assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> key and namespace. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { - new KvStateInternalRequest(new KvStateID(), null); - } - - /** - * Tests response serialization. - */ - @Test - public void testResponseSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 72727278L; - byte[] serializedResult = randomByteArray(1024); - - final KvStateResponse response = new KvStateResponse(serializedResult); - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - assertEquals(requestId, MessageSerializer.getRequestId(buf)); - KvStateResponse responseDeser = serializer.deserializeResponse(buf); - - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedResult, responseDeser.getContent()); - } - - /** - * Tests response serialization with zero-length serialized result. - */ - @Test - public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception { - byte[] serializedResult = new byte[0]; - - final KvStateResponse response = new KvStateResponse(serializedResult); - final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = - new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); - - ByteBuf buf = MessageSerializer.serializeResponse(alloc, 72727278L, response); - - int frameLength = buf.readInt(); - - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - assertEquals(72727278L, MessageSerializer.getRequestId(buf)); - KvStateResponse responseDeser = serializer.deserializeResponse(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedResult, responseDeser.getContent()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> results. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedResult() throws Exception { - new KvStateResponse((byte[]) null); - } - - /** - * Tests request failure serialization. - */ - @Test - public void testKvStateRequestFailureSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1111222L; - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, requestFailure.getRequestId()); - assertEquals(cause.getClass(), requestFailure.getCause().getClass()); - assertEquals(cause.getMessage(), requestFailure.getCause().getMessage()); - } - - /** - * Tests server failure serialization. - */ - @Test - public void testServerFailureSerialization() throws Exception { - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause); - - int frameLength = buf.readInt(); - assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); - Throwable request = MessageSerializer.deserializeServerFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(cause.getClass(), request.getClass()); - assertEquals(cause.getMessage(), request.getMessage()); - } - - private byte[] randomByteArray(int capacity) { - byte[] bytes = new byte[capacity]; - ThreadLocalRandom.current().nextBytes(bytes); - return bytes; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java deleted file mode 100644 index 2e05f61..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableAggregatingStateTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.queryablestate.client.state.ImmutableAggregatingState; - -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; - -import static org.junit.Assert.assertEquals; - -/** - * Tests the {@link ImmutableAggregatingStateTest}. - */ -public class ImmutableAggregatingStateTest { - - private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc = - new AggregatingStateDescriptor<>( - "test", - new SumAggr(), - String.class); - - private ImmutableAggregatingState<Long, String> aggrState; - - @Before - public void setUp() throws Exception { - if (!aggrStateDesc.isSerializerInitialized()) { - aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - final String initValue = "42"; - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out)); - - aggrState = ImmutableAggregatingState.createState( - aggrStateDesc, - out.toByteArray() - ); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - String value = aggrState.get(); - assertEquals("42", value); - - aggrState.add(54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - String value = aggrState.get(); - assertEquals("42", value); - - aggrState.clear(); - } - - /** - * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. - */ - private static class SumAggr implements AggregateFunction<Long, String, String> { - - private static final long serialVersionUID = -6249227626701264599L; - - @Override - public String createAccumulator() { - return ""; - } - - @Override - public String add(Long value, String accumulator) { - accumulator += ", " + value; - return accumulator; - } - - @Override - public String getResult(String accumulator) { - return accumulator; - } - - @Override - public String merge(String a, String b) { - return a + ", " + b; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java deleted file mode 100644 index d2c9535..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableFoldingStateTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.state; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.queryablestate.client.state.ImmutableFoldingState; - -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; - -import static org.junit.Assert.assertEquals; - -/** - * Tests the {@link ImmutableFoldingState}. - */ -public class ImmutableFoldingStateTest { - - private final FoldingStateDescriptor<Long, String> foldingStateDesc = - new FoldingStateDescriptor<>( - "test", - "0", - new SumFold(), - StringSerializer.INSTANCE); - - private ImmutableFoldingState<Long, String> foldingState; - - @Before - public void setUp() throws Exception { - if (!foldingStateDesc.isSerializerInitialized()) { - foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); - } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out)); - - foldingState = ImmutableFoldingState.createState( - foldingStateDesc, - out.toByteArray() - ); - } - - @Test(expected = UnsupportedOperationException.class) - public void testUpdate() { - String value = foldingState.get(); - assertEquals("42", value); - - foldingState.add(54L); - } - - @Test(expected = UnsupportedOperationException.class) - public void testClear() { - String value = foldingState.get(); - assertEquals("42", value); - - foldingState.clear(); - } - - /** - * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. - */ - private static class SumFold implements FoldFunction<Long, String> { - - private static final long serialVersionUID = -6249227626701264599L; - - @Override - public String fold(String accumulator, Long value) throws Exception { - long acc = Long.valueOf(accumulator); - acc += value; - return Long.toString(acc); - } - } -}
