http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java deleted file mode 100644 index a2850b3..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java +++ /dev/null @@ -1,752 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.queryablestate.client.KvStateClient; -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.server.KvStateServerImpl; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.util.NetUtils; - -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; - -import org.junit.AfterClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.ConnectException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link KvStateClient}. - */ -public class KvStateClientTest { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class); - - // Thread pool for client bootstrap (shared between tests) - private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); - - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); - - @AfterClass - public static void tearDown() throws Exception { - if (NIO_GROUP != null) { - NIO_GROUP.shutdownGracefully(); - } - } - - /** - * Tests simple queries, of which half succeed and half fail. - */ - @Test - public void testSimpleRequests() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - // Random result - final byte[] expected = new byte[1024]; - ThreadLocalRandom.current().nextBytes(expected); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - List<Future<byte[]>> futures = new ArrayList<>(); - - int numQueries = 1024; - - for (int i = 0; i < numQueries; i++) { - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - } - - // Respond to messages - Exception testException = new RuntimeException("Expected test Exception"); - - for (int i = 0; i < numQueries; i++) { - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); - - buf.release(); - - if (i % 2 == 0) { - ByteBuf response = MessageSerializer.serializeKvStateRequestResult( - serverChannel.alloc(), - request.getRequestId(), - expected); - - ch.writeAndFlush(response); - } else { - ByteBuf response = MessageSerializer.serializeKvStateRequestFailure( - serverChannel.alloc(), - request.getRequestId(), - testException); - - ch.writeAndFlush(response); - } - } - - for (int i = 0; i < numQueries; i++) { - if (i % 2 == 0) { - byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft()); - assertArrayEquals(expected, serializedResult); - } else { - try { - Await.result(futures.get(i), deadline.timeLeft()); - fail("Did not throw expected Exception"); - } catch (RuntimeException ignored) { - // Expected - } - } - } - - assertEquals(numQueries, stats.getNumRequests()); - int expectedRequests = numQueries / 2; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests || - stats.getNumFailed() != expectedRequests)) { - Thread.sleep(100); - } - - assertEquals(expectedRequests, stats.getNumSuccessful()); - assertEquals(expectedRequests, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a request to an unavailable host is failed with ConnectException. - */ - @Test - public void testRequestUnavailableHost() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateClient client = null; - - try { - client = new KvStateClient(1, stats); - - int availablePort = NetUtils.getAvailablePort(); - - KvStateServerAddress serverAddress = new KvStateServerAddress( - InetAddress.getLocalHost(), - availablePort); - - Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); - - try { - Await.result(future, deadline.timeLeft()); - fail("Did not throw expected ConnectException"); - } catch (ConnectException ignored) { - // Expected - } - } finally { - if (client != null) { - client.shutDown(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Multiple threads concurrently fire queries. - */ - @Test - public void testConcurrentQueries() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - ExecutorService executor = null; - KvStateClient client = null; - Channel serverChannel = null; - - final byte[] serializedResult = new byte[1024]; - ThreadLocalRandom.current().nextBytes(serializedResult); - - try { - int numQueryTasks = 4; - final int numQueriesPerTask = 1024; - - executor = Executors.newFixedThreadPool(numQueryTasks); - - client = new KvStateClient(1, stats); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf buf = (ByteBuf) msg; - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); - - buf.release(); - - ByteBuf response = MessageSerializer.serializeKvStateRequestResult( - ctx.alloc(), - request.getRequestId(), - serializedResult); - - ctx.channel().writeAndFlush(response); - } - }); - - final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - final KvStateClient finalClient = client; - Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() { - @Override - public List<Future<byte[]>> call() throws Exception { - List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask); - - for (int i = 0; i < numQueriesPerTask; i++) { - results.add(finalClient.getKvState( - serverAddress, - new KvStateID(), - new byte[0])); - } - - return results; - } - }; - - // Submit query tasks - List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>(); - for (int i = 0; i < numQueryTasks; i++) { - futures.add(executor.submit(queryTask)); - } - - // Verify results - for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) { - List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - for (Future<byte[]> result : results) { - byte[] actual = Await.result(result, deadline.timeLeft()); - assertArrayEquals(serializedResult, actual); - } - } - - int totalQueries = numQueryTasks * numQueriesPerTask; - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { - Thread.sleep(100); - } - - assertEquals(totalQueries, stats.getNumRequests()); - assertEquals(totalQueries, stats.getNumSuccessful()); - } finally { - if (executor != null) { - executor.shutdown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - if (client != null) { - client.shutDown(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a server failure closes the connection and removes it from - * the established connections. - */ - @Test - public void testFailureClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.add((ByteBuf) msg); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - List<Future<byte[]>> futures = new ArrayList<>(); - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0])); - - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - assertNotNull("Receive timed out", buf); - buf.release(); - - assertEquals(1, stats.getNumConnections()); - - Channel ch = channel.get(); - assertNotNull("Channel not active", ch); - - // Respond with failure - ch.writeAndFlush(MessageSerializer.serializeServerFailure( - serverChannel.alloc(), - new RuntimeException("Expected test server failure"))); - - try { - Await.result(futures.remove(0), deadline.timeLeft()); - fail("Did not throw expected server failure"); - } catch (RuntimeException ignored) { - // Expected - } - - try { - Await.result(futures.remove(0), deadline.timeLeft()); - fail("Did not throw expected server failure"); - } catch (RuntimeException ignored) { - // Expected - } - - assertEquals(0, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || - stats.getNumFailed() != 2)) { - Thread.sleep(100); - } - - assertEquals(2, stats.getNumRequests()); - assertEquals(0, stats.getNumSuccessful()); - assertEquals(2, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests that a server channel close, closes the connection and removes it - * from the established connections. - */ - @Test - public void testServerClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); - AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - Channel serverChannel = null; - - try { - client = new KvStateClient(1, stats); - - final AtomicBoolean received = new AtomicBoolean(); - final AtomicReference<Channel> channel = new AtomicReference<>(); - - serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - received.set(true); - } - }); - - KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel); - - // Requests - Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]); - - while (!received.get() && deadline.hasTimeLeft()) { - Thread.sleep(50); - } - assertTrue("Receive timed out", received.get()); - - assertEquals(1, stats.getNumConnections()); - - channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - try { - Await.result(future, deadline.timeLeft()); - fail("Did not throw expected server failure"); - } catch (ClosedChannelException ignored) { - // Expected - } - - assertEquals(0, stats.getNumConnections()); - - // Counts can take some time to propagate - while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 || - stats.getNumFailed() != 1)) { - Thread.sleep(100); - } - - assertEquals(1, stats.getNumRequests()); - assertEquals(0, stats.getNumSuccessful()); - assertEquals(1, stats.getNumFailed()); - } finally { - if (client != null) { - client.shutDown(); - } - - if (serverChannel != null) { - serverChannel.close(); - } - - assertEquals("Channel leak", 0, stats.getNumConnections()); - } - } - - /** - * Tests multiple clients querying multiple servers until 100k queries have - * been processed. At this point, the client is shut down and its verified - * that all ongoing requests are failed. - */ - @Test - public void testClientServerIntegration() throws Exception { - // Config - final int numServers = 2; - final int numServerEventLoopThreads = 2; - final int numServerQueryThreads = 2; - - final int numClientEventLoopThreads = 4; - final int numClientsTasks = 8; - - final int batchSize = 16; - - final int numKeyGroups = 1; - - AbstractStateBackend abstractBackend = new MemoryStateBackend(); - KvStateRegistry dummyRegistry = new KvStateRegistry(); - DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); - dummyEnv.setKvStateRegistry(dummyRegistry); - - AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); - - final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); - - KvStateClient client = null; - ExecutorService clientTaskExecutor = null; - final KvStateServer[] server = new KvStateServer[numServers]; - - try { - client = new KvStateClient(numClientEventLoopThreads, clientStats); - clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks); - - // Create state - ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); - desc.setQueryable("any"); - - // Create servers - KvStateRegistry[] registry = new KvStateRegistry[numServers]; - AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; - final KvStateID[] ids = new KvStateID[numServers]; - - for (int i = 0; i < numServers; i++) { - registry[i] = new KvStateRegistry(); - serverStats[i] = new AtomicKvStateRequestStats(); - server[i] = new KvStateServerImpl( - InetAddress.getLocalHost(), - 0, - numServerEventLoopThreads, - numServerQueryThreads, - registry[i], - serverStats[i]); - - server[i].start(); - - backend.setCurrentKey(1010 + i); - - // Value per server - ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - desc); - - state.update(201 + i); - - // we know it must be a KvStat but this is not exposed to the user via State - InternalKvState<?> kvState = (InternalKvState<?>) state; - - // Register KvState (one state instance for all server) - ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); - } - - final KvStateClient finalClient = client; - Callable<Void> queryTask = new Callable<Void>() { - @Override - public Void call() throws Exception { - while (true) { - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - // Random server permutation - List<Integer> random = new ArrayList<>(); - for (int j = 0; j < batchSize; j++) { - random.add(j); - } - Collections.shuffle(random); - - // Dispatch queries - List<Future<byte[]>> futures = new ArrayList<>(batchSize); - - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - 1010 + targetServer, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - futures.add(finalClient.getKvState( - server[targetServer].getAddress(), - ids[targetServer], - serializedKeyAndNamespace)); - } - - // Verify results - for (int j = 0; j < batchSize; j++) { - int targetServer = random.get(j) % numServers; - - Future<byte[]> future = futures.get(j); - byte[] buf = Await.result(future, timeout); - int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE); - assertEquals(201 + targetServer, value); - } - } - } - }; - - // Submit tasks - List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>(); - for (int i = 0; i < numClientsTasks; i++) { - taskFutures.add(clientTaskExecutor.submit(queryTask)); - } - - long numRequests; - while ((numRequests = clientStats.getNumRequests()) < 100_000) { - Thread.sleep(100); - LOG.info("Number of requests {}/100_000", numRequests); - } - - // Shut down - client.shutDown(); - - for (java.util.concurrent.Future<Void> future : taskFutures) { - try { - future.get(); - fail("Did not throw expected Exception after shut down"); - } catch (ExecutionException t) { - if (t.getCause() instanceof ClosedChannelException || - t.getCause() instanceof IllegalStateException) { - // Expected - } else { - t.printStackTrace(); - fail("Failed with unexpected Exception type: " + t.getClass().getName()); - } - } - } - - assertEquals("Connection leak (client)", 0, clientStats.getNumConnections()); - for (int i = 0; i < numServers; i++) { - boolean success = false; - int numRetries = 0; - while (!success) { - try { - assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections()); - success = true; - } catch (Throwable t) { - if (numRetries < 10) { - LOG.info("Retrying connection leak check (server)"); - Thread.sleep((numRetries + 1) * 50); - numRetries++; - } else { - throw t; - } - } - } - } - } finally { - if (client != null) { - client.shutDown(); - } - - for (int i = 0; i < numServers; i++) { - if (server[i] != null) { - server[i].shutDown(); - } - } - - if (clientTaskExecutor != null) { - clientTaskExecutor.shutdown(); - } - } - } - - // ------------------------------------------------------------------------ - - private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException { - ServerBootstrap bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(InetAddress.getLocalHost(), 0) - // NIO server channels - .group(NIO_GROUP) - .channel(NioServerSocketChannel.class) - // See initializer for pipeline details - .childHandler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(handlers); - } - }); - - return bootstrap.bind().sync().channel(); - } - - private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) { - InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress(); - - return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java deleted file mode 100644 index f28ca68..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate.network; - -import org.apache.flink.queryablestate.messages.KvStateRequest; -import org.apache.flink.queryablestate.messages.KvStateRequestFailure; -import org.apache.flink.queryablestate.messages.KvStateRequestResult; -import org.apache.flink.queryablestate.network.messages.MessageSerializer; -import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collection; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -/** - * Tests for {@link KvStateSerializer}. - */ -@RunWith(Parameterized.class) -public class KvStateRequestSerializerTest { - - private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; - - @Parameterized.Parameters - public static Collection<Boolean> parameters() { - return Arrays.asList(false, true); - } - - @Parameterized.Parameter - public boolean async; - - /** - * Tests KvState request serialization. - */ - @Test - public void testKvStateRequestSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1337L; - KvStateID kvStateId = new KvStateID(); - byte[] serializedKeyAndNamespace = randomByteArray(1024); - - ByteBuf buf = MessageSerializer.serializeKvStateRequest( - alloc, - requestId, - kvStateId, - serializedKeyAndNamespace); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - assertEquals(kvStateId, request.getKvStateId()); - assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); - } - - /** - * Tests KvState request serialization with zero-length serialized key and namespace. - */ - @Test - public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { - byte[] serializedKeyAndNamespace = new byte[0]; - - ByteBuf buf = MessageSerializer.serializeKvStateRequest( - alloc, - 1823, - new KvStateID(), - serializedKeyAndNamespace); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); - KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> key and namespace. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { - new KvStateRequest(0, new KvStateID(), null); - } - - /** - * Tests KvState request result serialization. - */ - @Test - public void testKvStateRequestResultSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 72727278L; - byte[] serializedResult = randomByteArray(1024); - - ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( - alloc, - requestId, - serializedResult); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - - assertArrayEquals(serializedResult, request.getSerializedResult()); - } - - /** - * Tests KvState request result serialization with zero-length serialized result. - */ - @Test - public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception { - byte[] serializedResult = new byte[0]; - - ByteBuf buf = MessageSerializer.serializeKvStateRequestResult( - alloc, - 72727278, - serializedResult); - - int frameLength = buf.readInt(); - - assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertArrayEquals(serializedResult, request.getSerializedResult()); - } - - /** - * Tests that we don't try to be smart about <code>null</code> results. - * They should be treated explicitly. - */ - @Test(expected = NullPointerException.class) - public void testNullPointerExceptionOnNullSerializedResult() throws Exception { - new KvStateRequestResult(0, null); - } - - /** - * Tests KvState request failure serialization. - */ - @Test - public void testKvStateRequestFailureSerialization() throws Exception { - long requestId = Integer.MAX_VALUE + 1111222L; - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure( - alloc, - requestId, - cause); - - int frameLength = buf.readInt(); - assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(requestId, request.getRequestId()); - assertEquals(cause.getClass(), request.getCause().getClass()); - assertEquals(cause.getMessage(), request.getCause().getMessage()); - } - - /** - * Tests KvState server failure serialization. - */ - @Test - public void testServerFailureSerialization() throws Exception { - IllegalStateException cause = new IllegalStateException("Expected test"); - - ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause); - - int frameLength = buf.readInt(); - assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); - Throwable request = MessageSerializer.deserializeServerFailure(buf); - assertEquals(buf.readerIndex(), frameLength + 4); - - assertEquals(cause.getClass(), request.getClass()); - assertEquals(cause.getMessage(), request.getMessage()); - } - - private byte[] randomByteArray(int capacity) { - byte[] bytes = new byte[capacity]; - ThreadLocalRandom.current().nextBytes(bytes); - return bytes; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index c37c822..944349ee 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -24,20 +24,22 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.queryablestate.UnknownKeyOrNamespace; -import org.apache.flink.queryablestate.UnknownKvStateID; -import org.apache.flink.queryablestate.messages.KvStateRequestFailure; -import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.UnknownKeyOrNamespaceException; +import org.apache.flink.queryablestate.UnknownKvStateIdException; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.queryablestate.network.messages.MessageType; -import org.apache.flink.queryablestate.server.ChunkedByteBuf; +import org.apache.flink.queryablestate.network.messages.RequestFailure; import org.apache.flink.queryablestate.server.KvStateServerHandler; +import org.apache.flink.queryablestate.server.KvStateServerImpl; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateRegistryListener; import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateRequestStats; import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -57,10 +59,11 @@ import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -76,16 +79,28 @@ import static org.mockito.Mockito.when; */ public class KvStateServerHandlerTest extends TestLogger { - /** Shared Thread pool for query execution. */ - private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor(); - - private static final int READ_TIMEOUT_MILLIS = 10000; + private static KvStateServerImpl testServer; + + private static final long READ_TIMEOUT_MILLIS = 10000L; + + @BeforeClass + public static void setup() { + try { + testServer = new KvStateServerImpl( + InetAddress.getLocalHost(), + 0, + 1, + 1, + new KvStateRegistry(), + new DisabledKvStateRequestStats()); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } @AfterClass public static void tearDown() throws Exception { - if (TEST_THREAD_POOL != null) { - TEST_THREAD_POOL.shutdown(); - } + testServer.shutdown(); } /** @@ -96,7 +111,10 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Register state @@ -141,40 +159,40 @@ public class KvStateServerHandlerTest extends TestLogger { assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - serializedKeyAndNamespace); + KvStateInternalRequest request = new KvStateInternalRequest( + registryListener.kvStateId, serializedKeyAndNamespace); + + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf); + long deserRequestId = MessageSerializer.getRequestId(buf); + KvStateResponse response = serializer.deserializeResponse(buf); - assertEquals(requestId, response.getRequestId()); + assertEquals(requestId, deserRequestId); - int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE); + int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE); assertEquals(expectedValue, actualValue); assertEquals(stats.toString(), 1, stats.getNumRequests()); // Wait for async successful request report long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); - while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) { - Thread.sleep(10); + while (stats.getNumSuccessful() != 1L && System.nanoTime() <= deadline) { + Thread.sleep(10L); } - assertEquals(stats.toString(), 1, stats.getNumSuccessful()); + assertEquals(stats.toString(), 1L, stats.getNumSuccessful()); } /** - * Tests the failure response with {@link UnknownKvStateID} as cause on + * Tests the failure response with {@link UnknownKvStateIdException} as cause on * queries for unregistered KvStateIDs. */ @Test @@ -182,36 +200,38 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); long requestId = Integer.MAX_VALUE + 182828L; - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - new KvStateID(), - new byte[0]); + + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); assertEquals(requestId, response.getRequestId()); - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID); + assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateIdException); - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); + assertEquals(1L, stats.getNumRequests()); + assertEquals(1L, stats.getNumFailed()); } /** - * Tests the failure response with {@link UnknownKeyOrNamespace} as cause + * Tests the failure response with {@link UnknownKeyOrNamespaceException} as cause * on queries for non-existing keys. */ @Test @@ -219,7 +239,10 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); int numKeyGroups = 1; @@ -254,40 +277,39 @@ public class KvStateServerHandlerTest extends TestLogger { assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - serializedKeyAndNamespace); + KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); assertEquals(requestId, response.getRequestId()); - assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace); + assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespaceException); - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); + assertEquals(1L, stats.getNumRequests()); + assertEquals(1L, stats.getNumFailed()); } /** - * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} - * call. + * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} call. */ @Test public void testFailureOnGetSerializedValue() throws Exception { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Failing KvState @@ -302,38 +324,37 @@ public class KvStateServerHandlerTest extends TestLogger { "vanilla", kvState); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - kvStateId, - new byte[0]); + KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, new byte[0]); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); assertTrue(response.getCause().getMessage().contains("Expected test Exception")); - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); + assertEquals(1L, stats.getNumRequests()); + assertEquals(1L, stats.getNumFailed()); } /** - * Tests that the channel is closed if an Exception reaches the channel - * handler. + * Tests that the channel is closed if an Exception reaches the channel handler. */ @Test public void testCloseChannelOnExceptionCaught() throws Exception { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(handler); channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); @@ -352,19 +373,28 @@ public class KvStateServerHandlerTest extends TestLogger { } /** - * Tests the failure response on a rejected execution, because the query - * executor has been closed. + * Tests the failure response on a rejected execution, because the query executor has been closed. */ @Test public void testQueryExecutorShutDown() throws Exception { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - ExecutorService closedExecutor = Executors.newSingleThreadExecutor(); - closedExecutor.shutdown(); - assertTrue(closedExecutor.isShutdown()); + KvStateServerImpl localTestServer = new KvStateServerImpl( + InetAddress.getLocalHost(), + 0, + 1, + 1, + new KvStateRegistry(), + new DisabledKvStateRequestStats()); + + localTestServer.shutdown(); + assertTrue(localTestServer.isExecutorShutdown()); - KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(localTestServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); int numKeyGroups = 1; @@ -391,26 +421,25 @@ public class KvStateServerHandlerTest extends TestLogger { assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - registryListener.kvStateId, - new byte[0]); + KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, new byte[0]); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); assertTrue(response.getCause().getMessage().contains("RejectedExecutionException")); - assertEquals(1, stats.getNumRequests()); - assertEquals(1, stats.getNumFailed()); + assertEquals(1L, stats.getNumRequests()); + assertEquals(1L, stats.getNumFailed()); + + localTestServer.shutdown(); } /** @@ -421,7 +450,10 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); // Write the request and wait for the response @@ -438,13 +470,11 @@ public class KvStateServerHandlerTest extends TestLogger { assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); Throwable response = MessageSerializer.deserializeServerFailure(buf); - assertEquals(0, stats.getNumRequests()); - assertEquals(0, stats.getNumFailed()); + assertEquals(0L, stats.getNumRequests()); + assertEquals(0L, stats.getNumFailed()); - unexpectedMessage = MessageSerializer.serializeKvStateRequestResult( - channel.alloc(), - 192, - new byte[0]); + KvStateResponse stateResponse = new KvStateResponse(new byte[0]); + unexpectedMessage = MessageSerializer.serializeResponse(channel.alloc(), 192L, stateResponse); channel.writeInbound(unexpectedMessage); @@ -457,8 +487,8 @@ public class KvStateServerHandlerTest extends TestLogger { assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException); - assertEquals(0, stats.getNumRequests()); - assertEquals(0, stats.getNumFailed()); + assertEquals(0L, stats.getNumRequests()); + assertEquals(0L, stats.getNumFailed()); } /** @@ -469,30 +499,30 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - 282872, - new KvStateID(), - new byte[0]); + KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 282872L, request); - assertEquals(1, request.refCnt()); + assertEquals(1L, serRequest.refCnt()); // Write regular request - channel.writeInbound(request); - assertEquals("Buffer not recycled", 0, request.refCnt()); + channel.writeInbound(serRequest); + assertEquals("Buffer not recycled", 0L, serRequest.refCnt()); // Write unexpected msg ByteBuf unexpected = channel.alloc().buffer(8); unexpected.writeInt(4); unexpected.writeInt(4); - assertEquals(1, unexpected.refCnt()); + assertEquals(1L, unexpected.refCnt()); channel.writeInbound(unexpected); - assertEquals("Buffer not recycled", 0, unexpected.refCnt()); + assertEquals("Buffer not recycled", 0L, unexpected.refCnt()); } /** @@ -503,7 +533,10 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); int numKeyGroups = 1; @@ -550,45 +583,40 @@ public class KvStateServerHandlerTest extends TestLogger { StringSerializer.INSTANCE); assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - 182828, - registryListener.kvStateId, - wrongKeyAndNamespace); + + KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, wrongKeyAndNamespace); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182828L, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); ByteBuf buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(182828, response.getRequestId()); + RequestFailure response = MessageSerializer.deserializeRequestFailure(buf); + assertEquals(182828L, response.getRequestId()); assertTrue(response.getCause().getMessage().contains("IOException")); // Repeat with wrong namespace only - request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - 182829, - registryListener.kvStateId, - wrongNamespace); + request = new KvStateInternalRequest(registryListener.kvStateId, wrongNamespace); + serRequest = MessageSerializer.serializeRequest(channel.alloc(), 182829L, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); buf = (ByteBuf) readInboundBlocking(channel); buf.skipBytes(4); // skip frame length // Verify the response assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); - response = MessageSerializer.deserializeKvStateRequestFailure(buf); - assertEquals(182829, response.getRequestId()); + response = MessageSerializer.deserializeRequestFailure(buf); + assertEquals(182829L, response.getRequestId()); assertTrue(response.getCause().getMessage().contains("IOException")); - assertEquals(2, stats.getNumRequests()); - assertEquals(2, stats.getNumFailed()); + assertEquals(2L, stats.getNumRequests()); + assertEquals(2L, stats.getNumFailed()); } /** @@ -599,7 +627,10 @@ public class KvStateServerHandlerTest extends TestLogger { KvStateRegistry registry = new KvStateRegistry(); KvStateRequestStats stats = new AtomicKvStateRequestStats(); - KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + KvStateServerHandler handler = new KvStateServerHandler(testServer, registry, serializer, stats); EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); int numKeyGroups = 1; @@ -650,14 +681,11 @@ public class KvStateServerHandlerTest extends TestLogger { assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - registryListener.kvStateId, - serializedKeyAndNamespace); + KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace); + ByteBuf serRequest = MessageSerializer.serializeRequest(channel.alloc(), requestId, request); // Write the request and wait for the response - channel.writeInbound(request); + channel.writeInbound(serRequest); Object msg = readInboundBlocking(channel); assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf); @@ -669,9 +697,9 @@ public class KvStateServerHandlerTest extends TestLogger { * Queries the embedded channel for data. */ private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException { - final int sleepMillis = 50; + final long sleepMillis = 50L; - int sleptMillis = 0; + long sleptMillis = 0L; Object msg = null; while (sleptMillis < READ_TIMEOUT_MILLIS && http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java index 9332e68..b7f489a 100644 --- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -22,14 +22,14 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; import org.apache.flink.queryablestate.network.messages.MessageSerializer; import org.apache.flink.queryablestate.network.messages.MessageType; import org.apache.flink.queryablestate.server.KvStateServerImpl; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateRequestStats; @@ -66,7 +66,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** - * Tests for {@link KvStateServer}. + * Tests for {@link KvStateServerImpl}. */ public class KvStateServerTest { @@ -87,7 +87,7 @@ public class KvStateServerTest { */ @Test public void testSimpleRequest() throws Exception { - KvStateServer server = null; + KvStateServerImpl server = null; Bootstrap bootstrap = null; try { KvStateRegistry registry = new KvStateRegistry(); @@ -96,7 +96,7 @@ public class KvStateServerTest { server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats); server.start(); - KvStateServerAddress serverAddress = server.getAddress(); + KvStateServerAddress serverAddress = server.getServerAddress(); int numKeyGroups = 1; AbstractStateBackend abstractBackend = new MemoryStateBackend(); DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); @@ -155,25 +155,29 @@ public class KvStateServerTest { long requestId = Integer.MAX_VALUE + 182828L; assertTrue(registryListener.registrationName.equals("vanilla")); - ByteBuf request = MessageSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, + + final KvStateInternalRequest request = new KvStateInternalRequest( registryListener.kvStateId, serializedKeyAndNamespace); - channel.writeAndFlush(request); + ByteBuf serializeRequest = MessageSerializer.serializeRequest( + channel.alloc(), + requestId, + request); + + channel.writeAndFlush(serializeRequest); ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); - KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateResponse response = server.getSerializer().deserializeResponse(buf); - assertEquals(requestId, response.getRequestId()); - int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE); + int actualValue = KvStateSerializer.deserializeValue(response.getContent(), IntSerializer.INSTANCE); assertEquals(expectedValue, actualValue); } finally { if (server != null) { - server.shutDown(); + server.shutdown(); } if (bootstrap != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java new file mode 100644 index 0000000..32a0c9b --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.network.messages.RequestFailure; +import org.apache.flink.runtime.query.KvStateID; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link MessageSerializer}. + */ +@RunWith(Parameterized.class) +public class MessageSerializerTest { + + private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + + @Parameterized.Parameters + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean async; + + /** + * Tests request serialization. + */ + @Test + public void testRequestSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1337L; + KvStateID kvStateId = new KvStateID(); + byte[] serializedKeyAndNamespace = randomByteArray(1024); + + final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(kvStateId, requestDeser.getKvStateId()); + assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); + } + + /** + * Tests request serialization with zero-length serialized key and namespace. + */ + @Test + public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { + + long requestId = Integer.MAX_VALUE + 1337L; + KvStateID kvStateId = new KvStateID(); + byte[] serializedKeyAndNamespace = new byte[0]; + + final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(kvStateId, requestDeser.getKvStateId()); + assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> key and namespace. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { + new KvStateInternalRequest(new KvStateID(), null); + } + + /** + * Tests response serialization. + */ + @Test + public void testResponseSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 72727278L; + byte[] serializedResult = randomByteArray(1024); + + final KvStateResponse response = new KvStateResponse(serializedResult); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateResponse responseDeser = serializer.deserializeResponse(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedResult, responseDeser.getContent()); + } + + /** + * Tests response serialization with zero-length serialized result. + */ + @Test + public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception { + byte[] serializedResult = new byte[0]; + + final KvStateResponse response = new KvStateResponse(serializedResult); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeResponse(alloc, 72727278L, response); + + int frameLength = buf.readInt(); + + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + assertEquals(72727278L, MessageSerializer.getRequestId(buf)); + KvStateResponse responseDeser = serializer.deserializeResponse(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedResult, responseDeser.getContent()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> results. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedResult() throws Exception { + new KvStateResponse((byte[]) null); + } + + /** + * Tests request failure serialization. + */ + @Test + public void testKvStateRequestFailureSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1111222L; + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(requestId, requestFailure.getRequestId()); + assertEquals(cause.getClass(), requestFailure.getCause().getClass()); + assertEquals(cause.getMessage(), requestFailure.getCause().getMessage()); + } + + /** + * Tests server failure serialization. + */ + @Test + public void testServerFailureSerialization() throws Exception { + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + Throwable request = MessageSerializer.deserializeServerFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(cause.getClass(), request.getClass()); + assertEquals(cause.getMessage(), request.getMessage()); + } + + private byte[] randomByteArray(int capacity) { + byte[] bytes = new byte[capacity]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } +}
