http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java new file mode 100644 index 0000000..1d7c94c --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.distributed.cache.client;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.io.ByteArrayOutputStream;
import org.apache.nifi.io.DataOutputStream;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controller service that talks to a remote DistributedSetCacheServer.
 *
 * <p>Each remote call leases a {@link CommsSession} from an internal queue (creating and
 * handshaking a new one when the queue has no usable session) and offers it back to the queue
 * once the call completes without error.</p>
 */
public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {

    // Fixed: the logger used to be created for DistributedMapCacheClientService, which
    // attributed every log line of this class to the wrong component.
    private static final Logger logger = LoggerFactory.getLogger(DistributedSetCacheClientService.class);

    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
            .name("Server Hostname")
            .description("The name of the server that is running the DistributedSetCacheServer service")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
            .name("Server Port")
            .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
            .required(true)
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .defaultValue("4557")
            .build();
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
            .name("SSL Context Service")
            .description(
                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
            .required(false)
            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
            .defaultValue(null)
            .build();
    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Communications Timeout")
            .description(
                    "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("30 secs")
            .build();

    /** Idle sessions available for reuse by the next remote call. */
    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
    /** Configuration captured in {@link #onConfigured(ConfigurationContext)}; read when a new session is created. */
    private volatile ConfigurationContext configContext;
    /** Set once {@link #close()} has been called; remote calls are rejected afterwards. */
    private volatile boolean closed = false;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(HOSTNAME);
        descriptors.add(PORT);
        descriptors.add(SSL_CONTEXT_SERVICE);
        descriptors.add(COMMUNICATIONS_TIMEOUT);
        return descriptors;
    }

    /**
     * Remembers the configuration so that sessions can be created lazily later on.
     *
     * @param context the configuration supplied by the framework
     */
    @OnConfigured
    public void onConfigured(final ConfigurationContext context) {
        this.configContext = context;
    }

    /**
     * Opens a new session to the configured host and port. An SSL session is used when an
     * SSL Context Service is configured, a plain one otherwise. The configured communications
     * timeout is applied before the session is returned. No handshake is performed here.
     *
     * @param context the configuration to read hostname, port, timeout and SSL settings from
     * @return the newly opened session
     * @throws IOException if the session cannot be opened
     */
    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
        final String hostname = context.getProperty(HOSTNAME).getValue();
        final int port = context.getProperty(PORT).asInteger();
        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);

        final CommsSession commsSession;
        if (sslContextService == null) {
            commsSession = new StandardCommsSession(hostname, port);
        } else {
            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
        }

        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return commsSession;
    }

    /**
     * Returns a pooled session if one is available and still open; otherwise creates a new
     * session and performs the protocol handshake on it.
     *
     * @return a session ready for a remote call
     * @throws IOException if the session cannot be created or the handshake fails
     */
    private CommsSession leaseCommsSession() throws IOException {
        CommsSession session = queue.poll();
        if (session != null && !session.isClosed()) {
            return session;
        }

        session = createCommsSession(configContext);
        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
        try {
            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
        } catch (final HandshakeException e) {
            closeQuietly(session);
            throw new IOException(e);
        } catch (final IOException e) {
            // Fixed: an I/O failure during the handshake previously leaked the new session.
            closeQuietly(session);
            throw e;
        }

        return session;
    }

    @Override
    public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("addIfAbsent", value, serializer);
    }

    @Override
    public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("contains", value, serializer);
    }

    @Override
    public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
        return invokeRemoteBoolean("remove", value, serializer);
    }

    /**
     * Marks the client as closed, then drains the pool: each idle session is sent a
     * "close" request (best effort) and is closed regardless of whether that request
     * could be written.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;

        CommsSession commsSession;
        while ((commsSession = queue.poll()) != null) {
            try {
                final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream());
                dos.writeUTF("close");
                dos.flush();
            } catch (final IOException e) {
                // Best effort only: the session is being discarded anyway.
                logger.debug("Failed to notify remote server that a session is closing", e);
            } finally {
                // Fixed: the session used to stay open when writing "close" failed.
                closeQuietly(commsSession);
            }
        }
        logger.info("Closed {}", new Object[] { getIdentifier() });
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            if (!closed) {
                close();
            }
            logger.debug("Finalize called");
        } finally {
            super.finalize();
        }
    }

    /**
     * Sends one request and reads a boolean reply. The request is the method name written
     * as UTF, followed by the length of the serialized value and the serialized bytes.
     *
     * <p>On an I/O failure the session is closed and the exception is rethrown. Otherwise the
     * session is offered back to the pool, or closed if this client was closed meanwhile.</p>
     *
     * @param methodName name of the remote operation
     * @param value the value to send
     * @param serializer writes {@code value} to a byte stream
     * @return the boolean read from the server
     * @throws IOException if communication fails
     * @throws IllegalStateException if this client has been closed
     */
    private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
        if (closed) {
            throw new IllegalStateException("Client is closed");
        }

        final CommsSession session = leaseCommsSession();
        try {
            final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
            dos.writeUTF(methodName);

            // Serialize into a buffer first so the length prefix can be written before the payload.
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            serializer.serialize(value, baos);
            dos.writeInt(baos.size());
            baos.writeTo(dos);
            dos.flush();

            final DataInputStream dis = new DataInputStream(session.getInputStream());
            return dis.readBoolean();
        } catch (final IOException ioe) {
            // The stream state is unknown after a failure, so the session must not be reused.
            closeQuietly(session);
            throw ioe;
        } finally {
            if (!session.isClosed()) {
                if (this.closed) {
                    closeQuietly(session);
                } else {
                    queue.offer(session);
                }
            }
        }
    }

    /** Closes the given session, logging (rather than propagating) any failure. */
    private static void closeQuietly(final CommsSession session) {
        try {
            session.close();
        } catch (final IOException ignored) {
            // Nothing useful can be done if closing fails; the session is discarded either way.
            logger.debug("Failed to close communications session", ignored);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java new file mode 100644 index 0000000..c8be082 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.nifi.distributed.cache.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.nifi.io.BufferedInputStream;
import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;

/**
 * {@link CommsSession} backed by an {@link SSLSocketChannel}. Reads and writes go through
 * buffered streams layered on the channel; interrupt, close, timeout and closed-state
 * queries are delegated to the channel itself.
 */
public class SSLCommsSession implements CommsSession {
    private final SSLSocketChannel sslSocketChannel;
    private final SSLContext sslContext;
    private final String hostname;
    private final int port;

    private final SSLSocketChannelInputStream in;
    private final BufferedInputStream bufferedIn;

    private final SSLSocketChannelOutputStream out;
    private final BufferedOutputStream bufferedOut;

    /**
     * Opens an SSL channel to the given endpoint and wraps it in buffered streams.
     *
     * @param sslContext the SSL context used to create the channel
     * @param hostname the remote host
     * @param port the remote port
     * @throws IOException if the channel cannot be created
     */
    public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
        this.sslContext = sslContext;
        this.hostname = hostname;
        this.port = port;

        this.sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);

        this.in = new SSLSocketChannelInputStream(this.sslSocketChannel);
        this.bufferedIn = new BufferedInputStream(this.in);

        this.out = new SSLSocketChannelOutputStream(this.sslSocketChannel);
        this.bufferedOut = new BufferedOutputStream(this.out);
    }

    @Override
    public void interrupt() {
        sslSocketChannel.interrupt();
    }

    @Override
    public void close() throws IOException {
        sslSocketChannel.close();
    }

    @Override
    public void setTimeout(final long value, final TimeUnit timeUnit) {
        // The channel takes its timeout as an int number of milliseconds.
        final long millis = timeUnit.toMillis(value);
        sslSocketChannel.setTimeout((int) millis);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return bufferedIn;
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return bufferedOut;
    }

    @Override
    public boolean isClosed() {
        return sslSocketChannel.isClosed();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public SSLContext getSSLContext() {
        return sslContext;
    }

    @Override
    public long getTimeout(final TimeUnit timeUnit) {
        // The channel reports its timeout in milliseconds; convert to the caller's unit.
        final long channelTimeoutMillis = sslSocketChannel.getTimeout();
        return timeUnit.convert(channelTimeoutMillis, TimeUnit.MILLISECONDS);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java new file mode 100644 index 0000000..bbe2917 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.distributed.cache.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.nifi.io.BufferedInputStream;
import org.apache.nifi.io.BufferedOutputStream;
import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;

/**
 * Unencrypted {@link CommsSession} backed by a non-blocking {@link SocketChannel}.
 * The channel streams are wrapped in buffered, interruptable streams, and
 * {@link #getSSLContext()} always returns {@code null}.
 */
public class StandardCommsSession implements CommsSession {
    private final SocketChannel socketChannel;
    private final String hostname;
    private final int port;
    /** Last timeout passed to {@link #setTimeout(long, TimeUnit)}, in milliseconds. */
    private volatile long timeoutMillis;

    private final SocketChannelInputStream in;
    private final InterruptableInputStream bufferedIn;

    private final SocketChannelOutputStream out;
    private final InterruptableOutputStream bufferedOut;

    /**
     * Connects to the given endpoint and switches the channel to non-blocking mode.
     *
     * @param hostname the remote host
     * @param port the remote port
     * @throws IOException if the connection cannot be established or configured
     */
    public StandardCommsSession(final String hostname, final int port) throws IOException {
        socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
        try {
            socketChannel.configureBlocking(false);
        } catch (final IOException e) {
            // Fixed: the connected channel used to leak when it could not be configured,
            // because the caller never receives a session object to close.
            try {
                socketChannel.close();
            } catch (final IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        in = new SocketChannelInputStream(socketChannel);
        bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));

        out = new SocketChannelOutputStream(socketChannel);
        bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));

        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public void interrupt() {
        bufferedIn.interrupt();
        bufferedOut.interrupt();
    }

    @Override
    public void close() throws IOException {
        socketChannel.close();
    }

    @Override
    public void setTimeout(final long value, final TimeUnit timeUnit) {
        // Convert once; the streams take an int number of milliseconds.
        final long millis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
        in.setTimeout((int) millis);
        out.setTimeout((int) millis);
        timeoutMillis = millis;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        return bufferedIn;
    }

    @Override
    public OutputStream getOutputStream() throws IOException {
        return bufferedOut;
    }

    /**
     * Reports whether this session is unusable. Besides checking the channel's connected
     * state, this probes the input stream; if the probe throws, the session is closed and
     * reported as closed.
     */
    @Override
    public boolean isClosed() {
        boolean closed = !socketChannel.isConnected();
        if (!closed) {
            try {
                this.in.isDataAvailable();
            } catch (final IOException e) {
                try {
                    close();
                } catch (final IOException ignored) {
                    // The session is reported as closed regardless of whether close() succeeds.
                }
                closed = true;
            }
        }
        return closed;
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public int getPort() {
        return port;
    }

    @Override
    public SSLContext getSSLContext() {
        return null;
    }

    @Override
    public long getTimeout(final TimeUnit timeUnit) {
        return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..a91f7ee --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService +org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html new file mode 100644 index 0000000..d5f3595 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html @@ -0,0 +1,78 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> +<meta charset="utf-8" /> +<title>Distributed Map Cache Client Service</title> +<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> + <h2>Description:</h2> + + <p>A Controller Service that can be used to communicate with a + Distributed Map Cache Server.</p> + + + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. 
If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + + <ul> + <li><strong>Server Hostname</strong> + <ul> + <li>The name of the server that is running the DistributedMapCacheServer service</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Server Port</strong> + <ul> + <li>The port on the remote server that is to be used when communicating with the + <a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li> + + <li>Default value: 4557</li> + <li>Supports expression language: false</li> + </ul></li> + <li>SSL Context Service + <ul> + <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Communications Timeout</strong> + <ul> + <li>Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received</li> + <li>Default value: 30 secs</li> + <li>Supports expression language: false</li> + </ul></li> + + </ul> + + + <i>See Also:</i> + <ul> + <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li> + <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li> + </ul> + +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html ---------------------------------------------------------------------- diff --git 
a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html new file mode 100755 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml new file mode 100644 index 0000000..bc612ae --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-protocol</artifactId> + <name>Distributed Cache Protocol</name> + + <description> + Defines the communications protocol that is used between clients and servers + for the Distributed Cache services + </description> + + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java new file mode 100644 index 0000000..da2acad --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.nifi.distributed.cache.protocol;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.VersionNegotiator;

/**
 * Handshake exchanged when a connection is opened: the initiating side writes
 * {@link #MAGIC_HEADER} and then both sides negotiate a protocol version.
 *
 * <p>During version negotiation the initiator writes a proposed version as an int and the
 * receiver answers with a single status byte: {@link #RESOURCE_OK},
 * {@link #DIFFERENT_RESOURCE_VERSION} (followed by the receiver's preferred version as an int)
 * or {@link #ABORT} (followed by a UTF-encoded explanation).</p>
 */
public class ProtocolHandshake {

    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };

    public static final int RESOURCE_OK = 20;
    public static final int DIFFERENT_RESOURCE_VERSION = 21;
    public static final int ABORT = 255;

    /**
     * Performs the initiating half of the handshake: writes the magic header and negotiates
     * a version. The output stream is flushed whether or not negotiation succeeds.
     *
     * @param in stream to read the peer's responses from
     * @param out stream to write the header and version proposals to
     * @param versionNegotiator supplies the proposed version and receives the agreed one
     * @throws IOException if communication fails
     * @throws HandshakeException if no version can be agreed on or the peer aborts
     */
    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
        final DataInputStream dis = new DataInputStream(in);
        final DataOutputStream dos = new DataOutputStream(out);

        try {
            dos.write(MAGIC_HEADER);

            initiateVersionNegotiation(versionNegotiator, dis, dos);
        } finally {
            dos.flush();
        }
    }

    /**
     * Performs the receiving half of the handshake: consumes the magic header and answers the
     * peer's version proposals. The output stream is flushed whether or not negotiation succeeds.
     *
     * @param in stream to read the header and version proposals from
     * @param out stream to write responses to
     * @param versionNegotiator decides which versions are acceptable and receives the agreed one
     * @throws IOException if communication fails
     * @throws HandshakeException if no acceptable version exists
     */
    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
        final DataInputStream dis = new DataInputStream(in);
        final DataOutputStream dos = new DataOutputStream(out);

        try {
            // NOTE(review): the header bytes are consumed but not compared with MAGIC_HEADER;
            // confirm whether rejecting a mismatching header is desired.
            final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
            dis.readFully(magicHeaderBuffer);

            receiveVersionNegotiation(versionNegotiator, dis, dos);
        } finally {
            dos.flush();
        }
    }

    /**
     * Proposes the negotiator's current version and handles the peer's answer, retrying with
     * a lower version when the peer asks for a different one.
     */
    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
        // Propose our current version of the protocol.
        dos.writeInt(negotiator.getVersion());
        dos.flush();

        // wait for response from server.
        final int statusCode = dis.read();
        switch (statusCode) {
            case RESOURCE_OK: // server accepted the proposed version
                return;
            case DIFFERENT_RESOURCE_VERSION: // server rejected the proposed version
                // Get server's preferred version
                final int newVersion = dis.readInt();

                // Determine our new preferred version that is no greater than the server's preferred version.
                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
                // If we could not agree with server on a version, fail now.
                if (newPreference == null) {
                    throw new HandshakeException("Could not agree on protocol version");
                }

                negotiator.setVersion(newPreference);

                // Attempt negotiation of resource based on our new preferred version.
                initiateVersionNegotiation(negotiator, dis, dos);
                // Fixed: without this return a successful renegotiation fell through into
                // the ABORT case and failed the handshake.
                return;
            case ABORT:
                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
            default:
                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
        }
    }

    /**
     * Reads the peer's proposed version and either accepts it, counter-proposes a preferred
     * version and waits for the next proposal, or aborts when no version is acceptable.
     */
    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
        final int version = dis.readInt();
        if (negotiator.isVersionSupported(version)) {
            dos.write(RESOURCE_OK);
            dos.flush();

            negotiator.setVersion(version);
        } else {
            final Integer preferred = negotiator.getPreferredVersion(version);
            if (preferred == null) {
                final String abortMessage = "Unable to negotiate an acceptable version of the Distributed Cache Protocol";
                dos.write(ABORT);
                // Fixed: the initiating side reads a UTF message after ABORT, so one must be sent.
                dos.writeUTF(abortMessage);
                dos.flush();
                throw new HandshakeException(abortMessage);
            }
            dos.write(DIFFERENT_RESOURCE_VERSION);
            dos.writeInt(preferred);
            dos.flush();

            receiveVersionNegotiation(negotiator, dis, dos);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java new file mode 100644 index 0000000..8049d42 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache 
/**
 * Checked exception signalling that the Distributed Cache Protocol handshake
 * could not be completed, for example because no protocol version could be
 * agreed on or the remote side aborted the negotiation.
 */
public class HandshakeException extends Exception {

    // Exception is Serializable; pin the id so recompilation does not change it.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of why the handshake failed
     */
    public HandshakeException(final String message) {
        super(message);
    }

    /**
     * @param cause the underlying failure that prevented the handshake
     */
    public HandshakeException(final Throwable cause) {
        super(cause);
    }

    /**
     * @param message description of why the handshake failed
     * @param cause the underlying failure, preserved for diagnostics
     */
    public HandshakeException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-server</artifactId> + + <name>Distributed Cache Server</name> + <description>Provides a Controller Service for hosting Distributed Caches</description> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-protocol</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>wali</groupId> + <artifactId>wali</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + 
<artifactId>distributed-cache-client-service-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java new file mode 100644 index 0000000..9b4e70e --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.io.socket.SocketChannelInputStream; +import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractCacheServer implements CacheServer { + + private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class); + + private final String identifier; + private final int port; + private final SSLContext sslContext; + protected volatile boolean stopped = false; + private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();; + 
+ private volatile ServerSocketChannel serverSocketChannel; + + public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) { + this.identifier = identifier; + this.port = port; + this.sslContext = sslContext; + } + + @Override + public void start() throws IOException { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(true); + serverSocketChannel.bind(new InetSocketAddress(port)); + + final Runnable runnable = new Runnable() { + + @Override + public void run() { + while (true) { + final SocketChannel socketChannel; + try { + socketChannel = serverSocketChannel.accept(); + logger.debug("Connected to {}", new Object[] { socketChannel }); + } catch (final IOException e) { + if (!stopped) { + logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + return; + } + + final Runnable processInputRunnable = new Runnable() { + @Override + public void run() { + final InputStream rawInputStream; + final OutputStream rawOutputStream; + final String peer = socketChannel.socket().getInetAddress().getHostName(); + + try { + if (sslContext == null) { + rawInputStream = new SocketChannelInputStream(socketChannel); + rawOutputStream = new SocketChannelOutputStream(socketChannel); + } else { + final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false); + sslSocketChannel.connect(); + rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel); + rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel); + } + } catch (IOException e) { + logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + try { + socketChannel.close(); + } catch (IOException swallow) { + } + + return; + } + try (final InputStream in = new BufferedInputStream(rawInputStream); + 
final OutputStream out = new BufferedOutputStream(rawOutputStream)) { + + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); + + ProtocolHandshake.receiveHandshake(in, out, versionNegotiator); + + boolean continueComms = true; + while (continueComms) { + continueComms = listen(in, out, versionNegotiator.getVersion()); + } + // client has issued 'close' + logger.debug("Client issued close on {}", new Object[] { socketChannel }); + } catch (final SocketTimeoutException e) { + logger.debug("30 sec timeout reached", e); + } catch (final IOException | HandshakeException e) { + if (!stopped) { + logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() }); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } finally { + processInputThreads.remove(Thread.currentThread()); + } + } + }; + + final Thread processInputThread = new Thread(processInputRunnable); + processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier); + processInputThread.setDaemon(true); + processInputThread.start(); + processInputThreads.add(processInputThread); + } + } + }; + + final Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName("Distributed Cache Server: " + identifier); + thread.start(); + } + + @Override + public void stop() throws IOException { + stopped = true; + logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); + + if (serverSocketChannel != null) { + serverSocketChannel.close(); + } + // need to close out the created SocketChannels...this is done by interrupting + // the created threads that loop on listen(). 
+ for (Thread processInputThread : processInputThreads) { + processInputThread.interrupt(); + int i = 0; + while (!processInputThread.isInterrupted() && i++ < 5) { + try { + Thread.sleep(50); // allow thread to gracefully terminate + } catch (InterruptedException e) { + } + } + } + processInputThreads.clear(); + } + + @Override + public String toString() { + return "CacheServer[id=" + identifier + "]"; + } + + /** + * Listens for incoming data and communicates with remote peer + * + * @param in + * @param out + * @param version + * @return <code>true</code> if communications should continue, <code>false</code> otherwise + * @throws IOException + */ + protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java new file mode 100644 index 0000000..71ac56d --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Bookkeeping attached to a cache entry: a unique id, the time the record was
 * created, the time of the most recent hit and the number of hits so far.
 */
public class CacheRecord {

    /** Hands out ids in increasing order, starting at zero. */
    private static final AtomicLong NEXT_ID = new AtomicLong();

    private final long id;
    private final long entryDate;
    private final AtomicInteger hits = new AtomicInteger();
    private volatile long lastHitDate;

    /**
     * Creates a record stamped with the current time and the next free id.
     */
    public CacheRecord() {
        final long now = System.currentTimeMillis();
        this.entryDate = now;
        this.lastHitDate = now;
        this.id = NEXT_ID.getAndIncrement();
    }

    /**
     * @return the id assigned to this record at construction
     */
    public long getId() {
        return id;
    }

    /**
     * @return creation time in milliseconds since the epoch
     */
    public long getEntryDate() {
        return entryDate;
    }

    /**
     * @return time of the latest {@link #hit()}, or the creation time if there was none
     */
    public long getLastHitDate() {
        return lastHitDate;
    }

    /**
     * @return how many times {@link #hit()} has been called
     */
    public int getHitCount() {
        return hits.get();
    }

    /**
     * Records one more hit and refreshes the last-hit timestamp.
     */
    public void hit() {
        hits.incrementAndGet();
        lastHitDate = System.currentTimeMillis();
    }
}
/**
 * Minimal lifecycle contract of a cache server: it can be started and stopped.
 */
public interface CacheServer {

    /**
     * Starts the server. In {@code AbstractCacheServer} this binds the server socket
     * and begins accepting connections on a background thread.
     *
     * @throws IOException if the server cannot be started
     */
    void start() throws IOException;

    /**
     * Stops the server. In {@code AbstractCacheServer} this closes the server socket
     * and interrupts the threads serving client connections.
     *
     * @throws IOException if the server cannot be stopped cleanly
     */
    void stop() throws IOException;

}
@@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.processor.annotation.OnShutdown; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +public abstract class DistributedCacheServer extends AbstractControllerService { + public static final String EVICTION_STRATEGY_LFU = "Least Frequently Used"; + public static final String EVICTION_STRATEGY_LRU = "Least Recently Used"; + public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; + + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Port") + .description("The port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); + public static final PropertyDescriptor 
SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description( + "If specified, this service will be used to create an SSL Context that will be used to secure communications; if not specified, communications will not be secure") + .required(false) + .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) + .build(); + public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() + .name("Maximum Cache Entries") + .description("The maximum number of cache entries that the cache can hold") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() + .name("Eviction Strategy") + .description("Determines which strategy should be used to evict values from the cache to make room for new entries") + .required(true) + .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) + .defaultValue(EVICTION_STRATEGY_LFU) + .build(); + public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() + .name("Persistence Directory") + .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") + .required(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) + .build(); + + private volatile CacheServer cacheServer; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(PORT); + properties.add(MAX_CACHE_ENTRIES); + properties.add(EVICTION_POLICY); + properties.add(PERSISTENCE_PATH); + properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(SSL_CONTEXT_SERVICE).allowableValues( + 
getControllerServiceLookup().getControllerServiceIdentifiers(SSLContextService.class)).build()); + return properties; + } + + @OnConfigured + public void startServer(final ConfigurationContext context) throws IOException { + if (cacheServer == null) { + cacheServer = createCacheServer(context); + cacheServer.start(); + } + } + + @OnShutdown + public void shutdownServer() throws IOException { + if (cacheServer != null) { + cacheServer.stop(); + } + cacheServer = null; + } + + @Override + protected void finalize() throws Throwable { + shutdownServer(); + } + + protected abstract CacheServer createCacheServer(ConfigurationContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java new file mode 100644 index 0000000..426573f --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import java.io.File; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +public class DistributedSetCacheServer extends DistributedCacheServer { + + @Override + protected CacheServer createCacheServer(final ConfigurationContext context) { + final int port = context.getProperty(PORT).asInteger(); + final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); + final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); + + final SSLContext sslContext; + if ( sslContextService == null ) { + sslContext = null; + } else { + sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); + } + + final EvictionPolicy evictionPolicy; + switch (evictionPolicyName) { + case EVICTION_STRATEGY_FIFO: + evictionPolicy = EvictionPolicy.FIFO; + break; + case EVICTION_STRATEGY_LFU: + evictionPolicy = EvictionPolicy.LFU; + break; + case EVICTION_STRATEGY_LRU: + evictionPolicy = EvictionPolicy.LRU; + break; + default: + throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); + } + + try { + final File persistenceDir = persistencePath == null ? 
null : new File(persistencePath); + + return new SetCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java new file mode 100644 index 0000000..60bd2c1 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/EvictionPolicy.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.util.Comparator; + +public enum EvictionPolicy { + LFU(new LFUComparator()), + LRU(new LRUComparator()), + FIFO(new FIFOComparator()); + + private final Comparator<CacheRecord> comparator; + + private EvictionPolicy(final Comparator<CacheRecord> comparator) { + this.comparator = comparator; + } + + public Comparator<CacheRecord> getComparator() { + return comparator; + } + + public static class LFUComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int hitCountComparison = Integer.compare(o1.getHitCount(), o2.getHitCount()); + final int entryDateComparison = (hitCountComparison == 0) ? Long.compare(o1.getEntryDate(), o2.getEntryDate()) : hitCountComparison; + return (entryDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : entryDateComparison); + } + } + + public static class LRUComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int lastHitDateComparison = Long.compare(o1.getLastHitDate(), o2.getLastHitDate()); + return (lastHitDateComparison == 0 ? Long.compare(o1.getId(), o2.getId()) : lastHitDateComparison); + } + } + + public static class FIFOComparator implements Comparator<CacheRecord> { + @Override + public int compare(final CacheRecord o1, final CacheRecord o2) { + if ( o1.equals(o2) ) { + return 0; + } + + final int entryDateComparison = Long.compare(o1.getEntryDate(), o2.getEntryDate()); + return (entryDateComparison == 0 ? 
Long.compare(o1.getId(), o2.getId()) : entryDateComparison); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java new file mode 100644 index 0000000..5d2c0f6 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.server.set.PersistentSetCache; +import org.apache.nifi.distributed.cache.server.set.SetCache; +import org.apache.nifi.distributed.cache.server.set.SetCacheResult; +import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; +import org.apache.nifi.io.DataOutputStream; + +public class SetCacheServer extends AbstractCacheServer { + + private final SetCache cache; + + public SetCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, + final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { + super(identifier, sslContext, port); + + final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, evictionPolicy); + + if (persistencePath == null) { + this.cache = simpleCache; + } else { + final PersistentSetCache persistentCache = new PersistentSetCache(identifier, persistencePath, simpleCache); + persistentCache.restore(); + this.cache = persistentCache; + } + } + + @Override + protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { + final DataInputStream dis = new DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + + final String action = dis.readUTF(); + if (action.equals("close")) { + return false; + } + + final int valueLength = dis.readInt(); + final byte[] value = new byte[valueLength]; + dis.readFully(value); + final ByteBuffer valueBuffer = ByteBuffer.wrap(value); + + final SetCacheResult response; + switch (action) { + case "addIfAbsent": + response = cache.addIfAbsent(valueBuffer); + break; + case "contains": + response = cache.contains(valueBuffer); + break; + case "remove": + response 
= cache.remove(valueBuffer); + break; + default: + throw new IOException("IllegalRequest"); + } + + dos.writeBoolean(response.getResult()); + dos.flush(); + + return true; + } + + @Override + public void stop() throws IOException { + try { + super.stop(); + } finally { + cache.shutdown(); + } + } + + @Override + protected void finalize() throws Throwable { + if (!stopped) + stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java new file mode 100644 index 0000000..920529d --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.File; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.server.CacheServer; +import org.apache.nifi.distributed.cache.server.DistributedCacheServer; +import org.apache.nifi.distributed.cache.server.EvictionPolicy; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +public class DistributedMapCacheServer extends DistributedCacheServer { + + @Override + protected CacheServer createCacheServer(final ConfigurationContext context) { + final int port = context.getProperty(PORT).asInteger(); + final String persistencePath = context.getProperty(PERSISTENCE_PATH).getValue(); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger(); + final String evictionPolicyName = context.getProperty(EVICTION_POLICY).getValue(); + + final SSLContext sslContext; + if ( sslContextService == null ) { + sslContext = null; + } else { + sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); + } + + final EvictionPolicy evictionPolicy; + switch (evictionPolicyName) { + case EVICTION_STRATEGY_FIFO: + evictionPolicy = EvictionPolicy.FIFO; + break; + case EVICTION_STRATEGY_LFU: + evictionPolicy = EvictionPolicy.LFU; + break; + case EVICTION_STRATEGY_LRU: + evictionPolicy = EvictionPolicy.LRU; + break; + default: + throw new IllegalArgumentException("Illegal Eviction Policy: " + evictionPolicyName); + } + + try { + final File persistenceDir = persistencePath == null ? 
null : new File(persistencePath); + + return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java new file mode 100644 index 0000000..534cb0b --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface MapCache { + + MapPutResult putIfAbsent(ByteBuffer key, ByteBuffer value) throws IOException; + boolean containsKey(ByteBuffer key) throws IOException; + ByteBuffer get(ByteBuffer key) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; + void shutdown() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java new file mode 100644 index 0000000..b0ab0c4 --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.nio.ByteBuffer; + +import org.apache.nifi.distributed.cache.server.CacheRecord; + +public class MapCacheRecord extends CacheRecord { + private final ByteBuffer key; + private final ByteBuffer value; + + public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { + this.key = key; + this.value = value; + } + + public ByteBuffer getKey() { + return key; + } + + public ByteBuffer getValue() { + return value; + } + + @Override + public int hashCode() { + return 2938476 + key.hashCode() * value.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if ( obj == this ) { + return true; + } + + if ( obj instanceof MapCacheRecord ) { + final MapCacheRecord that = ((MapCacheRecord) obj); + return key.equals(that.key) && value.equals(that.value); + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java new file mode 100644 index 0000000..3e8dd0e --- /dev/null +++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.server.AbstractCacheServer; +import org.apache.nifi.distributed.cache.server.EvictionPolicy; +import org.apache.nifi.io.DataOutputStream; + +public class MapCacheServer extends AbstractCacheServer { + + private final MapCache cache; + + public MapCacheServer(final String identifier, final SSLContext sslContext, final int port, final int maxSize, + final EvictionPolicy evictionPolicy, final File persistencePath) throws IOException { + super(identifier, sslContext, port); + + final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, evictionPolicy); + + if (persistencePath == null) { + this.cache = simpleCache; + } else { + final PersistentMapCache persistentCache = new PersistentMapCache(identifier, persistencePath, simpleCache); + persistentCache.restore(); + this.cache = persistentCache; + } + } + + @Override + protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { + final DataInputStream dis = new 
DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + final String action = dis.readUTF(); + try { + switch (action) { + case "close": { + return false; + } + case "putIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + dos.writeBoolean(putResult.isSuccessful()); + break; + } + case "containsKey": { + final byte[] key = readValue(dis); + final boolean contains = cache.containsKey(ByteBuffer.wrap(key)); + dos.writeBoolean(contains); + break; + } + case "getAndPutIfAbsent": { + final byte[] key = readValue(dis); + final byte[] value = readValue(dis); + + final MapPutResult putResult = cache.putIfAbsent(ByteBuffer.wrap(key), ByteBuffer.wrap(value)); + if (putResult.isSuccessful()) { + // Put was successful. There was no old value to get. + dos.writeInt(0); + } else { + // we didn't put. Write back the previous value + final byte[] byteArray = putResult.getExistingValue().array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "get": { + final byte[] key = readValue(dis); + final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); + if (existingValue == null) { + // there was no existing value; we did a "put". + dos.writeInt(0); + } else { + // a value already existed. 
we did not update the map + final byte[] byteArray = existingValue.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + + break; + } + case "remove": { + final byte[] key = readValue(dis); + final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null; + dos.writeBoolean(removed); + break; + } + default: { + throw new IOException("Illegal Request"); + } + } + } finally { + dos.flush(); + } + + return true; + } + + @Override + public void stop() throws IOException { + try { + super.stop(); + } finally { + cache.shutdown(); + } + } + + @Override + protected void finalize() throws Throwable { + if (!stopped) + stop(); + } + + private byte[] readValue(final DataInputStream dis) throws IOException { + final int numBytes = dis.readInt(); + final byte[] buffer = new byte[numBytes]; + dis.readFully(buffer); + return buffer; + } + +}