http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java deleted file mode 100644 index 4b73fbb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.Preconditions; - -/** - * A {@link InternalKvState} instance request for a specific key and namespace. - */ -public final class KvStateRequest { - - /** ID for this request. */ - private final long requestId; - - /** ID of the requested KvState instance. */ - private final KvStateID kvStateId; - - /** Serialized key and namespace to request from the KvState instance. 
*/ - private final byte[] serializedKeyAndNamespace; - - /** - * Creates a KvState instance request. - * - * @param requestId ID for this request - * @param kvStateId ID of the requested KvState instance - * @param serializedKeyAndNamespace Serialized key and namespace to request from the KvState - * instance - */ - KvStateRequest(long requestId, KvStateID kvStateId, byte[] serializedKeyAndNamespace) { - this.requestId = requestId; - this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID"); - this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - } - - /** - * Returns the request ID. - * - * @return Request ID - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the ID of the requested KvState instance. - * - * @return ID of the requested KvState instance - */ - public KvStateID getKvStateId() { - return kvStateId; - } - - /** - * Returns the serialized key and namespace to request from the KvState - * instance. - * - * @return Serialized key and namespace to request from the KvState instance - */ - public byte[] getSerializedKeyAndNamespace() { - return serializedKeyAndNamespace; - } - - @Override - public String toString() { - return "KvStateRequest{" + - "requestId=" + requestId + - ", kvStateId=" + kvStateId + - ", serializedKeyAndNamespace.length=" + serializedKeyAndNamespace.length + - '}'; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java deleted file mode 100644 index 06a3ce8..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestFailure.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty.message; - -/** - * A failure response to a {@link KvStateRequest}. - */ -public final class KvStateRequestFailure { - - /** ID of the request responding to. */ - private final long requestId; - - /** Failure cause. Not allowed to be a user type. */ - private final Throwable cause; - - /** - * Creates a failure response to a {@link KvStateRequest}. 
- * - * @param requestId ID for the request responding to - * @param cause Failure cause (not allowed to be a user type) - */ - KvStateRequestFailure(long requestId, Throwable cause) { - this.requestId = requestId; - this.cause = cause; - } - - /** - * Returns the request ID responding to. - * - * @return Request ID responding to - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the failure cause. - * - * @return Failure cause - */ - public Throwable getCause() { - return cause; - } - - @Override - public String toString() { - return "KvStateRequestFailure{" + - "requestId=" + requestId + - ", cause=" + cause + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java deleted file mode 100644 index 2bd8a36..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestResult.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.util.Preconditions; - -/** - * A successful response to a {@link KvStateRequest} containing the serialized - * result for the requested key and namespace. - */ -public final class KvStateRequestResult { - - /** ID of the request responding to. */ - private final long requestId; - - /** - * Serialized result for the requested key and namespace. If no result was - * available for the specified key and namespace, this is <code>null</code>. - */ - private final byte[] serializedResult; - - /** - * Creates a successful {@link KvStateRequestResult} response. - * - * @param requestId ID of the request responding to - * @param serializedResult Serialized result or <code>null</code> if none - */ - KvStateRequestResult(long requestId, byte[] serializedResult) { - this.requestId = requestId; - this.serializedResult = Preconditions.checkNotNull(serializedResult, "Serialization result"); - } - - /** - * Returns the request ID responding to. - * - * @return Request ID responding to - */ - public long getRequestId() { - return requestId; - } - - /** - * Returns the serialized result or <code>null</code> if none available. - * - * @return Serialized result or <code>null</code> if none available. 
- */ - public byte[] getSerializedResult() { - return serializedResult; - } - - @Override - public String toString() { - return "KvStateRequestResult{" + - "requestId=" + requestId + - ", serializedResult.length=" + serializedResult.length + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java deleted file mode 100644 index 68f06e3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.netty.KvStateClient; -import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.util.DataInputDeserializer; -import org.apache.flink.runtime.util.DataOutputSerializer; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Serialization and deserialization of messages exchanged between - * {@link KvStateClient} and {@link KvStateServer}. - * - * <p>The binary messages have the following format: - * - * <pre> - * <------ Frame -------------------------> - * +----------------------------------------+ - * | HEADER (8) | PAYLOAD (VAR) | - * +------------------+----------------------------------------+ - * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) | - * +------------------+----------------------------------------+ - * </pre> - * - * <p>The concrete content of a message depends on the {@link KvStateRequestType}. - */ -public final class KvStateRequestSerializer { - - /** The serialization version ID. */ - private static final int VERSION = 0x79a1b710; - - /** Byte length of the header. 
*/ - private static final int HEADER_LENGTH = 8; - - // ------------------------------------------------------------------------ - // Serialization - // ------------------------------------------------------------------------ - - /** - * Allocates a buffer and serializes the KvState request into it. - * - * @param alloc ByteBuf allocator for the buffer to - * serialize message into - * @param requestId ID for this request - * @param kvStateId ID of the requested KvState instance - * @param serializedKeyAndNamespace Serialized key and namespace to request - * from the KvState instance. - * @return Serialized KvState request message - */ - public static ByteBuf serializeKvStateRequest( - ByteBufAllocator alloc, - long requestId, - KvStateID kvStateId, - byte[] serializedKeyAndNamespace) { - - // Header + request ID + KvState ID + Serialized namespace - int frameLength = HEADER_LENGTH + 8 + (8 + 8) + (4 + serializedKeyAndNamespace.length); - ByteBuf buf = alloc.ioBuffer(frameLength + 4); // +4 for frame length - - buf.writeInt(frameLength); - - writeHeader(buf, KvStateRequestType.REQUEST); - - buf.writeLong(requestId); - buf.writeLong(kvStateId.getLowerPart()); - buf.writeLong(kvStateId.getUpperPart()); - buf.writeInt(serializedKeyAndNamespace.length); - buf.writeBytes(serializedKeyAndNamespace); - - return buf; - } - - /** - * Allocates a buffer and serializes the KvState request result into it. 
- * - * @param alloc ByteBuf allocator for the buffer to serialize message into - * @param requestId ID for this request - * @param serializedResult Serialized Result - * @return Serialized KvState request result message - */ - public static ByteBuf serializeKvStateRequestResult( - ByteBufAllocator alloc, - long requestId, - byte[] serializedResult) { - - Preconditions.checkNotNull(serializedResult, "Serialized result"); - - // Header + request ID + serialized result - int frameLength = HEADER_LENGTH + 8 + 4 + serializedResult.length; - - ByteBuf buf = alloc.ioBuffer(frameLength); - - buf.writeInt(frameLength); - writeHeader(buf, KvStateRequestType.REQUEST_RESULT); - buf.writeLong(requestId); - - buf.writeInt(serializedResult.length); - buf.writeBytes(serializedResult); - - return buf; - } - - /** - * Allocates a buffer and serializes the KvState request failure into it. - * - * @param alloc ByteBuf allocator for the buffer to serialize message into - * @param requestId ID of the request responding to - * @param cause Failure cause - * @return Serialized KvState request failure message - * @throws IOException Serialization failures are forwarded - */ - public static ByteBuf serializeKvStateRequestFailure( - ByteBufAllocator alloc, - long requestId, - Throwable cause) throws IOException { - - ByteBuf buf = alloc.ioBuffer(); - - // Frame length is set at the end - buf.writeInt(0); - - writeHeader(buf, KvStateRequestType.REQUEST_FAILURE); - - // Message - buf.writeLong(requestId); - - try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); - ObjectOutputStream out = new ObjectOutputStream(bbos)) { - - out.writeObject(cause); - } - - // Set frame length - int frameLength = buf.readableBytes() - 4; - buf.setInt(0, frameLength); - - return buf; - } - - /** - * Allocates a buffer and serializes the server failure into it. - * - * <p>The cause must not be or contain any user types as causes. 
- * - * @param alloc ByteBuf allocator for the buffer to serialize message into - * @param cause Failure cause - * @return Serialized server failure message - * @throws IOException Serialization failures are forwarded - */ - public static ByteBuf serializeServerFailure(ByteBufAllocator alloc, Throwable cause) throws IOException { - ByteBuf buf = alloc.ioBuffer(); - - // Frame length is set at end - buf.writeInt(0); - - writeHeader(buf, KvStateRequestType.SERVER_FAILURE); - - try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); - ObjectOutputStream out = new ObjectOutputStream(bbos)) { - - out.writeObject(cause); - } - - // Set frame length - int frameLength = buf.readableBytes() - 4; - buf.setInt(0, frameLength); - - return buf; - } - - // ------------------------------------------------------------------------ - // Deserialization - // ------------------------------------------------------------------------ - - /** - * Deserializes the header and returns the request type. - * - * @param buf Buffer to deserialize (expected to be at header position) - * @return Deserialzied request type - * @throws IllegalArgumentException If unexpected message version or message type - */ - public static KvStateRequestType deserializeHeader(ByteBuf buf) { - // Check the version - int version = buf.readInt(); - if (version != VERSION) { - throw new IllegalArgumentException("Illegal message version " + version + - ". Expected: " + VERSION + "."); - } - - // Get the message type - int msgType = buf.readInt(); - KvStateRequestType[] values = KvStateRequestType.values(); - if (msgType >= 0 && msgType < values.length) { - return values[msgType]; - } else { - throw new IllegalArgumentException("Illegal message type with index " + msgType); - } - } - - /** - * Deserializes the KvState request message. - * - * <p><strong>Important</strong>: the returned buffer is sliced from the - * incoming ByteBuf stream and retained. Therefore, it needs to be recycled - * by the consumer. 
- * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequest - */ - public static KvStateRequest deserializeKvStateRequest(ByteBuf buf) { - long requestId = buf.readLong(); - KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong()); - - // Serialized key and namespace - int length = buf.readInt(); - - if (length < 0) { - throw new IllegalArgumentException("Negative length for serialized key and namespace. " + - "This indicates a serialization error."); - } - - // Copy the buffer in order to be able to safely recycle the ByteBuf - byte[] serializedKeyAndNamespace = new byte[length]; - if (length > 0) { - buf.readBytes(serializedKeyAndNamespace); - } - - return new KvStateRequest(requestId, kvStateId, serializedKeyAndNamespace); - } - - /** - * Deserializes the KvState request result. - * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequestResult - */ - public static KvStateRequestResult deserializeKvStateRequestResult(ByteBuf buf) { - long requestId = buf.readLong(); - - // Serialized KvState - int length = buf.readInt(); - - if (length < 0) { - throw new IllegalArgumentException("Negative length for serialized result. " + - "This indicates a serialization error."); - } - - byte[] serializedValue = new byte[length]; - - if (length > 0) { - buf.readBytes(serializedValue); - } - - return new KvStateRequestResult(requestId, serializedValue); - } - - /** - * Deserializes the KvState request failure. 
- * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequestFailure - */ - public static KvStateRequestFailure deserializeKvStateRequestFailure(ByteBuf buf) throws IOException, ClassNotFoundException { - long requestId = buf.readLong(); - - Throwable cause; - try (ByteBufInputStream bbis = new ByteBufInputStream(buf); - ObjectInputStream in = new ObjectInputStream(bbis)) { - - cause = (Throwable) in.readObject(); - } - - return new KvStateRequestFailure(requestId, cause); - } - - /** - * Deserializes the KvState request failure. - * - * @param buf Buffer to deserialize (expected to be positioned after header) - * @return Deserialized KvStateRequestFailure - * @throws IOException Serialization failure are forwarded - * @throws ClassNotFoundException If Exception type can not be loaded - */ - public static Throwable deserializeServerFailure(ByteBuf buf) throws IOException, ClassNotFoundException { - try (ByteBufInputStream bbis = new ByteBufInputStream(buf); - ObjectInputStream in = new ObjectInputStream(bbis)) { - - return (Throwable) in.readObject(); - } - } - - // ------------------------------------------------------------------------ - // Generic serialization utils - // ------------------------------------------------------------------------ - - /** - * Serializes the key and namespace into a {@link ByteBuffer}. - * - * <p>The serialized format matches the RocksDB state backend key format, i.e. - * the key and namespace don't have to be deserialized for RocksDB lookups. 
- * - * @param key Key to serialize - * @param keySerializer Serializer for the key - * @param namespace Namespace to serialize - * @param namespaceSerializer Serializer for the namespace - * @param <K> Key type - * @param <N> Namespace type - * @return Buffer holding the serialized key and namespace - * @throws IOException Serialization errors are forwarded - */ - public static <K, N> byte[] serializeKeyAndNamespace( - K key, - TypeSerializer<K> keySerializer, - N namespace, - TypeSerializer<N> namespaceSerializer) throws IOException { - - DataOutputSerializer dos = new DataOutputSerializer(32); - - keySerializer.serialize(key, dos); - dos.writeByte(42); - namespaceSerializer.serialize(namespace, dos); - - return dos.getCopyOfBuffer(); - } - - /** - * Deserializes the key and namespace into a {@link Tuple2}. - * - * @param serializedKeyAndNamespace Serialized key and namespace - * @param keySerializer Serializer for the key - * @param namespaceSerializer Serializer for the namespace - * @param <K> Key type - * @param <N> Namespace - * @return Tuple2 holding deserialized key and namespace - * @throws IOException if the deserialization fails for any reason - */ - public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace( - byte[] serializedKeyAndNamespace, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer) throws IOException { - - DataInputDeserializer dis = new DataInputDeserializer( - serializedKeyAndNamespace, - 0, - serializedKeyAndNamespace.length); - - try { - K key = keySerializer.deserialize(dis); - byte magicNumber = dis.readByte(); - if (magicNumber != 42) { - throw new IOException("Unexpected magic number " + magicNumber + "."); - } - N namespace = namespaceSerializer.deserialize(dis); - - if (dis.available() > 0) { - throw new IOException("Unconsumed bytes in the serialized key and namespace."); - } - - return new Tuple2<>(key, namespace); - } catch (IOException e) { - throw new IOException("Unable to deserialize key " + - 
"and namespace. This indicates a mismatch in the key/namespace " + - "serializers used by the KvState instance and this access.", e); - } - } - - /** - * Serializes the value with the given serializer. - * - * @param value Value of type T to serialize - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Serialized value or <code>null</code> if value <code>null</code> - * @throws IOException On failure during serialization - */ - public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException { - if (value != null) { - // Serialize - DataOutputSerializer dos = new DataOutputSerializer(32); - serializer.serialize(value, dos); - return dos.getCopyOfBuffer(); - } else { - return null; - } - } - - /** - * Deserializes the value with the given serializer. - * - * @param serializedValue Serialized value of type T - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Deserialized value or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { - if (serializedValue == null) { - return null; - } else { - final DataInputDeserializer deser = new DataInputDeserializer( - serializedValue, 0, serializedValue.length); - final T value = serializer.deserialize(deser); - if (deser.available() > 0) { - throw new IOException( - "Unconsumed bytes in the deserialized value. " + - "This indicates a mismatch in the value serializers " + - "used by the KvState instance and this access."); - } - return value; - } - } - - /** - * Deserializes all values with the given serializer. 
- * - * @param serializedValue Serialized value of type List<T> - * @param serializer Serializer for T - * @param <T> Type of the value - * @return Deserialized list or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { - if (serializedValue != null) { - final DataInputDeserializer in = new DataInputDeserializer( - serializedValue, 0, serializedValue.length); - - try { - final List<T> result = new ArrayList<>(); - while (in.available() > 0) { - result.add(serializer.deserialize(in)); - - // The expected binary format has a single byte separator. We - // want a consistent binary format in order to not need any - // special casing during deserialization. A "cleaner" format - // would skip this extra byte, but would require a memory copy - // for RocksDB, which stores the data serialized in this way - // for lists. - if (in.available() > 0) { - in.readByte(); - } - } - - return result; - } catch (IOException e) { - throw new IOException( - "Unable to deserialize value. " + - "This indicates a mismatch in the value serializers " + - "used by the KvState instance and this access.", e); - } - } else { - return null; - } - } - - /** - * Serializes all values of the Iterable with the given serializer. 
- * - * @param entries Key-value pairs to serialize - * @param keySerializer Serializer for UK - * @param valueSerializer Serializer for UV - * @param <UK> Type of the keys - * @param <UV> Type of the values - * @return Serialized values or <code>null</code> if values <code>null</code> or empty - * @throws IOException On failure during serialization - */ - public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { - if (entries != null) { - // Serialize - DataOutputSerializer dos = new DataOutputSerializer(32); - - for (Map.Entry<UK, UV> entry : entries) { - keySerializer.serialize(entry.getKey(), dos); - - if (entry.getValue() == null) { - dos.writeBoolean(true); - } else { - dos.writeBoolean(false); - valueSerializer.serialize(entry.getValue(), dos); - } - } - - return dos.getCopyOfBuffer(); - } else { - return null; - } - } - - /** - * Deserializes all kv pairs with the given serializer. - * - * @param serializedValue Serialized value of type Map<UK, UV> - * @param keySerializer Serializer for UK - * @param valueSerializer Serializer for UV - * @param <UK> Type of the key - * @param <UV> Type of the value. - * @return Deserialized map or <code>null</code> if the serialized value - * is <code>null</code> - * @throws IOException On failure during deserialization - */ - public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { - if (serializedValue != null) { - DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); - - Map<UK, UV> result = new HashMap<>(); - while (in.available() > 0) { - UK key = keySerializer.deserialize(in); - - boolean isNull = in.readBoolean(); - UV value = isNull ? 
null : valueSerializer.deserialize(in); - - result.put(key, value); - } - - return result; - } else { - return null; - } - } - - // ------------------------------------------------------------------------ - - /** - * Helper for writing the header. - * - * @param buf Buffer to serialize header into - * @param requestType Result type to serialize - */ - private static void writeHeader(ByteBuf buf, KvStateRequestType requestType) { - buf.writeInt(VERSION); - buf.writeInt(requestType.ordinal()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java deleted file mode 100644 index de7270a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty.message; - -import org.apache.flink.runtime.query.netty.KvStateServer; - -/** - * Expected message types when communicating with the {@link KvStateServer}. - */ -public enum KvStateRequestType { - - /** Request a KvState instance. */ - REQUEST, - - /** Successful response to a KvStateRequest. */ - REQUEST_RESULT, - - /** Failure response to a KvStateRequest. */ - REQUEST_FAILURE, - - /** Generic server failure. */ - SERVER_FAILURE - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java new file mode 100644 index 0000000..44ee571 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.query.netty.message; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.util.DataInputDeserializer; +import org.apache.flink.runtime.util.DataOutputSerializer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Serialization and deserialization of the different state types and namespaces. + */ +public final class KvStateSerializer { + + // ------------------------------------------------------------------------ + // Generic serialization utils + // ------------------------------------------------------------------------ + + /** + * Serializes the key and namespace into a {@link ByteBuffer}. + * + * <p>The serialized format matches the RocksDB state backend key format, i.e. + * the key and namespace don't have to be deserialized for RocksDB lookups. + * + * @param key Key to serialize + * @param keySerializer Serializer for the key + * @param namespace Namespace to serialize + * @param namespaceSerializer Serializer for the namespace + * @param <K> Key type + * @param <N> Namespace type + * @return Buffer holding the serialized key and namespace + * @throws IOException Serialization errors are forwarded + */ + public static <K, N> byte[] serializeKeyAndNamespace( + K key, + TypeSerializer<K> keySerializer, + N namespace, + TypeSerializer<N> namespaceSerializer) throws IOException { + + DataOutputSerializer dos = new DataOutputSerializer(32); + + keySerializer.serialize(key, dos); + dos.writeByte(42); + namespaceSerializer.serialize(namespace, dos); + + return dos.getCopyOfBuffer(); + } + + /** + * Deserializes the key and namespace into a {@link Tuple2}. 
+ * + * @param serializedKeyAndNamespace Serialized key and namespace + * @param keySerializer Serializer for the key + * @param namespaceSerializer Serializer for the namespace + * @param <K> Key type + * @param <N> Namespace + * @return Tuple2 holding deserialized key and namespace + * @throws IOException if the deserialization fails for any reason + */ + public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace( + byte[] serializedKeyAndNamespace, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) throws IOException { + + DataInputDeserializer dis = new DataInputDeserializer( + serializedKeyAndNamespace, + 0, + serializedKeyAndNamespace.length); + + try { + K key = keySerializer.deserialize(dis); + byte magicNumber = dis.readByte(); + if (magicNumber != 42) { + throw new IOException("Unexpected magic number " + magicNumber + "."); + } + N namespace = namespaceSerializer.deserialize(dis); + + if (dis.available() > 0) { + throw new IOException("Unconsumed bytes in the serialized key and namespace."); + } + + return new Tuple2<>(key, namespace); + } catch (IOException e) { + throw new IOException("Unable to deserialize key " + + "and namespace. This indicates a mismatch in the key/namespace " + + "serializers used by the KvState instance and this access.", e); + } + } + + /** + * Serializes the value with the given serializer. + * + * @param value Value of type T to serialize + * @param serializer Serializer for T + * @param <T> Type of the value + * @return Serialized value or <code>null</code> if value <code>null</code> + * @throws IOException On failure during serialization + */ + public static <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException { + if (value != null) { + // Serialize + DataOutputSerializer dos = new DataOutputSerializer(32); + serializer.serialize(value, dos); + return dos.getCopyOfBuffer(); + } else { + return null; + } + } + + /** + * Deserializes the value with the given serializer. 
+ * + * @param serializedValue Serialized value of type T + * @param serializer Serializer for T + * @param <T> Type of the value + * @return Deserialized value or <code>null</code> if the serialized value + * is <code>null</code> + * @throws IOException On failure during deserialization + */ + public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { + if (serializedValue == null) { + return null; + } else { + final DataInputDeserializer deser = new DataInputDeserializer( + serializedValue, 0, serializedValue.length); + final T value = serializer.deserialize(deser); + if (deser.available() > 0) { + throw new IOException( + "Unconsumed bytes in the deserialized value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access."); + } + return value; + } + } + + /** + * Deserializes all values with the given serializer. + * + * @param serializedValue Serialized value of type List<T> + * @param serializer Serializer for T + * @param <T> Type of the value + * @return Deserialized list or <code>null</code> if the serialized value + * is <code>null</code> + * @throws IOException On failure during deserialization + */ + public static <T> List<T> deserializeList(byte[] serializedValue, TypeSerializer<T> serializer) throws IOException { + if (serializedValue != null) { + final DataInputDeserializer in = new DataInputDeserializer( + serializedValue, 0, serializedValue.length); + + try { + final List<T> result = new ArrayList<>(); + while (in.available() > 0) { + result.add(serializer.deserialize(in)); + + // The expected binary format has a single byte separator. We + // want a consistent binary format in order to not need any + // special casing during deserialization. A "cleaner" format + // would skip this extra byte, but would require a memory copy + // for RocksDB, which stores the data serialized in this way + // for lists. 
+ if (in.available() > 0) { + in.readByte(); + } + } + + return result; + } catch (IOException e) { + throw new IOException( + "Unable to deserialize value. " + + "This indicates a mismatch in the value serializers " + + "used by the KvState instance and this access.", e); + } + } else { + return null; + } + } + + /** + * Serializes all key-value pairs of the Iterable with the given serializers. + * + * @param entries Key-value pairs to serialize + * @param keySerializer Serializer for UK + * @param valueSerializer Serializer for UV + * @param <UK> Type of the keys + * @param <UV> Type of the values + * @return Serialized entries or <code>null</code> if entries is <code>null</code> + * @throws IOException On failure during serialization + */ + public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { + if (entries != null) { + // Serialize + DataOutputSerializer dos = new DataOutputSerializer(32); + + for (Map.Entry<UK, UV> entry : entries) { + keySerializer.serialize(entry.getKey(), dos); + + if (entry.getValue() == null) { + dos.writeBoolean(true); + } else { + dos.writeBoolean(false); + valueSerializer.serialize(entry.getValue(), dos); + } + } + + return dos.getCopyOfBuffer(); + } else { + return null; + } + } + + /** + * Deserializes all kv pairs with the given serializer. + * + * @param serializedValue Serialized value of type Map<UK, UV> + * @param keySerializer Serializer for UK + * @param valueSerializer Serializer for UV + * @param <UK> Type of the key + * @param <UV> Type of the value. 
+ * @return Deserialized map or <code>null</code> if the serialized value + * is <code>null</code> + * @throws IOException On failure during deserialization + */ + public static <UK, UV> Map<UK, UV> deserializeMap(byte[] serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) throws IOException { + if (serializedValue != null) { + DataInputDeserializer in = new DataInputDeserializer(serializedValue, 0, serializedValue.length); + + Map<UK, UV> result = new HashMap<>(); + while (in.available() > 0) { + UK key = keySerializer.deserialize(in); + + boolean isNull = in.readBoolean(); + UV value = isNull ? null : valueSerializer.deserialize(in); + + result.put(key, value); + } + + return result; + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java deleted file mode 100644 index 7e8de40..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/package-info.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains all Netty-based client/server classes used to query - * KvState instances. - * - * <h2>Server and Client</h2> - * - * <p>Both server and client expect received binary messages to contain a frame - * length field. Netty's {@link io.netty.handler.codec.LengthFieldBasedFrameDecoder} - * is used to fully receive the frame before giving it to the respective client - * or server handler. - * - * <p>Connection establishment and release happens by the client. The server - * only closes a connection if a fatal failure happens that cannot be resolved - * otherwise. - * - * <p>The is a single server per task manager and a single client can be shared - * by multiple Threads. 
- * - * <p>See also: - * <ul> - * <li>{@link org.apache.flink.runtime.query.netty.KvStateServer}</li> - * <li>{@link org.apache.flink.runtime.query.netty.KvStateServerHandler}</li> - * <li>{@link org.apache.flink.runtime.query.netty.KvStateClient}</li> - * <li>{@link org.apache.flink.runtime.query.netty.KvStateClientHandler}</li> - * </ul> - * - * <h2>Serialization</h2> - * - * <p>The exchanged binary messages have the following format: - * - * <pre> - * <------ Frame -------------------------> - * +----------------------------------------+ - * | HEADER (8) | PAYLOAD (VAR) | - * +------------------+----------------------------------------+ - * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) | - * +------------------+----------------------------------------+ - * </pre> - * - * <p>For frame decoding, both server and client use Netty's {@link - * io.netty.handler.codec.LengthFieldBasedFrameDecoder}. Message serialization - * is done via static helpers in {@link org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}. - * The serialization helpers return {@link io.netty.buffer.ByteBuf} instances, - * which are ready to be sent to the client or server respectively as they - * contain the frame length. - * - * <p>See also: - * <ul> - * <li>{@link org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}</li> - * </ul> - * - * <h2>Statistics</h2> - * - * <p>Both server and client keep track of request statistics via {@link - * org.apache.flink.runtime.query.netty.KvStateRequestStats}. 
- * - * <p>See also: - * <ul> - * <li>{@link org.apache.flink.runtime.query.netty.KvStateRequestStats}</li> - * </ul> - */ -package org.apache.flink.runtime.query.netty; http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java deleted file mode 100644 index 07a4396..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/package-info.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains all KvState query related classes. - * - * <h2>TaskManager and JobManager</h2> - * - * <p>State backends register queryable state instances at the {@link - * org.apache.flink.runtime.query.KvStateRegistry}. - * There is one registry per TaskManager. Registered KvState instances are - * reported to the JobManager, where they are aggregated at the {@link - * org.apache.flink.runtime.query.KvStateLocationRegistry}. 
- * - * <p>Instances of {@link org.apache.flink.runtime.query.KvStateLocation} contain - * all information needed for a client to query a KvState instance. - * - * <p>See also: - * <ul> - * <li>{@link org.apache.flink.runtime.query.KvStateRegistry}</li> - * <li>{@link org.apache.flink.runtime.query.TaskKvStateRegistry}</li> - * <li>{@link org.apache.flink.runtime.query.KvStateLocation}</li> - * <li>{@link org.apache.flink.runtime.query.KvStateLocationRegistry}</li> - * </ul> - * - * <h2>Client</h2> - * - * The {@link org.apache.flink.runtime.query.QueryableStateClient} is used - * to query KvState instances. The client takes care of {@link - * org.apache.flink.runtime.query.KvStateLocation} lookup and caching. Queries - * are then dispatched via the network client. - * - * <h3>JobManager Communication</h3> - * - * <p>The JobManager is queried for {@link org.apache.flink.runtime.query.KvStateLocation} - * instances via the {@link org.apache.flink.runtime.query.KvStateLocationLookupService}. - * The client caches resolved locations and dispatches queries directly to the - * TaskManager. - * - * <h3>TaskManager Communication</h3> - * - * <p>After the location has been resolved, the TaskManager is queried via the - * {@link org.apache.flink.runtime.query.netty.KvStateClient}. 
- */ -package org.apache.flink.runtime.query; http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java index 7e1123d..97b6bcd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.util.Preconditions; @@ -90,7 +90,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - Tuple2<K, N> keyAndNamespace = KvStateRequestSerializer.deserializeKeyAndNamespace( + Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace( serializedKeyAndNamespace, keySerializer, namespaceSerializer); return getSerializedValue(keyAndNamespace.f0, keyAndNamespace.f1); @@ -108,7 +108,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St @SuppressWarnings("unchecked,rawtypes") TypeSerializer serializer = stateDesc.getSerializer(); - return KvStateRequestSerializer.serializeValue(result, serializer); + 
return KvStateSerializer.serializeValue(result, serializer); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index f393237..f981b9f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.Preconditions; @@ -153,6 +153,6 @@ public class HeapMapState<K, N, UK, UV> TypeSerializer<UK> userKeySerializer = stateDesc.getKeySerializer(); TypeSerializer<UV> userValueSerializer = stateDesc.getValueSerializer(); - return KvStateRequestSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer); + return KvStateSerializer.serializeMap(result.entrySet(), userKeySerializer, userValueSerializer); } } http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index fa1ae54..37d28de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -47,7 +47,7 @@ public class QueryableStateConfiguration { // ------------------------------------------------------------------------ /** - * Returns whether queryable state is enabled. + * Returns whether queryable state is enabled. */ public boolean enabled() { return enabled; @@ -70,7 +70,7 @@ public class QueryableStateConfiguration { /** * Returns the number of threads for the thread pool that performs the actual state lookup. - * These threads perform the actual state lookup. + * These threads perform the actual state lookup. */ public int numQueryThreads() { return numQueryThreads; @@ -90,7 +90,7 @@ public class QueryableStateConfiguration { // ------------------------------------------------------------------------ /** - * Gets the configuration describing the queryable state as deactivated. + * Gets the configuration describing the queryable state as deactivated. 
*/ public static QueryableStateConfiguration disabled() { return new QueryableStateConfiguration(false, 0, 0, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 1c30ff6..7c5c830 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -39,8 +39,9 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.QueryableStateUtils; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.query.KvStateServer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer; @@ -48,6 +49,7 @@ import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,7 +140,7 @@ public class TaskManagerServices { public FileCache getFileCache() { return fileCache; } - + public TaskSlotTable getTaskSlotTable() { return taskSlotTable; } @@ -214,7 +216,7 @@ public 
class TaskManagerServices { final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); - + return new TaskManagerServices( taskManagerLocation, memoryManager, @@ -354,7 +356,7 @@ public class TaskManagerServices { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); KvStateRegistry kvStateRegistry = new KvStateRegistry(); - KvStateServer kvStateServer; + KvStateServer kvStateServer = null; if (taskManagerServicesConfiguration.getQueryableStateConfig().enabled()) { QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); @@ -365,15 +367,13 @@ public class TaskManagerServices { int numQueryThreads = qsConfig.numQueryThreads() == 0 ? taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads(); - kvStateServer = new KvStateServer( - taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.port(), - numNetworkThreads, - numQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()); - } else { - kvStateServer = null; + kvStateServer = QueryableStateUtils.createKvStateServer( + taskManagerServicesConfiguration.getTaskManagerAddress(), + qsConfig.port(), + numNetworkThreads, + numQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); } // we start the network first, to make sure it can allocate its buffers first @@ -395,7 +395,7 @@ public class TaskManagerServices { * Calculates the amount of memory used for network buffers based on the total memory to use and * the according configuration parameters. 
* - * The following configuration parameters are involved: + * <p>The following configuration parameters are involved: * <ul> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li> @@ -458,11 +458,11 @@ public class TaskManagerServices { * Calculates the amount of memory used for network buffers inside the current JVM instance * based on the available heap or the max heap size and the according configuration parameters. * - * For containers or when started via scripts, if started with a memory limit and set to use + * <p>For containers or when started via scripts, if started with a memory limit and set to use * off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able * to extract the intended values from this. * - * The following configuration parameters are involved: + * <p>The following configuration parameters are involved: * <ul> * <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li> * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li> @@ -629,7 +629,7 @@ public class TaskManagerServices { if (LOG.isInfoEnabled()) { long totalSpaceGb = file.getTotalSpace() >> 30; long usableSpaceGb = file.getUsableSpace() >> 30; - double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; + double usablePercentage = (double) usableSpaceGb / totalSpaceGb * 100; String path = file.getAbsolutePath(); LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", path, totalSpaceGb, usableSpaceGb, usablePercentage)); http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 6cc7569..f1f7d39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.MathUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Configuration for the task manager services such as the network environment, the memory manager, - * the io manager and the metric registry + * the io manager and the metric registry. */ public class TaskManagerServicesConfiguration { private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServicesConfiguration.class); @@ -106,7 +107,6 @@ public class TaskManagerServicesConfiguration { // Getter/Setter // -------------------------------------------------------------------------------------------- - public InetAddress getTaskManagerAddress() { return taskManagerAddress; } @@ -291,7 +291,7 @@ public class TaskManagerServicesConfiguration { if (!hasNewNetworkBufConf(configuration)) { // map old config to new one: - networkBufMin = networkBufMax = ((long)numNetworkBuffers) * pageSize; + networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize; } else { if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { LOG.info("Ignoring old (but still present) network buffer configuration via {}.", @@ -439,9 +439,8 @@ public class TaskManagerServicesConfiguration { static void checkConfigParameter(boolean condition, Object parameter, String name, String errorMessage) throws 
IllegalConfigurationException { if (!condition) { - throw new IllegalConfigurationException("Invalid configuration value for " + + throw new IllegalConfigurationException("Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage); } } } - http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java deleted file mode 100644 index edefcf8..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.akka.FlinkUntypedActor; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy; -import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory; -import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.Status; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests for {@link AkkaKvStateLocationLookupService}. - */ -public class AkkaKvStateLocationLookupServiceTest extends TestLogger { - - /** The default timeout. */ - private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); - - /** Test actor system shared between the tests. 
*/ - private static ActorSystem testActorSystem; - - @BeforeClass - public static void setUp() throws Exception { - testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); - } - - @AfterClass - public static void tearDown() throws Exception { - if (testActorSystem != null) { - testActorSystem.shutdown(); - } - } - - /** - * Tests responses if no leader notification has been reported or leadership - * has been lost (leaderAddress = <code>null</code>). - */ - @Test - public void testNoJobManagerRegistered() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - null, - null); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // - // No leader registered initially => fail with UnknownJobManager - // - try { - JobID jobId = new JobID(); - String name = "coffee"; - - Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name); - - Await.result(locationFuture, TIMEOUT); - fail("Did not throw expected Exception"); - } catch (UnknownJobManager ignored) { - // Expected - } - - assertEquals("Received unexpected lookup", 0, received.size()); - - // - // Leader registration => communicate with new leader - // - UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID; - KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea"); - - ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected); - - String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - // Notify the service about a leader - leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId); - - JobID jobId = new JobID(); - String name = 
"tea"; - - // Verify that the leader response is handled - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT); - assertEquals(expected, location); - - // Verify that the correct message was sent to the leader - assertEquals(1, received.size()); - - verifyLookupMsg(received.poll(), jobId, name); - - // - // Leader loss => fail with UnknownJobManager - // - leaderRetrievalService.notifyListener(null, null); - - try { - Future<KvStateLocation> locationFuture = lookupService - .getKvStateLookupInfo(new JobID(), "coffee"); - - Await.result(locationFuture, TIMEOUT); - fail("Did not throw expected Exception"); - } catch (UnknownJobManager ignored) { - // Expected - } - - // No new messages received - assertEquals(0, received.size()); - } - - /** - * Tests that messages are properly decorated with the leader session ID. - */ - @Test - public void testLeaderSessionIdChange() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - "localhost", - HighAvailabilityServices.DEFAULT_LEADER_ID); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // Create test actors with random leader session IDs - KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt"); - UUID leaderSessionId1 = UUID.randomUUID(); - ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1); - String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1); - - KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper"); - UUID leaderSessionId2 = UUID.randomUUID(); - ActorRef testActor2 = LookupResponseActor.create(received, 
leaderSessionId1, expected2); - String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2); - - JobID jobId = new JobID(); - - // - // Notify about first leader - // - leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1); - - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT); - assertEquals(expected1, location); - - assertEquals(1, received.size()); - verifyLookupMsg(received.poll(), jobId, "rock"); - - // - // Notify about second leader - // - leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2); - - location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT); - assertEquals(expected2, location); - - assertEquals(1, received.size()); - verifyLookupMsg(received.poll(), jobId, "roll"); - } - - /** - * Tests that lookups are retried when no leader notification is available. - */ - @Test - public void testRetryOnUnknownJobManager() throws Exception { - final Queue<LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>(); - - LookupRetryStrategyFactory retryStrategy = - new LookupRetryStrategyFactory() { - @Override - public LookupRetryStrategy createRetryStrategy() { - return retryStrategies.poll(); - } - }; - - final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - null, - null); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - retryStrategy); - - lookupService.start(); - - // - // Test call to retry - // - final AtomicBoolean hasRetried = new AtomicBoolean(); - retryStrategies.add( - new LookupRetryStrategy() { - @Override - public FiniteDuration getRetryDelay() { - return FiniteDuration.Zero(); - } - - @Override - public boolean tryRetry() { - if (hasRetried.compareAndSet(false, true)) { - return true; - } - return false; - } - }); - - Future<KvStateLocation> locationFuture = 
lookupService.getKvStateLookupInfo(new JobID(), "yessir"); - - Await.ready(locationFuture, TIMEOUT); - assertTrue("Did not retry ", hasRetried.get()); - - // - // Test leader notification after retry - // - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic"); - ActorRef testActor = LookupResponseActor.create(received, null, expected); - final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - retryStrategies.add(new LookupRetryStrategy() { - @Override - public FiniteDuration getRetryDelay() { - return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); - } - - @Override - public boolean tryRetry() { - leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID); - return true; - } - }); - - KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT); - assertEquals(expected, location); - } - - @Test - public void testUnexpectedResponseType() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( - "localhost", - HighAvailabilityServices.DEFAULT_LEADER_ID); - Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>(); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - testActorSystem, - TIMEOUT, - new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory()); - - lookupService.start(); - - // Create test actors with random leader session IDs - String expected = "unexpected-response-type"; - ActorRef testActor = LookupResponseActor.create(received, null, expected); - String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor); - - leaderRetrievalService.notifyListener(testActorAddress, null); - - try { - Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), 
TIMEOUT); - fail("Did not throw expected Exception"); - } catch (Throwable ignored) { - // Expected - } - } - - private static final class LookupResponseActor extends FlinkUntypedActor { - - /** Received lookup messages. */ - private final Queue<LookupKvStateLocation> receivedLookups; - - /** Responses on KvStateMessage.LookupKvStateLocation messages. */ - private final Queue<Object> lookupResponses; - - /** The leader session ID. */ - private UUID leaderSessionId; - - public LookupResponseActor( - Queue<LookupKvStateLocation> receivedLookups, - UUID leaderSessionId, Object... lookupResponses) { - - this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups"); - this.leaderSessionId = leaderSessionId; - this.lookupResponses = new ArrayDeque<>(); - - if (lookupResponses != null) { - for (Object resp : lookupResponses) { - this.lookupResponses.add(resp); - } - } - } - - @Override - public void handleMessage(Object message) throws Exception { - if (message instanceof LookupKvStateLocation) { - // Add to received lookups queue - receivedLookups.add((LookupKvStateLocation) message); - - Object msg = lookupResponses.poll(); - if (msg != null) { - if (msg instanceof Throwable) { - sender().tell(new Status.Failure((Throwable) msg), self()); - } else { - sender().tell(new Status.Success(msg), self()); - } - } - } else if (message instanceof UUID) { - this.leaderSessionId = (UUID) message; - } else { - LOG.debug("Received unhandled message: {}", message); - } - } - - @Override - protected UUID getLeaderSessionID() { - return leaderSessionId; - } - - private static ActorRef create( - Queue<LookupKvStateLocation> receivedLookups, - UUID leaderSessionId, - Object... 
lookupResponses) { - - return testActorSystem.actorOf(Props.create( - LookupResponseActor.class, - receivedLookups, - leaderSessionId, - lookupResponses)); - } - } - - private static void verifyLookupMsg( - LookupKvStateLocation lookUpMsg, - JobID expectedJobId, - String expectedName) { - - assertNotNull(lookUpMsg); - assertEquals(expectedJobId, lookUpMsg.getJobId()); - assertEquals(expectedName, lookUpMsg.getRegistrationName()); - } - -}
