http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java new file mode 100644 index 0000000..acaa067 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.queryablestate.messages.KvStateInternalRequest; +import org.apache.flink.queryablestate.messages.KvStateResponse; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.network.messages.RequestFailure; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link MessageSerializer}. + */ +@RunWith(Parameterized.class) +public class MessageSerializerTest { + + private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT; + + @Parameterized.Parameters + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean async; + + /** + * Tests request serialization. 
+ */ + @Test + public void testRequestSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1337L; + KvStateID kvStateId = new KvStateID(); + byte[] serializedKeyAndNamespace = randomByteArray(1024); + + final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(kvStateId, requestDeser.getKvStateId()); + assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); + } + + /** + * Tests request serialization with zero-length serialized key and namespace. 
+ */ + @Test + public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception { + + long requestId = Integer.MAX_VALUE + 1337L; + KvStateID kvStateId = new KvStateID(); + byte[] serializedKeyAndNamespace = new byte[0]; + + final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateInternalRequest requestDeser = serializer.deserializeRequest(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(kvStateId, requestDeser.getKvStateId()); + assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> key and namespace. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception { + new KvStateInternalRequest(new KvStateID(), null); + } + + /** + * Tests response serialization. 
+ */ + @Test + public void testResponseSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 72727278L; + byte[] serializedResult = randomByteArray(1024); + + final KvStateResponse response = new KvStateResponse(serializedResult); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + assertEquals(requestId, MessageSerializer.getRequestId(buf)); + KvStateResponse responseDeser = serializer.deserializeResponse(buf); + + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedResult, responseDeser.getContent()); + } + + /** + * Tests response serialization with zero-length serialized result. 
+ */ + @Test + public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception { + byte[] serializedResult = new byte[0]; + + final KvStateResponse response = new KvStateResponse(serializedResult); + final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = + new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer()); + + ByteBuf buf = MessageSerializer.serializeResponse(alloc, 72727278L, response); + + int frameLength = buf.readInt(); + + assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf)); + assertEquals(72727278L, MessageSerializer.getRequestId(buf)); + KvStateResponse responseDeser = serializer.deserializeResponse(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertArrayEquals(serializedResult, responseDeser.getContent()); + } + + /** + * Tests that we don't try to be smart about <code>null</code> results. + * They should be treated explicitly. + */ + @Test(expected = NullPointerException.class) + public void testNullPointerExceptionOnNullSerializedResult() throws Exception { + new KvStateResponse((byte[]) null); + } + + /** + * Tests request failure serialization. 
+ */ + @Test + public void testKvStateRequestFailureSerialization() throws Exception { + long requestId = Integer.MAX_VALUE + 1111222L; + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf)); + RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(requestId, requestFailure.getRequestId()); + assertEquals(cause.getClass(), requestFailure.getCause().getClass()); + assertEquals(cause.getMessage(), requestFailure.getCause().getMessage()); + } + + /** + * Tests server failure serialization. + */ + @Test + public void testServerFailureSerialization() throws Exception { + IllegalStateException cause = new IllegalStateException("Expected test"); + + ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause); + + int frameLength = buf.readInt(); + assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf)); + Throwable request = MessageSerializer.deserializeServerFailure(buf); + assertEquals(buf.readerIndex(), frameLength + 4); + + assertEquals(cause.getClass(), request.getClass()); + assertEquals(cause.getMessage(), request.getMessage()); + } + + private byte[] randomByteArray(int capacity) { + byte[] bytes = new byte[capacity]; + ThreadLocalRandom.current().nextBytes(bytes); + return bytes; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..10792cd --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# testlogger is set to be a ConsoleAppender. 
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR +log4j.logger.org.apache.zookeeper=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml index e2579f6..9300fb3 100644 --- a/flink-queryable-state/pom.xml +++ b/flink-queryable-state/pom.xml @@ -35,8 +35,9 @@ under the License. <packaging>pom</packaging> <modules> - <module>flink-queryable-state-java</module> - <!-- <module>flink-state-client-scala</module>--> + <module>flink-queryable-state-runtime</module> + <module>flink-queryable-state-client-java</module> + <!-- <module>flink-state-client-scala</module>--> </modules> <dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 83ac781..134c414 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -50,6 +50,12 @@ under the License. 
<version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <!-- The Hadoop FS support has only an optional dependency on Hadoop and gracefully handles absence of Hadoop classes --> http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index f60f561..4535290 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.StoppingException; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; @@ -74,10 +75,8 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; -import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateLocationRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.UnknownKvStateLocation; import 
org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -104,6 +103,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -586,7 +586,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress) + final InetSocketAddress kvStateServerAddress) { if (log.isDebugEnabled()) { log.debug("Key value state registered for job {} under name {}.", http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index d59feed..2c7e438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -36,9 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; -import 
org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.FencedRpcGateway; @@ -49,6 +48,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -165,7 +165,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp final KeyGroupRange keyGroupRange, final String registrationName, final KvStateID kvStateId, - final KvStateServerAddress kvStateServerAddress); + final InetSocketAddress kvStateServerAddress); /** * Notifies that queryable state has been unregistered. http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java deleted file mode 100644 index c122508..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.AbstractID; - -/** - * Identifier for {@link InternalKvState} instances. - * - * <p>Assigned when registering state at the {@link KvStateRegistry}. - */ -public class KvStateID extends AbstractID { - - private static final long serialVersionUID = 1L; - - public KvStateID() { - super(); - } - - public KvStateID(long lowerPart, long upperPart) { - super(lowerPart, upperPart); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java index 03e8238..e4fdda5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.net.InetSocketAddress; import java.util.Arrays; 
/** @@ -56,7 +58,7 @@ public class KvStateLocation implements Serializable { * Server address for each KvState instance where array index corresponds to * key group index. */ - private final KvStateServerAddress[] kvStateAddresses; + private final InetSocketAddress[] kvStateAddresses; /** Current number of registered key groups. */ private int numRegisteredKeyGroups; @@ -76,7 +78,7 @@ public class KvStateLocation implements Serializable { this.numKeyGroups = numKeyGroups; this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name"); this.kvStateIds = new KvStateID[numKeyGroups]; - this.kvStateAddresses = new KvStateServerAddress[numKeyGroups]; + this.kvStateAddresses = new InetSocketAddress[numKeyGroups]; } /** @@ -142,15 +144,15 @@ public class KvStateLocation implements Serializable { } /** - * Returns the registered KvStateServerAddress for the key group index or + * Returns the registered server address for the key group index or * <code>null</code> if none is registered yet. * * @param keyGroupIndex Key group index to get server address for. - * @return KvStateServerAddress for the key group index or <code>null</code> + * @return the server address for the key group index or <code>null</code> * if none is registered yet * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups */ - public KvStateServerAddress getKvStateServerAddress(int keyGroupIndex) { + public InetSocketAddress getKvStateServerAddress(int keyGroupIndex) { if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); } @@ -166,7 +168,7 @@ public class KvStateLocation implements Serializable { * @param kvStateAddress Server address of the KvState instance at the key group index. 
* @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups */ - public void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) { + public void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, InetSocketAddress kvStateAddress) { if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java index cb61905..05ee017 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -89,7 +91,7 @@ public class KvStateLocationRegistry { KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, - KvStateServerAddress kvStateServerAddress) { + InetSocketAddress kvStateServerAddress) { KvStateLocation location = 
lookupTable.get(registrationName); http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java index 04684ee..e94d2f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.net.InetSocketAddress; /** * Actor messages for {@link InternalKvState} lookup and registration. @@ -114,7 +116,7 @@ public interface KvStateMessage extends Serializable { private final KvStateID kvStateId; /** Server address where to find the KvState instance. */ - private final KvStateServerAddress kvStateServerAddress; + private final InetSocketAddress kvStateServerAddress; /** * Notifies the JobManager about a registered {@link InternalKvState} instance. 
@@ -132,7 +134,7 @@ public interface KvStateMessage extends Serializable { KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, - KvStateServerAddress kvStateServerAddress) { + InetSocketAddress kvStateServerAddress) { this.jobId = Preconditions.checkNotNull(jobId, "JobID"); this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID"); @@ -140,7 +142,7 @@ public interface KvStateMessage extends Serializable { this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name"); this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID"); - this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress"); + this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress"); } /** @@ -193,7 +195,7 @@ public interface KvStateMessage extends Serializable { * * @return Server address where to find the KvState instance */ - public KvStateServerAddress getKvStateServerAddress() { + public InetSocketAddress getKvStateServerAddress() { return kvStateServerAddress; } http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index 90fa5cc..af19d81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import 
org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java index 13862c9..4b9834a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java @@ -19,10 +19,13 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.state.KeyGroupRange; +import java.net.InetSocketAddress; + /** * A gateway to listen for {@code KvState} registrations. */ @@ -42,7 +45,7 @@ public interface KvStateRegistryGateway extends RpcGateway { KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, - KvStateServerAddress kvStateServerAddress); + InetSocketAddress kvStateServerAddress); /** * Notifies the listener about an unregistered KvState instance. 
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java index 29bee9a..dc90c96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java index 17ffe0d..ae58714 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.query; +import java.net.InetSocketAddress; + /** * An interface for the Queryable State Server running on each Task Manager in the cluster. * This server is responsible for serving requests coming from the {@link KvStateClientProxy @@ -26,10 +28,10 @@ package org.apache.flink.runtime.query; public interface KvStateServer { /** - * Returns the {@link KvStateServerAddress address} the server is listening to. 
+ * Returns the {@link InetSocketAddress address} the server is listening to. * @return Server address. */ - KvStateServerAddress getServerAddress(); + InetSocketAddress getServerAddress(); /** Starts the server. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java deleted file mode 100644 index 2599855..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; -import java.net.InetAddress; - -/** - * The (host, port)-address of a {@link KvStateServer}. - */ -public class KvStateServerAddress implements Serializable { - - private static final long serialVersionUID = 1L; - - /** KvStateServer host address. 
*/ - private final InetAddress hostAddress; - - /** KvStateServer port. */ - private final int port; - - /** - * Creates a KvStateServerAddress for the given KvStateServer host address - * and port. - * - * @param hostAddress KvStateServer host address - * @param port KvStateServer port - */ - public KvStateServerAddress(InetAddress hostAddress, int port) { - this.hostAddress = Preconditions.checkNotNull(hostAddress, "Host address"); - Preconditions.checkArgument(port > 0 && port <= 65535, "Port " + port + " is out of range 1-65535"); - this.port = port; - } - - /** - * Returns the host address of the KvStateServer. - * - * @return KvStateServer host address - */ - public InetAddress getHost() { - return hostAddress; - } - - /** - * Returns the port of the KvStateServer. - * - * @return KvStateServer port - */ - public int getPort() { - return port; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - KvStateServerAddress that = (KvStateServerAddress) o; - - return port == that.port && hostAddress.equals(that.hostAddress); - } - - @Override - public int hashCode() { - int result = hostAddress.hashCode(); - result = 31 * result + port; - return result; - } - - @Override - public String toString() { - return hostAddress.getHostName() + ':' + port; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java index fa021df..adbe15d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java @@ 
-18,7 +18,7 @@ package org.apache.flink.runtime.query; -import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.queryablestate.network.stats.KvStateRequestStats; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -74,7 +74,8 @@ public final class QueryableStateUtils { return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, stats); } catch (ClassNotFoundException e) { LOG.warn("Could not load Queryable State Client Proxy. " + - "Probable reason: flink-queryable-state is not in the classpath"); + "Probable reason: flink-queryable-state-runtime is not in the classpath. " + + "Please put the corresponding jar from the opt to the lib folder."); LOG.debug("Caught exception", e); return null; } catch (InvocationTargetException e) { @@ -128,7 +129,8 @@ public final class QueryableStateUtils { return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats); } catch (ClassNotFoundException e) { LOG.warn("Could not load Queryable State Server. " + - "Probable reason: flink-queryable-state is not in the classpath"); + "Probable reason: flink-queryable-state-runtime is not in the classpath. 
" + + "Please put the corresponding jar from the opt to the lib folder."); LOG.debug("Caught exception", e); return null; } catch (InvocationTargetException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java index 8d0eede..f799b5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java deleted file mode 100644 index 1d80bab..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import java.util.concurrent.atomic.AtomicLong; - -/** - * Atomic {@link KvStateRequestStats} implementation. - */ -public class AtomicKvStateRequestStats implements KvStateRequestStats { - - /** - * Number of active connections. - */ - private final AtomicLong numConnections = new AtomicLong(); - - /** - * Total number of reported requests. - */ - private final AtomicLong numRequests = new AtomicLong(); - - /** - * Total number of successful requests (<= reported requests). - */ - private final AtomicLong numSuccessful = new AtomicLong(); - - /** - * Total duration of all successful requests. - */ - private final AtomicLong successfulDuration = new AtomicLong(); - - /** - * Total number of failed requests (<= reported requests). 
- */ - private final AtomicLong numFailed = new AtomicLong(); - - @Override - public void reportActiveConnection() { - numConnections.incrementAndGet(); - } - - @Override - public void reportInactiveConnection() { - numConnections.decrementAndGet(); - } - - @Override - public void reportRequest() { - numRequests.incrementAndGet(); - } - - @Override - public void reportSuccessfulRequest(long durationTotalMillis) { - numSuccessful.incrementAndGet(); - successfulDuration.addAndGet(durationTotalMillis); - } - - @Override - public void reportFailedRequest() { - numFailed.incrementAndGet(); - } - - public long getNumConnections() { - return numConnections.get(); - } - - public long getNumRequests() { - return numRequests.get(); - } - - public long getNumSuccessful() { - return numSuccessful.get(); - } - - public long getNumFailed() { - return numFailed.get(); - } - - @Override - public String toString() { - return "AtomicKvStateRequestStats{" + - "numConnections=" + numConnections + - ", numRequests=" + numRequests + - ", numSuccessful=" + numSuccessful + - ", numFailed=" + numFailed + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java deleted file mode 100644 index de8824d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -/** - * Disabled {@link KvStateRequestStats} implementation. - */ -public class DisabledKvStateRequestStats implements KvStateRequestStats { - - @Override - public void reportActiveConnection() { - } - - @Override - public void reportInactiveConnection() { - } - - @Override - public void reportRequest() { - } - - @Override - public void reportSuccessfulRequest(long durationTotalMillis) { - } - - @Override - public void reportFailedRequest() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java deleted file mode 100644 index 19caf92..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -/** - * Simple statistics for - * {@link org.apache.flink.runtime.query.KvStateServer} and - * {@link org.apache.flink.runtime.query.KvStateClientProxy} monitoring. - */ -public interface KvStateRequestStats { - - /** - * Reports an active connection. - */ - void reportActiveConnection(); - - /** - * Reports an inactive connection. - */ - void reportInactiveConnection(); - - /** - * Reports an incoming request. - */ - void reportRequest(); - - /** - * Reports a successfully handled request. - * - * @param durationTotalMillis Duration of the request (in milliseconds). - */ - void reportSuccessfulRequest(long durationTotalMillis); - - /** - * Reports a failure during a request. 
- */ - void reportFailedRequest(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java deleted file mode 100644 index 44ee571..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Serialization and deserialization the different state types and namespaces. - */ -public final class KvStateSerializer { - - // ------------------------------------------------------------------------ - // Generic serialization utils - // ------------------------------------------------------------------------ - - /** - * Serializes the key and namespace into a {@link ByteBuffer}. - * - * <p>The serialized format matches the RocksDB state backend key format, i.e. - * the key and namespace don't have to be deserialized for RocksDB lookups. - * - * @param key Key to serialize - * @param keySerializer Serializer for the key - * @param namespace Namespace to serialize - * @param namespaceSerializer Serializer for the namespace - * @param <K> Key type - * @param <N> Namespace type - * @return Buffer holding the serialized key and namespace - * @throws IOException Serialization errors are forwarded - */ - public static <K, N> byte[] serializeKeyAndNamespace( - K key, - TypeSerializer<K> keySerializer, - N namespace, - TypeSerializer<N> namespaceSerializer) throws IOException { - - DataOutputSerializer dos = new DataOutputSerializer(32); - - keySerializer.serialize(key, dos); - dos.writeByte(42); - namespaceSerializer.serialize(namespace, dos); - - return dos.getCopyOfBuffer(); - } - - /** - * Deserializes the key and namespace into a {@link Tuple2}. 
- * - * @param serializedKeyAndNamespace Serialized key and namespace - * @param keySerializer Serializer for the key - * @param namespaceSerializer Serializer for the namespace - * @param <K> Key type - * @param <N> Namespace - * @return Tuple2 holding deserialized key and namespace - * @throws IOException if the deserialization fails for any reason - */ - public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace( - byte[] serializedKeyAndNamespace, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer) throws IOException { - - DataInputDeserializer dis = new DataInputDeserializer( - serializedKeyAndNamespace, - 0, - serializedKeyAndNamespace.length); - - try { - K key = keySerializer.deserialize(dis); - byte magicNumber = dis.readByte(); - if (magicNumber != 42) { - throw new IOException("Unexpected magic number " + magicNumber + "."); - } - N namespace = namespaceSerializer.deserialize(dis); - - if (dis.available() > 0) { - throw new IOException("Unconsumed bytes in the serialized key and namespace."); - } - - return new Tuple2<>(key, namespace); - } catch (IOException e) { - throw new IOException("Unable to deserialize key " + - "and namespace. This indicates a mismatch in the key/namespace " + - "serializers used by the KvState instance and this access.", e); - } - } - - /** - * Serializes the value with the given serializer. - * - * @param value Value of type T to serialize - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Serialized value or <code>null</code> if value <code>null</code> - * @throws IOException On failure during serialization - */ - public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException { - if (value != null) { - // Serialize - DataOutputSerializer dos = new DataOutputSerializer(32); - serializer.serialize(value, dos); - return dos.getCopyOfBuffer(); - } else { - return null; - } - } - - /** - * Deserializes the value with the given serializer. 
- * - * @param serializedValue Serialized value of type T - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Deserialized value or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { - if (serializedValue == null) { - return null; - } else { - final DataInputDeserializer deser = new DataInputDeserializer( - serializedValue, 0, serializedValue.length); - final T value = serializer.deserialize(deser); - if (deser.available() > 0) { - throw new IOException( - "Unconsumed bytes in the deserialized value. " + - "This indicates a mismatch in the value serializers " + - "used by the KvState instance and this access."); - } - return value; - } - } - - /** - * Deserializes all values with the given serializer. - * - * @param serializedValue Serialized value of type List<T> - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Deserialized list or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { - if (serializedValue != null) { - final DataInputDeserializer in = new DataInputDeserializer( - serializedValue, 0, serializedValue.length); - - try { - final List<T> result = new ArrayList<>(); - while (in.available() > 0) { - result.add(serializer.deserialize(in)); - - // The expected binary format has a single byte separator. We - // want a consistent binary format in order to not need any - // special casing during deserialization. A "cleaner" format - // would skip this extra byte, but would require a memory copy - // for RocksDB, which stores the data serialized in this way - // for lists. 
- if (in.available() > 0) { - in.readByte(); - } - } - - return result; - } catch (IOException e) { - throw new IOException( - "Unable to deserialize value. " + - "This indicates a mismatch in the value serializers " + - "used by the KvState instance and this access.", e); - } - } else { - return null; - } - } - - /** - * Serializes all values of the Iterable with the given serializer. - * - * @param entries Key-value pairs to serialize - * @param keySerializer Serializer for UK - * @param valueSerializer Serializer for UV - * @param <UK> Type of the keys - * @param <UV> Type of the values - * @return Serialized values or <code>null</code> if values <code>null</code> or empty - * @throws IOException On failure during serialization - */ - public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { - if (entries != null) { - // Serialize - DataOutputSerializer dos = new DataOutputSerializer(32); - - for (Map.Entry<UK, UV> entry : entries) { - keySerializer.serialize(entry.getKey(), dos); - - if (entry.getValue() == null) { - dos.writeBoolean(true); - } else { - dos.writeBoolean(false); - valueSerializer.serialize(entry.getValue(), dos); - } - } - - return dos.getCopyOfBuffer(); - } else { - return null; - } - } - - /** - * Deserializes all kv pairs with the given serializer. - * - * @param serializedValue Serialized value of type Map<UK, UV> - * @param keySerializer Serializer for UK - * @param valueSerializer Serializer for UV - * @param <UK> Type of the key - * @param <UV> Type of the value. 
- * @return Deserialized map or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { - if (serializedValue != null) { - DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - - Map<UK, UV> result = new HashMap<>(); - while (in.available() > 0) { - UK key = keySerializer.deserialize(in); - - boolean isNull = in.readBoolean(); - UV value = isNull ? null : valueSerializer.deserialize(in); - - result.put(key, value); - } - - return result; - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 97b6bcd..66360e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; @@ -41,7 +41,7 @@ import org.apache.flink.util.Preconditions; public abstract class AbstractHeapState<K, N, SV, S 
extends State, SD extends StateDescriptor<S, ?>> implements InternalKvState<N> { - /** Map containing the actual key/value pairs */ + /** Map containing the actual key/value pairs. */ protected final StateTable<K, N, SV> stateTable; /** This holds the name of the state and can create an initial default value for the state. */ @@ -118,4 +118,4 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St public StateTable<K, N, SV> getStateTable() { return stateTable; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index f981b9f..206f10a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateSerializer; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 1cc94d2a..2baf644 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -42,7 +43,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.query.QueryableStateUtils; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java index 3692a71..6312d08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java @@ -19,22 +19,23 @@ package org.apache.flink.runtime.taskexecutor.rpc; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateRegistryGateway; import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.util.Preconditions; +import java.net.InetSocketAddress; + public class RpcKvStateRegistryListener implements KvStateRegistryListener { private final KvStateRegistryGateway kvStateRegistryGateway; - private final KvStateServerAddress kvStateServerAddress; + private final InetSocketAddress kvStateServerAddress; public RpcKvStateRegistryListener( KvStateRegistryGateway kvStateRegistryGateway, - KvStateServerAddress kvStateServerAddress) { + InetSocketAddress kvStateServerAddress) { this.kvStateRegistryGateway = Preconditions.checkNotNull(kvStateRegistryGateway); this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress); } http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java index 4404867..63bda99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java 
@@ -19,15 +19,16 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.util.Preconditions; +import java.net.InetSocketAddress; + /** * This implementation uses {@link ActorGateway} to forward key-value state notifications to the job * manager. The notifications are wrapped in an actor message and send to the given actor gateway. @@ -36,14 +37,14 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe private ActorGateway jobManager; - private KvStateServerAddress kvStateServerAddress; + private InetSocketAddress kvStateServerAddress; public ActorGatewayKvStateRegistryListener( ActorGateway jobManager, - KvStateServerAddress kvStateServerAddress) { + InetSocketAddress kvStateServerAddress) { this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager"); - this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress"); + this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress"); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 68da362..889191f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; @@ -74,12 +75,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint; import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess; import org.apache.flink.runtime.messages.RegistrationMessages; -import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered; import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -119,6 +118,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -717,7 +717,7 @@ public class JobManagerTest extends TestLogger { new KeyGroupRange(0, 0), "any-name", new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 1233)); + new 
InetSocketAddress(InetAddress.getLocalHost(), 1233)); jobManager.tell(registerNonExistingJob); @@ -742,7 +742,7 @@ public class JobManagerTest extends TestLogger { new KeyGroupRange(0, 0), "register-me", new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); + new InetSocketAddress(InetAddress.getLocalHost(), 1293)); jobManager.tell(registerForExistingJob); @@ -797,7 +797,7 @@ public class JobManagerTest extends TestLogger { new KeyGroupRange(0, 0), "duplicate-me", new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); + new InetSocketAddress(InetAddress.getLocalHost(), 1293)); NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered( jobGraph.getJobID(), @@ -805,7 +805,7 @@ public class JobManagerTest extends TestLogger { new KeyGroupRange(0, 0), "duplicate-me", // ...same name new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); + new InetSocketAddress(InetAddress.getLocalHost(), 1293)); Future<TestingJobManagerMessages.JobStatusIs> failedFuture = jobManager .ask(new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft()) http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java index 7bf9ee7..74e16a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; @@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.junit.Test; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -63,7 +65,7 @@ public class KvStateLocationRegistryTest { } } - KvStateServerAddress server = new KvStateServerAddress(InetAddress.getLocalHost(), 12032); + InetSocketAddress server = new InetSocketAddress(InetAddress.getLocalHost(), 12032); // Create registry Map<JobVertexID, ExecutionJobVertex> vertexMap = createVertexMap(vertices); @@ -129,7 +131,7 @@ public class KvStateLocationRegistryTest { new KeyGroupRange(0, 0), registrationName, new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 12328)); + new InetSocketAddress(InetAddress.getLocalHost(), 12328)); try { // Second operator registers same name @@ -138,7 +140,7 @@ public class KvStateLocationRegistryTest { new KeyGroupRange(0, 0), registrationName, new KvStateID(), - new KvStateServerAddress(InetAddress.getLocalHost(), 12032)); + new InetSocketAddress(InetAddress.getLocalHost(), 12032)); fail("Did not throw expected Exception after duplicated name"); } catch (IllegalStateException ignored) { @@ -187,7 +189,7 @@ public class KvStateLocationRegistryTest { new KeyGroupRange(0, 0), name, new KvStateID(), - mock(KvStateServerAddress.class)); + mock(InetSocketAddress.class)); try { // Unregister not registered keyGroupIndex http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java index 
116deea..3c79948 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.query; import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.KeyGroupRange; import org.junit.Test; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -65,7 +67,7 @@ public class KvStateLocationTest { KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName); KvStateID[] kvStateIds = new KvStateID[numRanges]; - KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges]; + InetSocketAddress[] serverAddresses = new InetSocketAddress[numRanges]; InetAddress host = InetAddress.getLocalHost(); @@ -73,7 +75,7 @@ public class KvStateLocationTest { int registeredCount = 0; for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { kvStateIds[rangeIdx] = new KvStateID(); - serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx); + serverAddresses[rangeIdx] = new InetSocketAddress(host, 1024 + rangeIdx); KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]); registeredCount += keyGroupRange.getNumberOfKeyGroups(); @@ -92,7 +94,7 @@ public class KvStateLocationTest { // Overwrite for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { kvStateIds[rangeIdx] = new KvStateID(); - serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx); + serverAddresses[rangeIdx] = new InetSocketAddress(host, 1024 + rangeIdx); location.registerKvState(keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]); 
assertEquals(registeredCount, location.getNumRegisteredKeyGroups()); http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java deleted file mode 100644 index aa4e6d8..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.internal.InternalListState; -import org.apache.flink.runtime.state.internal.InternalMapState; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; - -/** - * Tests for {@link KvStateSerializer}. - */ -@RunWith(Parameterized.class) -public class KvStateRequestSerializerTest { - - @Parameterized.Parameters - public static Collection<Boolean> parameters() { - return Arrays.asList(false, true); - } - - @Parameterized.Parameter - public boolean async; - - /** - * Tests key and namespace serialization utils. 
- */ - @Test - public void testKeyAndNamespaceSerialization() throws Exception { - TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE; - TypeSerializer<String> namespaceSerializer = StringSerializer.INSTANCE; - - long expectedKey = Integer.MAX_VALUE + 12323L; - String expectedNamespace = "knilf"; - - byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace( - expectedKey, keySerializer, expectedNamespace, namespaceSerializer); - - Tuple2<Long, String> actual = KvStateSerializer.deserializeKeyAndNamespace( - serializedKeyAndNamespace, keySerializer, namespaceSerializer); - - assertEquals(expectedKey, actual.f0.longValue()); - assertEquals(expectedNamespace, actual.f1); - } - - /** - * Tests key and namespace deserialization utils with too few bytes. - */ - @Test(expected = IOException.class) - public void testKeyAndNamespaceDeserializationEmpty() throws Exception { - KvStateSerializer.deserializeKeyAndNamespace( - new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); - } - - /** - * Tests key and namespace deserialization utils with too few bytes. - */ - @Test(expected = IOException.class) - public void testKeyAndNamespaceDeserializationTooShort() throws Exception { - KvStateSerializer.deserializeKeyAndNamespace( - new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); - } - - /** - * Tests key and namespace deserialization utils with too many bytes. - */ - @Test(expected = IOException.class) - public void testKeyAndNamespaceDeserializationTooMany1() throws Exception { - // Long + null String + 1 byte - KvStateSerializer.deserializeKeyAndNamespace( - new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, LongSerializer.INSTANCE, - StringSerializer.INSTANCE); - } - - /** - * Tests key and namespace deserialization utils with too many bytes. 
- */ - @Test(expected = IOException.class) - public void testKeyAndNamespaceDeserializationTooMany2() throws Exception { - // Long + null String + 2 bytes - KvStateSerializer.deserializeKeyAndNamespace( - new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, LongSerializer.INSTANCE, - StringSerializer.INSTANCE); - } - - /** - * Tests value serialization utils. - */ - @Test - public void testValueSerialization() throws Exception { - TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; - long expectedValue = Long.MAX_VALUE - 1292929292L; - - byte[] serializedValue = KvStateSerializer.serializeValue(expectedValue, valueSerializer); - long actualValue = KvStateSerializer.deserializeValue(serializedValue, valueSerializer); - - assertEquals(expectedValue, actualValue); - } - - /** - * Tests value deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeValueEmpty() throws Exception { - KvStateSerializer.deserializeValue(new byte[] {}, LongSerializer.INSTANCE); - } - - /** - * Tests value deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeValueTooShort() throws Exception { - // 1 byte (incomplete Long) - KvStateSerializer.deserializeValue(new byte[] {1}, LongSerializer.INSTANCE); - } - - /** - * Tests value deserialization with too many bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeValueTooMany1() throws Exception { - // Long + 1 byte - KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2}, - LongSerializer.INSTANCE); - } - - /** - * Tests value deserialization with too many bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeValueTooMany2() throws Exception { - // Long + 2 bytes - KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, - LongSerializer.INSTANCE); - } - - /** - * Tests list serialization utils. 
- */ - @Test - public void testListSerialization() throws Exception { - final long key = 0L; - - // objects for heap state list serialisation - final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = - new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - ClassLoader.getSystemClassLoader(), - 1, - new KeyGroupRange(0, 0), - async, - new ExecutionConfig() - ); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState( - VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); - - testListSerialization(key, listState); - } - - /** - * Verifies that the serialization of a list using the given list state - * matches the deserialization with {@link KvStateSerializer#deserializeList}. - * - * @param key - * key of the list state - * @param listState - * list state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance - * - * @throws Exception - */ - public static void testListSerialization( - final long key, - final InternalListState<VoidNamespace, Long> listState) throws Exception { - - TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; - listState.setCurrentNamespace(VoidNamespace.INSTANCE); - - // List - final int numElements = 10; - - final List<Long> expectedValues = new ArrayList<>(); - for (int i = 0; i < numElements; i++) { - final long value = ThreadLocalRandom.current().nextLong(); - expectedValues.add(value); - listState.add(value); - } - - final byte[] serializedKey = - KvStateSerializer.serializeKeyAndNamespace( - key, LongSerializer.INSTANCE, - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - - final byte[] serializedValues = listState.getSerializedValue(serializedKey); - - List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer); - assertEquals(expectedValues, actualValues); - - // Single 
value - long expectedValue = ThreadLocalRandom.current().nextLong(); - byte[] serializedValue = KvStateSerializer.serializeValue(expectedValue, valueSerializer); - List<Long> actualValue = KvStateSerializer.deserializeList(serializedValue, valueSerializer); - assertEquals(1, actualValue.size()); - assertEquals(expectedValue, actualValue.get(0).longValue()); - } - - /** - * Tests list deserialization with too few bytes. - */ - @Test - public void testDeserializeListEmpty() throws Exception { - List<Long> actualValue = KvStateSerializer - .deserializeList(new byte[] {}, LongSerializer.INSTANCE); - assertEquals(0, actualValue.size()); - } - - /** - * Tests list deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeListTooShort1() throws Exception { - // 1 byte (incomplete Long) - KvStateSerializer.deserializeList(new byte[] {1}, LongSerializer.INSTANCE); - } - - /** - * Tests list deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeListTooShort2() throws Exception { - // Long + 1 byte (separator) + 1 byte (incomplete Long) - KvStateSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, - LongSerializer.INSTANCE); - } - - /** - * Tests map serialization utils. 
- */ - @Test - public void testMapSerialization() throws Exception { - final long key = 0L; - - // objects for heap state list serialisation - final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = - new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - LongSerializer.INSTANCE, - ClassLoader.getSystemClassLoader(), - 1, - new KeyGroupRange(0, 0), - async, - new ExecutionConfig() - ); - longHeapKeyedStateBackend.setCurrentKey(key); - - final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>) longHeapKeyedStateBackend.getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE)); - - testMapSerialization(key, mapState); - } - - /** - * Verifies that the serialization of a map using the given map state - * matches the deserialization with {@link KvStateSerializer#deserializeList}. - * - * @param key - * key of the map state - * @param mapState - * map state using the {@link VoidNamespace}, must also be a {@link InternalKvState} instance - * - * @throws Exception - */ - public static void testMapSerialization( - final long key, - final InternalMapState<VoidNamespace, Long, String> mapState) throws Exception { - - TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE; - TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE; - mapState.setCurrentNamespace(VoidNamespace.INSTANCE); - - // Map - final int numElements = 10; - - final Map<Long, String> expectedValues = new HashMap<>(); - for (int i = 1; i <= numElements; i++) { - final long value = ThreadLocalRandom.current().nextLong(); - expectedValues.put(value, Long.toString(value)); - mapState.put(value, Long.toString(value)); - } - - expectedValues.put(0L, null); - mapState.put(0L, null); - - final byte[] serializedKey = - KvStateSerializer.serializeKeyAndNamespace( - key, LongSerializer.INSTANCE, - 
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - - final byte[] serializedValues = mapState.getSerializedValue(serializedKey); - - Map<Long, String> actualValues = KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, userValueSerializer); - assertEquals(expectedValues.size(), actualValues.size()); - for (Map.Entry<Long, String> actualEntry : actualValues.entrySet()) { - assertEquals(expectedValues.get(actualEntry.getKey()), actualEntry.getValue()); - } - - // Single value - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - long expectedKey = ThreadLocalRandom.current().nextLong(); - String expectedValue = Long.toString(expectedKey); - byte[] isNull = {0}; - - baos.write(KvStateSerializer.serializeValue(expectedKey, userKeySerializer)); - baos.write(isNull); - baos.write(KvStateSerializer.serializeValue(expectedValue, userValueSerializer)); - byte[] serializedValue = baos.toByteArray(); - - Map<Long, String> actualValue = KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, userValueSerializer); - assertEquals(1, actualValue.size()); - assertEquals(expectedValue, actualValue.get(expectedKey)); - } - - /** - * Tests map deserialization with too few bytes. - */ - @Test - public void testDeserializeMapEmpty() throws Exception { - Map<Long, String> actualValue = KvStateSerializer - .deserializeMap(new byte[] {}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); - assertEquals(0, actualValue.size()); - } - - /** - * Tests map deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeMapTooShort1() throws Exception { - // 1 byte (incomplete Key) - KvStateSerializer.deserializeMap(new byte[] {1}, LongSerializer.INSTANCE, StringSerializer.INSTANCE); - } - - /** - * Tests map deserialization with too few bytes. 
- */ - @Test(expected = IOException.class) - public void testDeserializeMapTooShort2() throws Exception { - // Long (Key) + 1 byte (incomplete Value) - KvStateSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0}, - LongSerializer.INSTANCE, LongSerializer.INSTANCE); - } - - /** - * Tests map deserialization with too few bytes. - */ - @Test(expected = IOException.class) - public void testDeserializeMapTooShort3() throws Exception { - // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte (incomplete Key2) - KvStateSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3}, - LongSerializer.INSTANCE, LongSerializer.INSTANCE); - } - - private byte[] randomByteArray(int capacity) { - byte[] bytes = new byte[capacity]; - ThreadLocalRandom.current().nextBytes(bytes); - return bytes; - } -}
