http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java new file mode 100644 index 0000000..c37c822 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -0,0 +1,728 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.queryablestate.UnknownKeyOrNamespace; +import org.apache.flink.queryablestate.UnknownKvStateID; +import org.apache.flink.queryablestate.messages.KvStateRequestFailure; +import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.server.ChunkedByteBuf; +import org.apache.flink.queryablestate.server.KvStateServerHandler; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistryListener; +import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.junit.AfterClass; +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link KvStateServerHandler}. + */ +public class KvStateServerHandlerTest extends TestLogger { + + /** Shared Thread pool for query execution. */ + private static final ExecutorService TEST_THREAD_POOL = Executors.newSingleThreadExecutor(); + + private static final int READ_TIMEOUT_MILLIS = 10000; + + @AfterClass + public static void tearDown() throws Exception { + if (TEST_THREAD_POOL != null) { + TEST_THREAD_POOL.shutdown(); + } + } + + /** + * Tests a simple successful query via an EmbeddedChannel. + */ + @Test + public void testSimpleQuery() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + // Register state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("vanilla"); + + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + + // Update the KvState and request it + int expectedValue = 712828289; + + int key = 99812822; + backend.setCurrentKey(key); + ValueState<Integer> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + state.update(expectedValue); + + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + key, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + long requestId = Integer.MAX_VALUE + 182828L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + registryListener.kvStateId, + serializedKeyAndNamespace); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf); + + assertEquals(requestId, response.getRequestId()); + + int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE); + assertEquals(expectedValue, actualValue); + + assertEquals(stats.toString(), 1, stats.getNumRequests()); + + // Wait for async successful request report + long deadline = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS); + while (stats.getNumSuccessful() != 1 && System.nanoTime() <= deadline) { + Thread.sleep(10); + } + + assertEquals(stats.toString(), 1, stats.getNumSuccessful()); + } + + /** + * Tests the failure response with {@link UnknownKvStateID} as cause on + * queries for unregistered KvStateIDs. + */ + @Test + public void testQueryUnknownKvStateID() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + long requestId = Integer.MAX_VALUE + 182828L; + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + new KvStateID(), + new byte[0]); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + + assertEquals(requestId, response.getRequestId()); + + assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKvStateID); + + assertEquals(1, stats.getNumRequests()); + assertEquals(1, stats.getNumFailed()); + } + + /** + * Tests the failure response with {@link UnknownKeyOrNamespace} as cause + * on queries for non-existing keys. + */ + @Test + public void testQueryUnknownKey() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + + // Register state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("vanilla"); + + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); + + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + 1238283, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + long requestId = Integer.MAX_VALUE + 22982L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + registryListener.kvStateId, + serializedKeyAndNamespace); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + + assertEquals(requestId, response.getRequestId()); + + assertTrue("Did not respond with expected failure cause", response.getCause() instanceof UnknownKeyOrNamespace); + + assertEquals(1, stats.getNumRequests()); + assertEquals(1, stats.getNumFailed()); + } + + /** + * Tests the failure response on a failure on the {@link InternalKvState#getSerializedValue(byte[])} + * call. + */ + @Test + public void testFailureOnGetSerializedValue() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + // Failing KvState + InternalKvState<?> kvState = mock(InternalKvState.class); + when(kvState.getSerializedValue(any(byte[].class))) + .thenThrow(new RuntimeException("Expected test Exception")); + + KvStateID kvStateId = registry.registerKvState( + new JobID(), + new JobVertexID(), + new KeyGroupRange(0, 0), + "vanilla", + kvState); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + 282872, + kvStateId, + new byte[0]); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + + assertTrue(response.getCause().getMessage().contains("Expected test Exception")); + + assertEquals(1, stats.getNumRequests()); + assertEquals(1, stats.getNumFailed()); + } + + /** + * Tests that the channel is closed if an Exception reaches the channel + * handler. + */ + @Test + public void testCloseChannelOnExceptionCaught() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(handler); + + channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception")); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + Throwable response = MessageSerializer.deserializeServerFailure(buf); + + assertTrue(response.getMessage().contains("Expected test Exception")); + + channel.closeFuture().await(READ_TIMEOUT_MILLIS); + assertFalse(channel.isActive()); + } + + /** + * Tests the failure response on a rejected execution, because the query + * executor has been closed. + */ + @Test + public void testQueryExecutorShutDown() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + ExecutorService closedExecutor = Executors.newSingleThreadExecutor(); + closedExecutor.shutdown(); + assertTrue(closedExecutor.isShutdown()); + + KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + KeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + + // Register state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("vanilla"); + + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, desc); + + assertTrue(registryListener.registrationName.equals("vanilla")); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + 282872, + registryListener.kvStateId, + new byte[0]); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + + assertTrue(response.getCause().getMessage().contains("RejectedExecutionException")); + + assertEquals(1, stats.getNumRequests()); + assertEquals(1, stats.getNumFailed()); + } + + /** + * Tests response on unexpected messages. + */ + @Test + public void testUnexpectedMessage() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + // Write the request and wait for the response + ByteBuf unexpectedMessage = Unpooled.buffer(8); + unexpectedMessage.writeInt(4); + unexpectedMessage.writeInt(123238213); + + channel.writeInbound(unexpectedMessage); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + Throwable response = MessageSerializer.deserializeServerFailure(buf); + + assertEquals(0, stats.getNumRequests()); + assertEquals(0, stats.getNumFailed()); + + unexpectedMessage = MessageSerializer.serializeKvStateRequestResult( + channel.alloc(), + 192, + new byte[0]); + + channel.writeInbound(unexpectedMessage); + + buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + response = MessageSerializer.deserializeServerFailure(buf); + + assertTrue("Unexpected failure cause " + response.getClass().getName(), response instanceof IllegalArgumentException); + + assertEquals(0, stats.getNumRequests()); + assertEquals(0, stats.getNumFailed()); + } + + /** + * Tests that incoming buffer instances are recycled. + */ + @Test + public void testIncomingBufferIsRecycled() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + 282872, + new KvStateID(), + new byte[0]); + + assertEquals(1, request.refCnt()); + + // Write regular request + channel.writeInbound(request); + assertEquals("Buffer not recycled", 0, request.refCnt()); + + // Write unexpected msg + ByteBuf unexpected = channel.alloc().buffer(8); + unexpected.writeInt(4); + unexpected.writeInt(4); + + assertEquals(1, unexpected.refCnt()); + + channel.writeInbound(unexpected); + assertEquals("Buffer not recycled", 0, unexpected.refCnt()); + } + + /** + * Tests the failure response if the serializers don't match. + */ + @Test + public void testSerializerMismatch() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + + // Register state + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("vanilla"); + + ValueState<Integer> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + int key = 99812822; + + // Update the KvState + backend.setCurrentKey(key); + state.update(712828289); + + byte[] wrongKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + "wrong-key-type", + StringSerializer.INSTANCE, + "wrong-namespace-type", + StringSerializer.INSTANCE); + + byte[] wrongNamespace = KvStateSerializer.serializeKeyAndNamespace( + key, + IntSerializer.INSTANCE, + "wrong-namespace-type", + StringSerializer.INSTANCE); + + assertTrue(registryListener.registrationName.equals("vanilla")); + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + 182828, + registryListener.kvStateId, + wrongKeyAndNamespace); + + // Write the request and wait for the response + channel.writeInbound(request); + + ByteBuf buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + KvStateRequestFailure response = MessageSerializer.deserializeKvStateRequestFailure(buf); + assertEquals(182828, response.getRequestId()); + assertTrue(response.getCause().getMessage().contains("IOException")); + + // Repeat with wrong namespace only + request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + 182829, + registryListener.kvStateId, + wrongNamespace); + + // Write the request and wait for the response + channel.writeInbound(request); + + buf = (ByteBuf) readInboundBlocking(channel); + buf.skipBytes(4); // skip frame length + + // Verify the response + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + response = MessageSerializer.deserializeKvStateRequestFailure(buf); + assertEquals(182829, response.getRequestId()); + assertTrue(response.getCause().getMessage().contains("IOException")); + + assertEquals(2, stats.getNumRequests()); + assertEquals(2, stats.getNumFailed()); + } + + /** + * Tests that large responses are chunked. + */ + @Test + public void testChunkedResponse() throws Exception { + KvStateRegistry registry = new KvStateRegistry(); + KvStateRequestStats stats = new AtomicKvStateRequestStats(); + + KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats); + EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler); + + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + + final TestRegistryListener registryListener = new TestRegistryListener(); + registry.registerListener(registryListener); + + // Register state + ValueStateDescriptor<byte[]> desc = new ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE); + desc.setQueryable("vanilla"); + + ValueState<byte[]> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + // Update KvState + byte[] bytes = new byte[2 * channel.config().getWriteBufferHighWaterMark()]; + + byte current = 0; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = current++; + } + + int key = 99812822; + backend.setCurrentKey(key); + state.update(bytes); + + // Request + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + key, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + long requestId = Integer.MAX_VALUE + 182828L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + registryListener.kvStateId, + serializedKeyAndNamespace); + + // Write the request and wait for the response + channel.writeInbound(request); + + Object msg = readInboundBlocking(channel); + assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf); + } + + // ------------------------------------------------------------------------ + + /** + * Queries the embedded channel for data. + */ + private Object readInboundBlocking(EmbeddedChannel channel) throws InterruptedException, TimeoutException { + final int sleepMillis = 50; + + int sleptMillis = 0; + + Object msg = null; + while (sleptMillis < READ_TIMEOUT_MILLIS && + (msg = channel.readOutbound()) == null) { + + Thread.sleep(sleepMillis); + sleptMillis += sleepMillis; + } + + if (msg == null) { + throw new TimeoutException(); + } else { + return msg; + } + } + + /** + * Frame length decoder (expected by the serialized messages). + */ + private ChannelHandler getFrameDecoder() { + return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4); + } + + /** + * A listener that keeps the last updated KvState information so that a test + * can retrieve it. + */ + static class TestRegistryListener implements KvStateRegistryListener { + volatile JobVertexID jobVertexID; + volatile KeyGroupRange keyGroupIndex; + volatile String registrationName; + volatile KvStateID kvStateId; + + @Override + public void notifyKvStateRegistered(JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName, + KvStateID kvStateId) { + this.jobVertexID = jobVertexId; + this.keyGroupIndex = keyGroupRange; + this.registrationName = registrationName; + this.kvStateId = kvStateId; + } + + @Override + public void notifyKvStateUnregistered(JobID jobId, + JobVertexID jobVertexId, + KeyGroupRange keyGroupRange, + String registrationName) { + + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java new file mode 100644 index 0000000..9332e68 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.queryablestate.messages.KvStateRequestResult; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; + +import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; +import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; +import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.junit.AfterClass; +import org.junit.Test; + +import java.net.InetAddress; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link KvStateServer}. + */ +public class KvStateServerTest { + + // Thread pool for client bootstrap (shared between tests) + private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup(); + + private static final int TIMEOUT_MILLIS = 10000; + + @AfterClass + public static void tearDown() throws Exception { + if (NIO_GROUP != null) { + NIO_GROUP.shutdownGracefully(); + } + } + + /** + * Tests a simple successful query via a SocketChannel. + */ + @Test + public void testSimpleRequest() throws Exception { + KvStateServer server = null; + Bootstrap bootstrap = null; + try { + KvStateRegistry registry = new KvStateRegistry(); + KvStateRequestStats stats = new AtomicKvStateRequestStats(); + + server = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats); + server.start(); + + KvStateServerAddress serverAddress = server.getAddress(); + int numKeyGroups = 1; + AbstractStateBackend abstractBackend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + dummyEnv.setKvStateRegistry(registry); + AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + registry.createTaskRegistry(new JobID(), new JobVertexID())); + + final KvStateServerHandlerTest.TestRegistryListener registryListener = + new KvStateServerHandlerTest.TestRegistryListener(); + + registry.registerListener(registryListener); + + ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + desc.setQueryable("vanilla"); + + ValueState<Integer> state = backend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + desc); + + // Update KvState + int expectedValue = 712828289; + + int key = 99812822; + backend.setCurrentKey(key); + state.update(expectedValue); + + // Request + byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( + key, + IntSerializer.INSTANCE, + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + // Connect to the server + final BlockingQueue<ByteBuf> responses = new LinkedBlockingQueue<>(); + bootstrap = createBootstrap( + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + responses.add((ByteBuf) msg); + } + }); + + Channel channel = bootstrap + .connect(serverAddress.getHost(), serverAddress.getPort()) + .sync().channel(); + + long requestId = Integer.MAX_VALUE + 182828L; + + assertTrue(registryListener.registrationName.equals("vanilla")); + ByteBuf request = MessageSerializer.serializeKvStateRequest( + channel.alloc(), + requestId, + registryListener.kvStateId, + serializedKeyAndNamespace); + + channel.writeAndFlush(request); + + ByteBuf buf = responses.poll(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + KvStateRequestResult response = MessageSerializer.deserializeKvStateRequestResult(buf); + + assertEquals(requestId, response.getRequestId()); + int actualValue = KvStateSerializer.deserializeValue(response.getSerializedResult(), IntSerializer.INSTANCE); + assertEquals(expectedValue, actualValue); + } finally { + if (server != null) { + server.shutDown(); + } + + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(); + } + } + } + } + + /** + * Creates a client bootstrap. + */ + private Bootstrap createBootstrap(final ChannelHandler... handlers) { + return new Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(handlers); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java new file mode 100644 index 0000000..5d4a861 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.queryablestate.UnknownKvStateID; +import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation; +import org.apache.flink.queryablestate.client.KvStateClient; +import org.apache.flink.queryablestate.client.KvStateLocationLookupService; +import org.apache.flink.queryablestate.client.QueryableStateClient; +import org.apache.flink.queryablestate.server.KvStateServerImpl; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.KvStateID; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.heap.HeapValueState; +import org.apache.flink.runtime.state.heap.NestedMapsStateTable; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.MathUtils; + +import akka.actor.ActorSystem; +import akka.dispatch.Futures; +import org.junit.AfterClass; +import org.junit.Test; + +import java.net.ConnectException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link QueryableStateClient}. + */ +public class QueryableStateClientTest { + + private static final ActorSystem testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + + private static final FiniteDuration timeout = new FiniteDuration(100, TimeUnit.SECONDS); + + @AfterClass + public static void tearDown() throws Exception { + if (testActorSystem != null) { + testActorSystem.shutdown(); + } + } + + /** + * All failures should lead to a retry with a forced location lookup. + * + * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, UnknownKvStateLocation, + * ConnectException are checked explicitly as these indicate out-of-sync + * KvStateLocation. + */ + @Test + public void testForceLookupOnOutdatedLocation() throws Exception { + KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); + KvStateClient networkClient = mock(KvStateClient.class); + + QueryableStateClient client = new QueryableStateClient( + lookupService, + networkClient, + testActorSystem.dispatcher()); + + try { + JobID jobId = new JobID(); + int numKeyGroups = 4; + + // + // UnknownKvStateLocation + // + String query1 = "lucky"; + + Future<KvStateLocation> unknownKvStateLocation = Futures.failed( + new UnknownKvStateLocation(query1)); + + when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1))) + .thenReturn(unknownKvStateLocation); + + Future<Integer> result = client.getKvState( + jobId, + query1, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + try { + Await.result(result, timeout); + fail("Did not throw expected UnknownKvStateLocation exception"); + } catch (UnknownKvStateLocation ignored) { + // Expected + } + + verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query1)); + + // + // UnknownKvStateKeyGroupLocation + // + String query2 = "unlucky"; + + Future<KvStateLocation> unknownKeyGroupLocation = Futures.successful( + new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query2)); + + when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2))) + .thenReturn(unknownKeyGroupLocation); + + result = client.getKvState( + jobId, + query2, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + try { + Await.result(result, timeout); + fail("Did not throw expected UnknownKvStateKeyGroupLocation exception"); + } catch (UnknownKvStateKeyGroupLocation ignored) { + // Expected + } + + verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query2)); + + // + // UnknownKvStateID + // + String query3 = "water"; + KvStateID kvStateId = new KvStateID(); + Future<byte[]> unknownKvStateId = Futures.failed(new UnknownKvStateID(kvStateId)); + + KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323); + KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3); + for (int i = 0; i < numKeyGroups; i++) { + location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); + } + + when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3))) + .thenReturn(Futures.successful(location)); + + when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) + .thenReturn(unknownKvStateId); + + result = client.getKvState( + jobId, + query3, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + try { + Await.result(result, timeout); + fail("Did not throw expected UnknownKvStateID exception"); + } catch (UnknownKvStateID ignored) { + // Expected + } + + verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query3)); + + // + // ConnectException + // + String query4 = "space"; + Future<byte[]> connectException = Futures.failed(new ConnectException()); + kvStateId = new KvStateID(); + + serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123); + location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4); + for (int i = 0; i < numKeyGroups; i++) { + location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); + } + + when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4))) + .thenReturn(Futures.successful(location)); + + when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) + .thenReturn(connectException); + + result = client.getKvState( + jobId, + query4, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + try { + Await.result(result, timeout); + fail("Did not throw expected ConnectException exception"); + } catch (ConnectException ignored) { + // Expected + } + + verify(lookupService, times(2)).getKvStateLookupInfo(eq(jobId), eq(query4)); + + // + // Other Exceptions don't lead to a retry no retry + // + String query5 = "universe"; + Future<KvStateLocation> exception = Futures.failed(new RuntimeException("Test exception")); + when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5))) + .thenReturn(exception); + + client.getKvState( + jobId, + query5, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); + + verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5)); + } finally { + client.shutDown(); + } + } + + /** + * Tests queries against multiple servers. + * + * <p>The servers are populated with different keys and the client queries + * all available keys from all servers. + */ + @Test + public void testIntegrationWithKvStateServer() throws Exception { + // Config + int numServers = 2; + int numKeys = 1024; + int numKeyGroups = 1; + + JobID jobId = new JobID(); + JobVertexID jobVertexId = new JobVertexID(); + + KvStateServer[] servers = new KvStateServer[numServers]; + AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers]; + + QueryableStateClient client = null; + KvStateClient networkClient = null; + AtomicKvStateRequestStats networkClientStats = new AtomicKvStateRequestStats(); + + MemoryStateBackend backend = new MemoryStateBackend(); + DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0); + + AbstractKeyedStateBackend<Integer> keyedStateBackend = backend.createKeyedStateBackend(dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + + try { + KvStateRegistry[] registries = new KvStateRegistry[numServers]; + KvStateID[] kvStateIds = new KvStateID[numServers]; + List<HeapValueState<Integer, VoidNamespace, Integer>> kvStates = new ArrayList<>(); + + // Start the servers + for (int i = 0; i < numServers; i++) { + registries[i] = new KvStateRegistry(); + serverStats[i] = new AtomicKvStateRequestStats(); + servers[i] = new KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]); + servers[i].start(); + ValueStateDescriptor<Integer> descriptor = + new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + + RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + descriptor.getType(), + descriptor.getName(), + VoidNamespaceSerializer.INSTANCE, + IntSerializer.INSTANCE); + + // Register state + HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>( + descriptor, + new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo), + IntSerializer.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + kvStates.add(kvState); + + kvStateIds[i] = registries[i].registerKvState( + jobId, + new JobVertexID(), + new KeyGroupRange(i, i), + "choco", + kvState); + } + + int[] expectedRequests = new int[numServers]; + + for (int key = 0; key < numKeys; key++) { + int targetKeyGroupIndex = MathUtils.murmurHash(key) % numServers; + expectedRequests[targetKeyGroupIndex]++; + + HeapValueState<Integer, VoidNamespace, Integer> kvState = kvStates.get(targetKeyGroupIndex); + + keyedStateBackend.setCurrentKey(key); + kvState.setCurrentNamespace(VoidNamespace.INSTANCE); + kvState.update(1337 + key); + } + + // Location lookup service + KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco"); + for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) { + location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress()); + } + + KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); + when(lookupService.getKvStateLookupInfo(eq(jobId), eq("choco"))) + .thenReturn(Futures.successful(location)); + + // The client + networkClient = new KvStateClient(1, networkClientStats); + + client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher()); + + // Send all queries + List<Future<Integer>> futures = new ArrayList<>(numKeys); + for (int key = 0; key < numKeys; key++) { + ValueStateDescriptor<Integer> descriptor = + new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + futures.add(client.getKvState( + jobId, + "choco", + key, + BasicTypeInfo.INT_TYPE_INFO, + descriptor)); + } + + // Verify results + Future<Iterable<Integer>> future = Futures.sequence(futures, testActorSystem.dispatcher()); + Iterable<Integer> results = Await.result(future, timeout); + + int index = 0; + for (int buffer : results) { + assertEquals(1337 + index, buffer); + index++; + } + + // Verify requests + for (int i = 0; i < numServers; i++) { + int numRetries = 10; + for (int retry = 0; retry < numRetries; retry++) { + try { + assertEquals("Unexpected number of requests", expectedRequests[i], serverStats[i].getNumRequests()); + assertEquals("Unexpected success requests", expectedRequests[i], serverStats[i].getNumSuccessful()); + assertEquals("Unexpected failed requests", 0, serverStats[i].getNumFailed()); + break; + } catch (Throwable t) { + // Retry + if (retry == numRetries - 1) { + throw t; + } else { + Thread.sleep(100); + } + } + } + } + } finally { + if (client != null) { + client.shutDown(); + } + + if (networkClient != null) { + networkClient.shutDown(); + } + + for (KvStateServer server : servers) { + if (server != null) { + server.shutDown(); + } + } + } + } + + /** + * Tests that the QueryableState client correctly caches location lookups + * keyed by both job and name. This test is mainly due to a previous bug due + * to which cache entries were by name only. This is a problem, because the + * same client can be used to query multiple jobs. + */ + @Test + public void testLookupMultipleJobIds() throws Exception { + String name = "unique-per-job"; + + // Exact contents don't matter here + KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name); + location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892)); + + JobID jobId1 = new JobID(); + JobID jobId2 = new JobID(); + + KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); + + when(lookupService.getKvStateLookupInfo(any(JobID.class), anyString())) + .thenReturn(Futures.successful(location)); + + KvStateClient networkClient = mock(KvStateClient.class); + when(networkClient.getKvState(any(KvStateServerAddress.class), any(KvStateID.class), any(byte[].class))) + .thenReturn(Futures.successful(new byte[0])); + + QueryableStateClient client = new QueryableStateClient( + lookupService, + networkClient, + testActorSystem.dispatcher()); + + ValueStateDescriptor<Integer> stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE); + + // Query ies with same name, but different job IDs should lead to a + // single lookup per query and job ID. + client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); + client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); + + verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name)); + verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..10792cd --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR +log4j.logger.org.apache.zookeeper=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml new file mode 100644 index 0000000..e9e7496 --- /dev/null +++ b/flink-queryable-state/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.4-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-queryable-state</artifactId> + <name>flink-queryable-state</name> + <packaging>pom</packaging> + + <modules> + <module>flink-queryable-state-java</module> + <!-- <module>flink-state-client-scala</module>--> + </modules> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 9193859..53503ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java deleted file mode 100644 index a37a3ac..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java +++ /dev/null @@ -1,322 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.util.Preconditions; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.dispatch.Recover; -import akka.pattern.Patterns; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; -import java.util.concurrent.Callable; - -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * Akka-based {@link KvStateLocationLookupService} that retrieves the current - * JobManager address and uses it for lookups. - */ -class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class); - - /** Future returned when no JobManager is available. */ - private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager()); - - /** Leader retrieval service to retrieve the current job manager. */ - private final LeaderRetrievalService leaderRetrievalService; - - /** The actor system used to resolve the JobManager address. */ - private final ActorSystem actorSystem; - - /** Timeout for JobManager ask-requests. */ - private final FiniteDuration askTimeout; - - /** Retry strategy factory on future failures. */ - private final LookupRetryStrategyFactory retryStrategyFactory; - - /** Current job manager future. */ - private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER; - - /** - * Creates the Akka-based {@link KvStateLocationLookupService}. - * - * @param leaderRetrievalService Leader retrieval service to use. - * @param actorSystem Actor system to use. - * @param askTimeout Timeout for JobManager ask-requests. - * @param retryStrategyFactory Retry strategy if no JobManager available. - */ - AkkaKvStateLocationLookupService( - LeaderRetrievalService leaderRetrievalService, - ActorSystem actorSystem, - FiniteDuration askTimeout, - LookupRetryStrategyFactory retryStrategyFactory) { - - this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service"); - this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system"); - this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout"); - this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory"); - } - - public void start() { - try { - leaderRetrievalService.start(this); - } catch (Exception e) { - LOG.error("Failed to start leader retrieval service", e); - throw new RuntimeException(e); - } - } - - public void shutDown() { - try { - leaderRetrievalService.stop(); - } catch (Exception e) { - LOG.error("Failed to stop leader retrieval service", e); - throw new RuntimeException(e); - } - } - - @Override - @SuppressWarnings("unchecked") - public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) { - return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy()); - } - - /** - * Returns a future holding the {@link KvStateLocation} for the given job - * and KvState registration name. - * - * <p>If there is currently no JobManager registered with the service, the - * request is retried. The retry behaviour is specified by the - * {@link LookupRetryStrategy} of the lookup service. - * - * @param jobId JobID the KvState instance belongs to - * @param registrationName Name under which the KvState has been registered - * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures. - * @return Future holding the {@link KvStateLocation} - */ - @SuppressWarnings("unchecked") - private Future<KvStateLocation> getKvStateLookupInfo( - final JobID jobId, - final String registrationName, - final LookupRetryStrategy lookupRetryStrategy) { - - return jobManagerFuture - .flatMap(new Mapper<ActorGateway, Future<Object>>() { - @Override - public Future<Object> apply(ActorGateway jobManager) { - // Lookup the KvStateLocation - Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName); - return jobManager.ask(msg, askTimeout); - } - }, actorSystem.dispatcher()) - .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)) - .recoverWith(new Recover<Future<KvStateLocation>>() { - @Override - public Future<KvStateLocation> recover(Throwable failure) throws Throwable { - // If the Future fails with UnknownJobManager, retry - // the request. Otherwise all Futures will be failed - // during the start up phase, when the JobManager did - // not notify this service yet or leadership is lost - // intermittently. - if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) { - return Patterns.after( - lookupRetryStrategy.getRetryDelay(), - actorSystem.scheduler(), - actorSystem.dispatcher(), - new Callable<Future<KvStateLocation>>() { - @Override - public Future<KvStateLocation> call() throws Exception { - return getKvStateLookupInfo( - jobId, - registrationName, - lookupRetryStrategy); - } - }); - } else { - return Futures.failed(failure); - } - } - }, actorSystem.dispatcher()); - } - - @Override - public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID); - } - - if (leaderAddress == null) { - jobManagerFuture = UNKNOWN_JOB_MANAGER; - } else { - jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout) - .map(new Mapper<ActorRef, ActorGateway>() { - @Override - public ActorGateway apply(ActorRef actorRef) { - return new AkkaActorGateway(actorRef, leaderSessionID); - } - }, actorSystem.dispatcher()); - } - } - - @Override - public void handleError(Exception exception) { - jobManagerFuture = Futures.failed(exception); - } - - // ------------------------------------------------------------------------ - - /** - * Retry strategy for failed lookups. - * - * <p>Usage: - * <pre> - * LookupRetryStrategy retryStrategy = LookupRetryStrategyFactory.create(); - * - * if (retryStrategy.tryRetry()) { - * // OK to retry - * FiniteDuration retryDelay = retryStrategy.getRetryDelay(); - * } - * </pre> - */ - interface LookupRetryStrategy { - - /** - * Returns the current retry. - * - * @return Current retry delay. - */ - FiniteDuration getRetryDelay(); - - /** - * Tries another retry and returns whether it is allowed or not. - * - * @return Whether it is allowed to do another restart or not. - */ - boolean tryRetry(); - - } - - /** - * Factory for retry strategies. - */ - interface LookupRetryStrategyFactory { - - /** - * Creates a new retry strategy. - * - * @return The retry strategy. - */ - LookupRetryStrategy createRetryStrategy(); - - } - - /** - * Factory for disabled retries. - */ - static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory { - - private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy(); - - @Override - public LookupRetryStrategy createRetryStrategy() { - return RETRY_STRATEGY; - } - - private static class DisabledLookupRetryStrategy implements LookupRetryStrategy { - - @Override - public FiniteDuration getRetryDelay() { - return FiniteDuration.Zero(); - } - - @Override - public boolean tryRetry() { - return false; - } - } - - } - - /** - * Factory for fixed delay retries. - */ - static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory { - - private final int maxRetries; - private final FiniteDuration retryDelay; - - FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) { - this.maxRetries = maxRetries; - this.retryDelay = retryDelay; - } - - @Override - public LookupRetryStrategy createRetryStrategy() { - return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay); - } - - private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy { - - private final Object retryLock = new Object(); - private final int maxRetries; - private final FiniteDuration retryDelay; - private int numRetries; - - public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) { - Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries"); - this.maxRetries = maxRetries; - this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay"); - } - - @Override - public FiniteDuration getRetryDelay() { - synchronized (retryLock) { - return retryDelay; - } - } - - @Override - public boolean tryRetry() { - synchronized (retryLock) { - if (numRetries < maxRetries) { - numRetries++; - return true; - } else { - return false; - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java index 86d1838..8a213bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java @@ -31,7 +31,7 @@ import java.util.Arrays; * Location information for all key groups of a {@link InternalKvState} instance. * * <p>This is populated by the {@link KvStateLocationRegistry} and used by the - * {@link QueryableStateClient} to target queries. + * Queryable State Client to target queries. */ public class KvStateLocation implements Serializable { @@ -166,7 +166,7 @@ public class KvStateLocation implements Serializable { * @param kvStateAddress Server address of the KvState instance at the key group index. * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups */ - void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) { + public void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) { if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); @@ -183,6 +183,10 @@ public class KvStateLocation implements Serializable { } } + public static long getSerialVersionUID() { + return serialVersionUID; + } + /** * Registers a KvState instance for the given key group index. * @@ -190,7 +194,7 @@ public class KvStateLocation implements Serializable { * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups * @throws IllegalArgumentException If no location information registered for a key group index in the range. */ - void unregisterKvState(KeyGroupRange keyGroupRange) { + public void unregisterKvState(KeyGroupRange keyGroupRange) { if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); } http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java deleted file mode 100644 index dfd9c14..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.api.common.JobID; - -import scala.concurrent.Future; - -/** - * {@link KvStateLocation} lookup service. - */ -public interface KvStateLocationLookupService { - - /** - * Starts the lookup service. - */ - void start(); - - /** - * Shuts down the lookup service. - */ - void shutDown(); - - /** - * Returns a future holding the {@link KvStateLocation} for the given job - * and KvState registration name. - * - * @param jobId JobID the KvState instance belongs to - * @param registrationName Name under which the KvState has been registered - * @return Future holding the {@link KvStateLocation} - */ - Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index 26b700c..90fa5cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.Task; http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java new file mode 100644 index 0000000..9b14c49 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +/** + * An interface for the Queryable State Server running on each Task Manager in the cluster. + * This server is responsible for serving requests coming from the Queryable State Client and + * requesting <b>locally</b> stored state. + */ +public interface KvStateServer { + + /** + * Returns the address of this server. + * + * @return Server address + */ + KvStateServerAddress getAddress(); + + + /** Starts the proxy. */ + void start() throws InterruptedException; + + /** + * Shuts down the server and all related thread pools. + */ + void shutDown(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java index 9ec25bc..2599855 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.query; -import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.util.Preconditions; import java.io.Serializable; @@ -88,4 +87,9 @@ public class KvStateServerAddress implements Serializable { result = 31 * result + port; return result; } + + @Override + public String toString() { + return hostAddress.getHostName() + ':' + port; + } }
