http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java new file mode 100644 index 0000000..fc9b1d4 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandler.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; +import org.apache.flink.queryablestate.network.messages.MessageSerializer; +import org.apache.flink.queryablestate.network.messages.MessageType; +import org.apache.flink.queryablestate.network.messages.RequestFailure; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.channels.ClosedChannelException; + +/** + * The handler used by a {@link Client} to handling incoming messages. + * + * @param <REQ> the type of request the client will send. + * @param <RESP> the type of response the client expects to receive. + */ +@Internal +public class ClientHandler<REQ extends MessageBody, RESP extends MessageBody> extends ChannelInboundHandlerAdapter { + + private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class); + + private final String clientName; + + private final MessageSerializer<REQ, RESP> serializer; + + private final ClientHandlerCallback<RESP> callback; + + /** + * Creates a handler with the callback. + * + * @param clientName the name of the client. + * @param serializer the serializer used to (de-)serialize messages. + * @param callback Callback for responses. + */ + public ClientHandler( + final String clientName, + final MessageSerializer<REQ, RESP> serializer, + final ClientHandlerCallback<RESP> callback) { + + this.clientName = Preconditions.checkNotNull(clientName); + this.serializer = Preconditions.checkNotNull(serializer); + this.callback = Preconditions.checkNotNull(callback); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + ByteBuf buf = (ByteBuf) msg; + MessageType msgType = MessageSerializer.deserializeHeader(buf); + + if (msgType == MessageType.REQUEST_RESULT) { + long requestId = MessageSerializer.getRequestId(buf); + RESP result = serializer.deserializeResponse(buf); + callback.onRequestResult(requestId, result); + } else if (msgType == MessageType.REQUEST_FAILURE) { + RequestFailure failure = MessageSerializer.deserializeRequestFailure(buf); + callback.onRequestFailure(failure.getRequestId(), failure.getCause()); + } else if (msgType == MessageType.SERVER_FAILURE) { + throw MessageSerializer.deserializeServerFailure(buf); + } else { + throw new IllegalStateException("Unexpected response type '" + msgType + "'"); + } + } catch (Throwable t1) { + try { + callback.onFailure(t1); + } catch (Throwable t2) { + LOG.error("Failed to notify callback about failure", t2); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + try { + callback.onFailure(cause); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Only the client is expected to close the channel. Otherwise it + // indicates a failure. Note that this will be invoked in both cases + // though. If the callback closed the channel, the callback must be + // ignored. + try { + callback.onFailure(new ClosedChannelException()); + } catch (Throwable t) { + LOG.error("Failed to notify callback about failure", t); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java new file mode 100644 index 0000000..00ce1ed --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ClientHandlerCallback.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.queryablestate.network.messages.MessageBody; + +/** + * Callback for {@link ClientHandler}. + */ +@Internal +public interface ClientHandlerCallback<RESP extends MessageBody> { + + /** + * Called on a successful request. + * + * @param requestId ID of the request + * @param response The received response + */ + void onRequestResult(long requestId, RESP response); + + /** + * Called on a failed request. + * + * @param requestId ID of the request + * @param cause Cause of the request failure + */ + void onRequestFailure(long requestId, Throwable cause); + + /** + * Called on any failure, which is not related to a specific request. + * + * <p>This can be for example a caught Exception in the channel pipeline + * or an unexpected channel close. + * + * @param cause Cause of the failure + */ + void onFailure(Throwable cause); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java new file mode 100644 index 0000000..5e014b8 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/NettyBufferPool.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control + * over the number of created arenas. + */ +public class NettyBufferPool implements ByteBufAllocator { + + /** The wrapped buffer allocator. */ + private final PooledByteBufAllocator alloc; + + /** + * Creates Netty's buffer pool with the specified number of direct arenas. + * + * @param numberOfArenas Number of arenas (recommended: 2 * number of task + * slots) + */ + public NettyBufferPool(int numberOfArenas) { + checkArgument(numberOfArenas >= 1, "Number of arenas"); + + // We strictly prefer direct buffers and disallow heap allocations. + boolean preferDirect = true; + + // Arenas allocate chunks of pageSize << maxOrder bytes. With these + // defaults, this results in chunks of 16 MB. + int pageSize = 8192; + int maxOrder = 11; + + // Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e. + // we allocate numDirectArenas * 16 MB of direct memory. This can grow + // to multiple chunks per arena during runtime, but this should only + // happen with a large amount of connections per task manager. We + // control the memory allocations with low/high watermarks when writing + // to the TCP channels. Chunks are allocated lazily. + int numDirectArenas = numberOfArenas; + + // No heap arenas, please. + int numHeapArenas = 0; + + this.alloc = new PooledByteBufAllocator( + preferDirect, + numHeapArenas, + numDirectArenas, + pageSize, + maxOrder); + } + + // ------------------------------------------------------------------------ + // Delegate calls to the allocated and prohibit heap buffer allocations + // ------------------------------------------------------------------------ + + @Override + public ByteBuf buffer() { + return alloc.buffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return alloc.buffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return alloc.buffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return alloc.ioBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return alloc.ioBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return alloc.ioBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf directBuffer() { + return alloc.directBuffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return alloc.directBuffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return alloc.directBuffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return alloc.compositeBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return alloc.compositeBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return alloc.compositeDirectBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return alloc.compositeDirectBuffer(maxNumComponents); + } + + @Override + public boolean isDirectBufferPooled() { + return alloc.isDirectBufferPooled(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java new file mode 100644 index 0000000..f26c267 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageBody.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +/** + * The base class for every message exchanged during the communication between + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. + * + * <p>Every such message should also have a {@link MessageDeserializer}. + */ +@Internal +public abstract class MessageBody { + + /** + * Serializes the message into a byte array. + * @return A byte array with the serialized content of the message. + */ + public abstract byte[] serialize(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java new file mode 100644 index 0000000..436fb82 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageDeserializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; + +/** + * A utility used to deserialize a {@link MessageBody message}. + * @param <M> The type of the message to be deserialized. + * It has to extend {@link MessageBody} + */ +@Internal +public interface MessageDeserializer<M extends MessageBody> { + + /** + * Deserializes a message contained in a byte buffer. + * @param buf the buffer containing the message. + * @return The deserialized message. + */ + M deserializeMessage(ByteBuf buf); +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java new file mode 100644 index 0000000..c0a0d32 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageSerializer.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; + +/** + * Serialization and deserialization of messages exchanged between + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. + * + * <p>The binary messages have the following format: + * + * <pre> + * <------ Frame -------------------------> + * +----------------------------------------+ + * | HEADER (8) | PAYLOAD (VAR) | + * +------------------+----------------------------------------+ + * | FRAME LENGTH (4) | VERSION (4) | TYPE (4) | CONTENT (VAR) | + * +------------------+----------------------------------------+ + * </pre> + * + * <p>The concrete content of a message depends on the {@link MessageType}. + * + * @param <REQ> Type of the requests of the protocol. + * @param <RESP> Type of the responses of the protocol. + */ +@Internal +public final class MessageSerializer<REQ extends MessageBody, RESP extends MessageBody> { + + /** The serialization version ID. */ + private static final int VERSION = 0x79a1b710; + + /** Byte length of the header. */ + private static final int HEADER_LENGTH = 2 * Integer.BYTES; + + /** Byte length of the request id. */ + private static final int REQUEST_ID_SIZE = Long.BYTES; + + /** The constructor of the {@link MessageBody client requests}. Used for deserialization. */ + private final MessageDeserializer<REQ> requestDeserializer; + + /** The constructor of the {@link MessageBody server responses}. Used for deserialization. */ + private final MessageDeserializer<RESP> responseDeserializer; + + public MessageSerializer(MessageDeserializer<REQ> requestDeser, MessageDeserializer<RESP> responseDeser) { + requestDeserializer = Preconditions.checkNotNull(requestDeser); + responseDeserializer = Preconditions.checkNotNull(responseDeser); + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + + /** + * Serializes the request sent to the + * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param request The request to be serialized. + * @return A {@link ByteBuf} containing the serialized message. + */ + public static <REQ extends MessageBody> ByteBuf serializeRequest( + final ByteBufAllocator alloc, + final long requestId, + final REQ request) { + Preconditions.checkNotNull(request); + return writePayload(alloc, requestId, MessageType.REQUEST, request.serialize()); + } + + /** + * Serializes the response sent to the + * {@link org.apache.flink.queryablestate.network.Client}. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param response The response to be serialized. + * @return A {@link ByteBuf} containing the serialized message. + */ + public static <RESP extends MessageBody> ByteBuf serializeResponse( + final ByteBufAllocator alloc, + final long requestId, + final RESP response) { + Preconditions.checkNotNull(response); + return writePayload(alloc, requestId, MessageType.REQUEST_RESULT, response.serialize()); + } + + /** + * Serializes the exception containing the failure message sent to the + * {@link org.apache.flink.queryablestate.network.Client} in case of + * protocol related errors. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param cause The exception thrown at the server. + * @return A {@link ByteBuf} containing the serialized message. + */ + public static ByteBuf serializeRequestFailure( + final ByteBufAllocator alloc, + final long requestId, + final Throwable cause) throws IOException { + + final ByteBuf buf = alloc.ioBuffer(); + + // Frame length is set at the end + buf.writeInt(0); + writeHeader(buf, MessageType.REQUEST_FAILURE); + buf.writeLong(requestId); + + try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); + ObjectOutput out = new ObjectOutputStream(bbos)) { + out.writeObject(cause); + } + + // Set frame length + int frameLength = buf.readableBytes() - Integer.BYTES; + buf.setInt(0, frameLength); + return buf; + } + + /** + * Serializes the failure message sent to the + * {@link org.apache.flink.queryablestate.network.Client} in case of + * server related errors. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param cause The exception thrown at the server. + * @return The failure message. + */ + public static ByteBuf serializeServerFailure( + final ByteBufAllocator alloc, + final Throwable cause) throws IOException { + + final ByteBuf buf = alloc.ioBuffer(); + + // Frame length is set at end + buf.writeInt(0); + writeHeader(buf, MessageType.SERVER_FAILURE); + + try (ByteBufOutputStream bbos = new ByteBufOutputStream(buf); + ObjectOutput out = new ObjectOutputStream(bbos)) { + out.writeObject(cause); + } + + // Set frame length + int frameLength = buf.readableBytes() - Integer.BYTES; + buf.setInt(0, frameLength); + return buf; + } + + /** + * Helper for serializing the header. + * + * @param buf The {@link ByteBuf} to serialize the header into. + * @param messageType The {@link MessageType} of the message this header refers to. + */ + private static void writeHeader(final ByteBuf buf, final MessageType messageType) { + buf.writeInt(VERSION); + buf.writeInt(messageType.ordinal()); + } + + /** + * Helper for serializing the messages. + * + * @param alloc The {@link ByteBufAllocator} used to allocate the buffer to serialize the message into. + * @param requestId The id of the request to which the message refers to. + * @param messageType The {@link MessageType type of the message}. + * @param payload The serialized version of the message. + * @return A {@link ByteBuf} containing the serialized message. + */ + private static ByteBuf writePayload( + final ByteBufAllocator alloc, + final long requestId, + final MessageType messageType, + final byte[] payload) { + + final int frameLength = HEADER_LENGTH + REQUEST_ID_SIZE + payload.length; + final ByteBuf buf = alloc.ioBuffer(frameLength + Integer.BYTES); + + buf.writeInt(frameLength); + writeHeader(buf, messageType); + buf.writeLong(requestId); + buf.writeBytes(payload); + return buf; + } + + // ------------------------------------------------------------------------ + // Deserialization + // ------------------------------------------------------------------------ + + /** + * De-serializes the header and returns the {@link MessageType}. + * <pre> + * <b>The buffer is expected to be at the header position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized header. + * @return The message type. + * @throws IllegalStateException If unexpected message version or message type. + */ + public static MessageType deserializeHeader(final ByteBuf buf) { + + // checking the version + int version = buf.readInt(); + Preconditions.checkState(version == VERSION, + "Version Mismatch: Found " + version + ", Expected: " + VERSION + '.'); + + // fetching the message type + int msgType = buf.readInt(); + MessageType[] values = MessageType.values(); + Preconditions.checkState(msgType >= 0 && msgType < values.length, + "Illegal message type with index " + msgType + '.'); + return values[msgType]; + } + + /** + * De-serializes the header and returns the {@link MessageType}. + * <pre> + * <b>The buffer is expected to be at the request id position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized request id. + * @return The request id. + */ + public static long getRequestId(final ByteBuf buf) { + return buf.readLong(); + } + + /** + * De-serializes the request sent to the + * {@link org.apache.flink.queryablestate.network.AbstractServerBase}. + * <pre> + * <b>The buffer is expected to be at the request position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized request. + * @return The request. + */ + public REQ deserializeRequest(final ByteBuf buf) { + Preconditions.checkNotNull(buf); + return requestDeserializer.deserializeMessage(buf); + } + + /** + * De-serializes the response sent to the + * {@link org.apache.flink.queryablestate.network.Client}. + * <pre> + * <b>The buffer is expected to be at the response position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized response. + * @return The response. + */ + public RESP deserializeResponse(final ByteBuf buf) { + Preconditions.checkNotNull(buf); + return responseDeserializer.deserializeMessage(buf); + } + + /** + * De-serializes the {@link RequestFailure} sent to the + * {@link org.apache.flink.queryablestate.network.Client} in case of + * protocol related errors. + * <pre> + * <b>The buffer is expected to be at the correct position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized failure message. + * @return The failure message. + */ + public static RequestFailure deserializeRequestFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { + long requestId = buf.readLong(); + + Throwable cause; + try (ByteBufInputStream bis = new ByteBufInputStream(buf); + ObjectInputStream in = new ObjectInputStream(bis)) { + cause = (Throwable) in.readObject(); + } + return new RequestFailure(requestId, cause); + } + + /** + * De-serializes the failure message sent to the + * {@link org.apache.flink.queryablestate.network.Client} in case of + * server related errors. + * <pre> + * <b>The buffer is expected to be at the correct position.</b> + * </pre> + * @param buf The {@link ByteBuf} containing the serialized failure message. + * @return The failure message. + */ + public static Throwable deserializeServerFailure(final ByteBuf buf) throws IOException, ClassNotFoundException { + try (ByteBufInputStream bis = new ByteBufInputStream(buf); + ObjectInputStream in = new ObjectInputStream(bis)) { + return (Throwable) in.readObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java new file mode 100644 index 0000000..562ce93 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +/** + * Expected message types during the communication between + * {@link org.apache.flink.queryablestate.network.Client client} and + * {@link org.apache.flink.queryablestate.network.AbstractServerBase server}. + */ +@Internal +public enum MessageType { + + /** The message is a request. */ + REQUEST, + + /** The message is a successful response. */ + REQUEST_RESULT, + + /** The message indicates a protocol-related failure. */ + REQUEST_FAILURE, + + /** The message indicates a server failure. */ + SERVER_FAILURE +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java new file mode 100644 index 0000000..106199f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/RequestFailure.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.messages; + +import org.apache.flink.annotation.Internal; + +/** + * A message indicating a protocol-related error. + */ +@Internal +public class RequestFailure { + + /** ID of the request responding to. */ + private final long requestId; + + /** Failure cause. Not allowed to be a user type. */ + private final Throwable cause; + + /** + * Creates a failure response to a {@link MessageBody}. + * + * @param requestId ID for the request responding to + * @param cause Failure cause (not allowed to be a user type) + */ + public RequestFailure(long requestId, Throwable cause) { + this.requestId = requestId; + this.cause = cause; + } + + /** + * Returns the request ID responding to. + * + * @return Request ID responding to + */ + public long getRequestId() { + return requestId; + } + + /** + * Returns the failure cause. + * + * @return Failure cause + */ + public Throwable getCause() { + return cause; + } + + @Override + public String toString() { + return "RequestFailure{" + + "requestId=" + requestId + + ", cause=" + cause + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java new file mode 100644 index 0000000..9ba5f84 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/AtomicKvStateRequestStats.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.stats; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Atomic {@link KvStateRequestStats} implementation. + */ +public class AtomicKvStateRequestStats implements KvStateRequestStats { + + /** + * Number of active connections. + */ + private final AtomicLong numConnections = new AtomicLong(); + + /** + * Total number of reported requests. + */ + private final AtomicLong numRequests = new AtomicLong(); + + /** + * Total number of successful requests (<= reported requests). + */ + private final AtomicLong numSuccessful = new AtomicLong(); + + /** + * Total duration of all successful requests. + */ + private final AtomicLong successfulDuration = new AtomicLong(); + + /** + * Total number of failed requests (<= reported requests). + */ + private final AtomicLong numFailed = new AtomicLong(); + + @Override + public void reportActiveConnection() { + numConnections.incrementAndGet(); + } + + @Override + public void reportInactiveConnection() { + numConnections.decrementAndGet(); + } + + @Override + public void reportRequest() { + numRequests.incrementAndGet(); + } + + @Override + public void reportSuccessfulRequest(long durationTotalMillis) { + numSuccessful.incrementAndGet(); + successfulDuration.addAndGet(durationTotalMillis); + } + + @Override + public void reportFailedRequest() { + numFailed.incrementAndGet(); + } + + public long getNumConnections() { + return numConnections.get(); + } + + public long getNumRequests() { + return numRequests.get(); + } + + public long getNumSuccessful() { + return numSuccessful.get(); + } + + public long getNumFailed() { + return numFailed.get(); + } + + @Override + public String toString() { + return "AtomicKvStateRequestStats{" + + "numConnections=" + numConnections + + ", numRequests=" + numRequests + + ", numSuccessful=" + numSuccessful + + ", numFailed=" + numFailed + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java new file mode 100644 index 0000000..b34ac3e --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.stats; + +/** + * Disabled {@link KvStateRequestStats} implementation. + */ +public class DisabledKvStateRequestStats implements KvStateRequestStats { + + @Override + public void reportActiveConnection() { + } + + @Override + public void reportInactiveConnection() { + } + + @Override + public void reportRequest() { + } + + @Override + public void reportSuccessfulRequest(long durationTotalMillis) { + } + + @Override + public void reportFailedRequest() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java new file mode 100644 index 0000000..8e9edd8 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/KvStateRequestStats.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.network.stats; + +/** + * Simple statistics for monitoring the state server + * and the client proxy. + */ +public interface KvStateRequestStats { + + /** + * Reports an active connection. + */ + void reportActiveConnection(); + + /** + * Reports an inactive connection. + */ + void reportInactiveConnection(); + + /** + * Reports an incoming request. + */ + void reportRequest(); + + /** + * Reports a successfully handled request. + * + * @param durationTotalMillis Duration of the request (in milliseconds). + */ + void reportSuccessfulRequest(long durationTotalMillis); + + /** + * Reports a failure during a request. + */ + void reportFailedRequest(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java new file mode 100644 index 0000000..ca11a32 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfoTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client; + +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** + * Test for {@link VoidNamespaceTypeInfo}. + */ +public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase<VoidNamespaceTypeInfo> { + + @Override + protected VoidNamespaceTypeInfo[] getTestData() { + return new VoidNamespaceTypeInfo[] { VoidNamespaceTypeInfo.INSTANCE }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java new file mode 100644 index 0000000..ebbc896 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingStateTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableAggregatingStateTest}. + */ +public class ImmutableAggregatingStateTest { + + private final AggregatingStateDescriptor<Long, String, String> aggrStateDesc = + new AggregatingStateDescriptor<>( + "test", + new SumAggr(), + String.class); + + private ImmutableAggregatingState<Long, String> aggrState; + + @Before + public void setUp() throws Exception { + if (!aggrStateDesc.isSerializerInitialized()) { + aggrStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + final String initValue = "42"; + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + aggrStateDesc.getSerializer().serialize(initValue, new DataOutputViewStreamWrapper(out)); + + aggrState = ImmutableAggregatingState.createState( + aggrStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = aggrState.get(); + assertEquals("42", value); + + aggrState.clear(); + } + + /** + * Test {@link AggregateFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumAggr implements AggregateFunction<Long, String, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public String createAccumulator() { + return ""; + } + + @Override + public String add(Long value, String accumulator) { + accumulator += ", " + value; + return accumulator; + } + + @Override + public String getResult(String accumulator) { + return accumulator; + } + + @Override + public String merge(String a, String b) { + return a + ", " + b; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java new file mode 100644 index 0000000..9e8dfc9 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingStateTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableFoldingState}. + */ +public class ImmutableFoldingStateTest { + + private final FoldingStateDescriptor<Long, String> foldingStateDesc = + new FoldingStateDescriptor<>( + "test", + "0", + new SumFold(), + StringSerializer.INSTANCE); + + private ImmutableFoldingState<Long, String> foldingState; + + @Before + public void setUp() throws Exception { + if (!foldingStateDesc.isSerializerInitialized()) { + foldingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + StringSerializer.INSTANCE.serialize("42", new DataOutputViewStreamWrapper(out)); + + foldingState = ImmutableFoldingState.createState( + foldingStateDesc, + out.toByteArray() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + String value = foldingState.get(); + assertEquals("42", value); + + foldingState.clear(); + } + + /** + * Test {@link FoldFunction} concatenating the already stored string with the long passed as argument. + */ + private static class SumFold implements FoldFunction<Long, String> { + + private static final long serialVersionUID = -6249227626701264599L; + + @Override + public String fold(String accumulator, Long value) throws Exception { + long acc = Long.valueOf(accumulator); + acc += value; + return Long.toString(acc); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java new file mode 100644 index 0000000..a78ed1f --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableListStateTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableListState}. + */ +public class ImmutableListStateTest { + + private final ListStateDescriptor<Long> listStateDesc = + new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableListState<Long> listState; + + @Before + public void setUp() throws Exception { + if (!listStateDesc.isSerializerInitialized()) { + listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + List<Long> init = new ArrayList<>(); + init.add(42L); + + byte[] serInit = serializeInitValue(init); + listState = ImmutableListState.createState(listStateDesc, serInit); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + List<Long> list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + List<Long> list = getStateContents(); + assertEquals(1L, list.size()); + + long element = list.get(0); + assertEquals(42L, element); + + listState.clear(); + } + + /** + * Copied from HeapListState.getSerializedValue(Object, Object). + */ + private byte[] serializeInitValue(List<Long> toSerialize) throws IOException { + TypeSerializer<Long> serializer = listStateDesc.getElementSerializer(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); + + // write the same as RocksDB writes lists, with one ',' separator + for (int i = 0; i < toSerialize.size(); i++) { + serializer.serialize(toSerialize.get(i), view); + if (i < toSerialize.size() - 1) { + view.writeByte(','); + } + } + view.flush(); + + return baos.toByteArray(); + } + + private List<Long> getStateContents() { + List<Long> list = new ArrayList<>(); + for (Long elem: listState.get()) { + list.add(elem); + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java new file mode 100644 index 0000000..ffeabae --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; + +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link ImmutableMapState}. + */ +public class ImmutableMapStateTest { + + private final MapStateDescriptor<Long, Long> mapStateDesc = + new MapStateDescriptor<>( + "test", + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableMapState<Long, Long> mapState; + + @Before + public void setUp() throws Exception { + if (!mapStateDesc.isSerializerInitialized()) { + mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + Map<Long, Long> initMap = new HashMap<>(); + initMap.put(1L, 5L); + initMap.put(2L, 5L); + + byte[] initSer = KvStateSerializer.serializeMap( + initMap.entrySet(), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()), + BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + + mapState = ImmutableMapState.createState(mapStateDesc, initSer); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPut() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testPutAll() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Map<Long, Long> nMap = new HashMap<>(); + nMap.put(1L, 7L); + nMap.put(2L, 7L); + + mapState.putAll(nMap); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.put(2L, 54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterator() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testIterable() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterable<Map.Entry<Long, Long>> iterable = mapState.entries(); + Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator(); + while (iterator.hasNext()) { + assertEquals(5L, (long) iterator.next().getValue()); + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testKeys() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Long> iterator = mapState.keys().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testValues() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + Iterator<Long> iterator = mapState.values().iterator(); + while (iterator.hasNext()) { + iterator.remove(); + } + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + assertTrue(mapState.contains(1L)); + long value = mapState.get(1L); + assertEquals(5L, value); + + assertTrue(mapState.contains(2L)); + value = mapState.get(2L); + assertEquals(5L, value); + + mapState.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java new file mode 100644 index 0000000..9694f55 --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableReducingStateTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableReducingState}. + */ +public class ImmutableReducingStateTest { + + private final ReducingStateDescriptor<Long> reducingStateDesc = + new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableReducingState<Long> reduceState; + + @Before + public void setUp() throws Exception { + if (!reducingStateDesc.isSerializerInitialized()) { + reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + reduceState = ImmutableReducingState.createState( + reducingStateDesc, + ByteBuffer.allocate(Long.BYTES).putLong(42L).array() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.add(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + long value = reduceState.get(); + assertEquals(42L, value); + + reduceState.clear(); + } + + /** + * Test {@link ReduceFunction} summing up its two arguments. + */ + private static class SumReduce implements ReduceFunction<Long> { + + private static final long serialVersionUID = 6041237513913189144L; + + @Override + public Long reduce(Long value1, Long value2) throws Exception { + return value1 + value2; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java new file mode 100644 index 0000000..a0da43d --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableValueStateTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.queryablestate.client.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; + +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests the {@link ImmutableValueState}. + */ +public class ImmutableValueStateTest { + + private final ValueStateDescriptor<Long> valueStateDesc = + new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO); + + private ImmutableValueState<Long> valueState; + + @Before + public void setUp() throws Exception { + if (!valueStateDesc.isSerializerInitialized()) { + valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig()); + } + + valueState = ImmutableValueState.createState( + valueStateDesc, + ByteBuffer.allocate(Long.BYTES).putLong(42L).array() + ); + } + + @Test(expected = UnsupportedOperationException.class) + public void testUpdate() { + long value = valueState.value(); + assertEquals(42L, value); + + valueState.update(54L); + } + + @Test(expected = UnsupportedOperationException.class) + public void testClear() { + long value = valueState.value(); + assertEquals(42L, value); + + valueState.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..10792cd --- /dev/null +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/resources/log4j-test.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR +log4j.logger.org.apache.zookeeper=OFF http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/pom.xml ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/pom.xml b/flink-queryable-state/flink-queryable-state-java/pom.xml deleted file mode 100644 index e60c6f3..0000000 --- a/flink-queryable-state/flink-queryable-state-java/pom.xml +++ /dev/null @@ -1,137 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-queryable-state</artifactId> - <version>1.4-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-queryable-state-java_${scala.binary.version}</artifactId> - <name>flink-queryable-state-java</name> - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <!-- =================================================== - Testing - =================================================== --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils-junit</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-test</artifactId> - <version>${curator.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java deleted file mode 100644 index fa2604b..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate; - -import org.apache.flink.annotation.Internal; - -/** - * Exception to fail Future if the Task Manager on which the - * {@link org.apache.flink.runtime.query.KvStateClientProxy} - * is running on, does not know the active Job Manager. - */ -@Internal -public class UnknownJobManagerException extends Exception { - - private static final long serialVersionUID = 9092442511708951209L; - - public UnknownJobManagerException() { - super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java deleted file mode 100644 index c497a72..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.BadRequestException; - -/** - * Thrown if the KvState does not hold any state for the given key or namespace. - */ -@Internal -public class UnknownKeyOrNamespaceException extends BadRequestException { - - private static final long serialVersionUID = 1L; - - /** - * Creates the exception. - * @param serverName the name of the server that threw the exception. - */ - public UnknownKeyOrNamespaceException(String serverName) { - super(serverName, "No state for the specified key/namespace."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java deleted file mode 100644 index 59ba081..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.BadRequestException; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.util.Preconditions; - -/** - * Thrown if no KvState with the given ID cannot found by the server handler. - */ -@Internal -public class UnknownKvStateIdException extends BadRequestException { - - private static final long serialVersionUID = 1L; - - /** - * Creates the exception. - * @param serverName the name of the server that threw the exception. - * @param kvStateId the state id for which no state was found. - */ - public UnknownKvStateIdException(String serverName, KvStateID kvStateId) { - super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.'); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java deleted file mode 100644 index 0d6588a..0000000 --- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.queryablestate; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.queryablestate.network.BadRequestException; -import org.apache.flink.runtime.query.KvStateLocation; - -/** - * Exception thrown if there is no location information available for the given - * key group in a {@link KvStateLocation} instance. - */ -@Internal -public class UnknownKvStateKeyGroupLocationException extends BadRequestException { - - private static final long serialVersionUID = 1L; - - /** - * Creates the exception. - * @param serverName the name of the server that threw the exception. - */ - public UnknownKvStateKeyGroupLocationException(String serverName) { - super(serverName, "Unknown key-group location."); - } -}
