http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java new file mode 100644 index 0000000..5bead8c --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.IOException; + +import org.apache.nifi.controller.ControllerService; + +/** + * This interface defines an API that can be used for interacting with a + * Distributed Cache that functions similarly to a {@link java.util.Map Map}. + * + */ +public interface DistributedMapCacheClient extends ControllerService { + + /** + * Adds the specified key and value to the cache, if they are not already + * present, serializing the key and value with the given + * {@link Serializer}s. + * + * @param <K> + * @param <V> + * @param key the key for into the map + * @param value the value to add to the map if and only if the key is absent + * @param keySerializer + * @param valueSerializer + * @return true if the value was added to the cache, false if the value + * already existed in the cache + * + * @throws IOException if unable to communicate with the remote instance + */ + <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException; + + /** + * Adds the specified key and value to the cache, if they are not already + * present, serializing the key and value with the given + * {@link Serializer}s. If a value already exists in the cache for the given + * key, the value associated with the key is returned, after being + * deserialized with the given valueDeserializer. + * + * @param <K> + * @param <V> + * @param key + * @param value + * @param keySerializer + * @param valueSerializer + * @param valueDeserializer + * @return + * @throws IOException + */ + <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException; + + /** + * Determines if the given value is present in the cache and if so returns + * <code>true</code>, else returns <code>false</code> + * + * @param <K> + * @param key + * @param keySerializer + * @return + * + * @throws IOException if unable to communicate with the remote instance + */ + <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException; + + /** + * Returns the value in the cache for the given key, if one exists; + * otherwise returns <code>null</code> + * + * @param <K> + * @param <V> + * @param key the key to lookup in the map + * @param keySerializer + * @param valueDeserializer + * + * @return + * @throws IOException + */ + <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException; + + /** + * Attempts to notify the server that we are finished communicating with it + * and cleans up resources + * @throws java.io.IOException + */ + void close() throws IOException; + + /** + * Removes the entry with the given key from the cache, if it is present. + * + * @param <K> + * @param key + * @param serializer + * @return <code>true</code> if the entry is removed, <code>false</code> if + * the key did not exist in the cache + * @throws IOException + */ + <K> boolean remove(K key, Serializer<K> serializer) throws IOException; + +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java new file mode 100644 index 0000000..12aae3e --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClient.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.IOException; + +import org.apache.nifi.controller.ControllerService; + +/** + * This interface defines an API that can be used for interacting with a + * Distributed Cache that functions similarly to a {@link java.util.Set Set}. + */ +public interface DistributedSetCacheClient extends ControllerService { + + /** + * Adds the specified value to the cache, serializing the value with the + * given {@link Serializer}. + * + * @param <T> + * @param value + * @param serializer + * @return true if the value was added to the cache, false if the value + * already existed in the cache + * + * @throws IOException if unable to communicate with the remote instance + */ + <T> boolean addIfAbsent(T value, Serializer<T> serializer) throws IOException; + + /** + * Returns if the given value is present in the cache and if so returns + * <code>true</code>, else returns <code>false</code> + * + * @param <T> + * @param value + * @param serializer + * @return + * + * @throws IOException if unable to communicate with the remote instance + */ + <T> boolean contains(T value, Serializer<T> serializer) throws IOException; + + /** + * Removes the given value from the cache, if it is present. + * + * @param <T> + * @param value + * @param serializer + * @return <code>true</code> if the value is removed, <code>false</code> if + * the value did not exist in the cache + * @throws IOException + */ + <T> boolean remove(T value, Serializer<T> serializer) throws IOException; + + /** + * Attempts to notify the server that we are finished communicating with it + * and cleans up resources + * @throws java.io.IOException + */ + void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java new file mode 100644 index 0000000..f1896be --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/Serializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +/** + * Provides a mechanism by which a value can be serialized to a stream of bytes + * @param <T> + */ +public interface Serializer<T> { + + /** + * Serializes the given value to the {@link OutputStream} + * + * @param value + * @param output + * @throws SerializationException If unable to serialize the given value + * @throws java.io.IOException + */ + void serialize(T value, OutputStream output) throws SerializationException, IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java new file mode 100644 index 0000000..bb2fcb2 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/DeserializationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client.exception; + +public class DeserializationException extends RuntimeException { + + public DeserializationException(final Throwable cause) { + super(cause); + } + + public DeserializationException(final String message) { + super(message); + } + + public DeserializationException(final String message, final Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java new file mode 100644 index 0000000..aac59f5 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/exception/SerializationException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client.exception; + +public class SerializationException extends RuntimeException { + + public SerializationException(final Throwable cause) { + super(cause); + } + + public SerializationException(final String message) { + super(message); + } + + public SerializationException(final String message, final Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml new file mode 100644 index 0000000..a251393 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml @@ -0,0 +1,60 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-client-service</artifactId> + <packaging>jar</packaging> + + <name>Distributed Cache Client Service</name> + <description>Provides a Client for interfacing with a Distributed Cache</description> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java new file mode 100644 index 0000000..f838c2f --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +public interface CommsSession extends Closeable { + + void setTimeout(final long value, final TimeUnit timeUnit); + + InputStream getInputStream() throws IOException; + + OutputStream getOutputStream() throws IOException; + + boolean isClosed(); + + void interrupt(); + + String getHostname(); + + int getPort(); + + long getTimeout(TimeUnit timeUnit); + + SSLContext getSSLContext(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java new file mode 100644 index 0000000..ee96660 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.io.ByteArrayOutputStream; +import org.apache.nifi.io.DataOutputStream; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { + + private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Server Hostname") + .description("The name of the server that is running the DistributedMapCacheServer service") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Server Port") + .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description( + "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted") + .required(false) + .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) + .defaultValue(null) + .build(); + public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description( + "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); + + private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>(); + private volatile ConfigurationContext configContext; + private volatile boolean closed = false; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(HOSTNAME); + descriptors.add(PORT); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(COMMUNICATIONS_TIMEOUT); + return descriptors; + } + + @OnConfigured + public void cacheConfig(final ConfigurationContext context) { + this.configContext = context; + } + + @Override + public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) + throws IOException { + return withCommsSession(new CommsAction<Boolean>() { + @Override + public Boolean execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("putIfAbsent"); + + serialize(key, keySerializer, dos); + serialize(value, valueSerializer, dos); + + dos.flush(); + + final DataInputStream dis = new DataInputStream(session.getInputStream()); + return dis.readBoolean(); + } + }); + } + + @Override + public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { + return withCommsSession(new CommsAction<Boolean>() { + @Override + public Boolean execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("containsKey"); + + serialize(key, keySerializer, dos); + dos.flush(); + + final DataInputStream dis = new DataInputStream(session.getInputStream()); + return dis.readBoolean(); + } + }); + } + + @Override + public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, + final Deserializer<V> valueDeserializer) throws IOException { + return withCommsSession(new CommsAction<V>() { + @Override + public V execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("getAndPutIfAbsent"); + + serialize(key, keySerializer, dos); + serialize(value, valueSerializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + return valueDeserializer.deserialize(responseBuffer); + } + }); + } + + @Override + public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { + return withCommsSession(new CommsAction<V>() { + @Override + public V execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("get"); + + serialize(key, keySerializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + return valueDeserializer.deserialize(responseBuffer); + } + }); + } + + @Override + public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { + return withCommsSession(new CommsAction<Boolean>() { + @Override + public Boolean execute(final CommsSession session) throws IOException { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("remove"); + + serialize(key, serializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + return dis.readBoolean(); + } + }); + } + + private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException { + final int responseLength = dis.readInt(); + final byte[] responseBuffer = new byte[responseLength]; + dis.readFully(responseBuffer); + return responseBuffer; + } + + public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { + final String hostname = context.getProperty(HOSTNAME).getValue(); + final int port = context.getProperty(PORT).asInteger(); + final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + final CommsSession commsSession; + if (sslContextService == null) { + commsSession = new StandardCommsSession(hostname, port); + } else { + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + } + + commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + return commsSession; + } + + private CommsSession leaseCommsSession() throws IOException { + CommsSession session = queue.poll(); + if (session != null && !session.isClosed()) { + return session; + } + + session = createCommsSession(configContext); + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); + try { + ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); + } catch (final HandshakeException e) { + try { + session.close(); + } catch (final IOException ioe) { + } + + throw new IOException(e); + } + + return session; + } + + @Override + public void close() throws IOException { + this.closed = true; + + CommsSession commsSession; + while ((commsSession = queue.poll()) != null) { + try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) { + dos.writeUTF("close"); + dos.flush(); + commsSession.close(); + } catch (final IOException e) { + } + } + logger.info("Closed {}", new Object[] { getIdentifier() }); + } + + @Override + protected void finalize() throws Throwable { + if (!closed) + close(); + logger.debug("Finalize called"); + } + + private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(value, baos); + dos.writeInt(baos.size()); + baos.writeTo(dos); + } + + private <T> T withCommsSession(final CommsAction<T> action) throws IOException { + if (closed) { + throw new IllegalStateException("Client is closed"); + } + + final CommsSession session = leaseCommsSession(); + try { + return action.execute(session); + } catch (final IOException ioe) { + try { + session.close(); + } catch (final IOException ignored) { + } + + throw ioe; + } finally { + if (!session.isClosed()) { + if (this.closed) { + try { + session.close(); + } catch (final IOException ioe) { + } + } else { + queue.offer(session); + } + } + } + } + + private static interface CommsAction<T> { + T execute(CommsSession commsSession) throws IOException; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java new file mode 100644 index 0000000..1d7c94c --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.io.ByteArrayOutputStream; +import org.apache.nifi.io.DataOutputStream; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient { + + private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Server Hostname") + .description("The name of the server that is running the DistributedSetCacheServer service") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Server Port") + .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description( + "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted") + .required(false) + .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class)) + .defaultValue(null) + .build(); + public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description( + "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); + + private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>(); + private volatile ConfigurationContext configContext; + private volatile boolean closed = false; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(HOSTNAME); + descriptors.add(PORT); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(COMMUNICATIONS_TIMEOUT); + return descriptors; + } + + @OnConfigured + public void onConfigured(final ConfigurationContext context) { + this.configContext = context; + } + + public CommsSession createCommsSession(final ConfigurationContext context) throws IOException { + final String hostname = context.getProperty(HOSTNAME).getValue(); + final int port = context.getProperty(PORT).asInteger(); + final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + final CommsSession commsSession; + if (sslContextService == null) { + commsSession = new StandardCommsSession(hostname, port); + } else { + commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port); + } + + commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS); + return commsSession; + } + + private CommsSession leaseCommsSession() throws IOException { + CommsSession session = queue.poll(); + if (session != null && !session.isClosed()) { + return session; + } + + session = createCommsSession(configContext); + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); + try { + ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); + } catch (final HandshakeException e) { + try { + session.close(); + } catch (final IOException ioe) { + } + + throw new IOException(e); + } + + return session; + } + + @Override + public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException { + return invokeRemoteBoolean("addIfAbsent", value, serializer); + } + + @Override + public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException { + return invokeRemoteBoolean("contains", value, serializer); + } + + @Override + public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException { + return invokeRemoteBoolean("remove", value, serializer); + } + + @Override + public void close() throws IOException { + this.closed = true; + + CommsSession commsSession; + while ((commsSession = queue.poll()) != null) { + try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) { + dos.writeUTF("close"); + dos.flush(); + commsSession.close(); + } catch (final IOException e) { + } + } + logger.info("Closed {}", new Object[] { getIdentifier() }); + } + + @Override + protected void finalize() throws Throwable { + if (!closed) + close(); + logger.debug("Finalize called"); + } + + private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException { + if (closed) { + throw new IllegalStateException("Client is closed"); + } + + final CommsSession session = leaseCommsSession(); + try { + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF(methodName); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(value, baos); + dos.writeInt(baos.size()); + baos.writeTo(dos); + dos.flush(); + + final DataInputStream dis = new DataInputStream(session.getInputStream()); + return dis.readBoolean(); + } catch (final IOException ioe) { + try { + session.close(); + } catch (final IOException ignored) { + } + + throw ioe; + } finally { + if (!session.isClosed()) { + if (this.closed) { + try { + session.close(); + } catch (final IOException ioe) { + } + } else { + queue.offer(session); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java new file mode 100644 index 0000000..c8be082 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; + +public class SSLCommsSession implements CommsSession { + private final SSLSocketChannel sslSocketChannel; + private final SSLContext sslContext; + private final String hostname; + private final int port; + + private final SSLSocketChannelInputStream in; + private final BufferedInputStream bufferedIn; + + private final SSLSocketChannelOutputStream out; + private final BufferedOutputStream bufferedOut; + + public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { + sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + + in = new SSLSocketChannelInputStream(sslSocketChannel); + bufferedIn = new BufferedInputStream(in); + + out = new SSLSocketChannelOutputStream(sslSocketChannel); + bufferedOut = new BufferedOutputStream(out); + + this.sslContext = sslContext; + this.hostname = hostname; + this.port = port; + } + + @Override + public void interrupt() { + sslSocketChannel.interrupt(); + } + + @Override + public void close() throws IOException { + sslSocketChannel.close(); + } + + @Override + public void setTimeout(final long value, final TimeUnit timeUnit) { + sslSocketChannel.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit)); + } + + @Override + public InputStream getInputStream() throws IOException { + return bufferedIn; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return bufferedOut; + } + + @Override + public boolean isClosed() { + return sslSocketChannel.isClosed(); + } + + @Override + public String getHostname() { + return hostname; + } + + @Override + public int getPort() { + return port; + } + @Override + public SSLContext getSSLContext() { + return sslContext; + } + @Override + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java new file mode 100644 index 0000000..bbe2917 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.client; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.remote.io.InterruptableInputStream; +import org.apache.nifi.remote.io.InterruptableOutputStream; +import org.apache.nifi.remote.io.socket.SocketChannelInputStream; +import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; + +public class StandardCommsSession implements CommsSession { + private final SocketChannel socketChannel; + private final String hostname; + private final int port; + private volatile long timeoutMillis; + + private final SocketChannelInputStream in; + private final InterruptableInputStream bufferedIn; + + private final SocketChannelOutputStream out; + private final InterruptableOutputStream bufferedOut; + + public StandardCommsSession(final String hostname, final int port) throws IOException { + socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + socketChannel.configureBlocking(false); + in = new SocketChannelInputStream(socketChannel); + bufferedIn = new InterruptableInputStream(new BufferedInputStream(in)); + + out = new SocketChannelOutputStream(socketChannel); + bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out)); + + this.hostname = hostname; + this.port = port; + } + + @Override + public void interrupt() { + bufferedIn.interrupt(); + bufferedOut.interrupt(); + } + + @Override + public void close() throws IOException { + socketChannel.close(); + } + + @Override + public void setTimeout(final long value, final TimeUnit timeUnit) { + in.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit)); + out.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit)); + timeoutMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit); + } + + @Override + public InputStream getInputStream() throws IOException { + return bufferedIn; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return bufferedOut; + } + + @Override + public boolean isClosed() { + boolean closed = !socketChannel.isConnected(); + if (!closed) { + try { + this.in.isDataAvailable(); + } catch (IOException e) { + try { + close(); + } catch (IOException e1) { + } + closed = true; + } + } + return closed; + } + + @Override + public String getHostname() { + return hostname; + } + + @Override + public int getPort() { + return port; + } + + @Override + public SSLContext getSSLContext() { + return null; + } + + @Override + public long getTimeout(final TimeUnit timeUnit) { + return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..a91f7ee --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService +org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html new file mode 100644 index 0000000..d5f3595 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html @@ -0,0 +1,78 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> +<meta charset="utf-8" /> +<title>Distributed Map Cache Client Service</title> +<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> +</head> + +<body> + <h2>Description:</h2> + + <p>A Controller Service that can be used to communicate with a + Distributed Map Cache Server.</p> + + + + <p> + <strong>Properties:</strong> + </p> + <p>In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.</p> + + <ul> + <li><strong>Server Hostname</strong> + <ul> + <li>The name of the server that is running the DistributedMapCacheServer service</li> + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Server Port</strong> + <ul> + <li>The port on the remote server that is to be used when communicating with the + <a href="../nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li> + + <li>Default value: 4557</li> + <li>Supports expression language: false</li> + </ul></li> + <li>SSL Context Service + <ul> + <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted + <li>Default value: no default</li> + <li>Supports expression language: false</li> + </ul></li> + <li><strong>Communications Timeout</strong> + <ul> + <li>Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received + <li>Default value: 30 secs</li> + <li>Supports expression language: false</li> + </ul></li> + + </ul> + + + <i>See Also:</i> + <ul> + <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li> + <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li> + </ul> + +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html new file mode 100755 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml new file mode 100644 index 0000000..f636261 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-protocol</artifactId> + <name>Distributed Cache Protocol</name> + + <description> + Defines the communications protocol that is used between clients and servers + for the Distributed Cache services + </description> + + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java new file mode 100644 index 0000000..da2acad --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.protocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.remote.VersionNegotiator; + +public class ProtocolHandshake { + + public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' }; + + public static final int RESOURCE_OK = 20; + public static final int DIFFERENT_RESOURCE_VERSION = 21; + public static final int ABORT = 255; + + + public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException { + final DataInputStream dis = new DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + + try { + dos.write(MAGIC_HEADER); + + initiateVersionNegotiation(versionNegotiator, dis, dos); + } finally { + dos.flush(); + } + } + + + public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException { + final DataInputStream dis = new DataInputStream(in); + final DataOutputStream dos = new DataOutputStream(out); + + try { + final byte[] magicHeaderBuffer = new byte[4]; + dis.readFully(magicHeaderBuffer); + + receiveVersionNegotiation(versionNegotiator, dis, dos); + } finally { + dos.flush(); + } + } + + + private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { + // Write the classname of the RemoteStreamCodec, followed by its version + dos.writeInt(negotiator.getVersion()); + dos.flush(); + + // wait for response from server. + final int statusCode = dis.read(); + switch (statusCode) { + case RESOURCE_OK: // server accepted our proposal of codec name/version + return; + case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version + // Get server's preferred version + final int newVersion = dis.readInt(); + + // Determine our new preferred version that is no greater than the server's preferred version. + final Integer newPreference = negotiator.getPreferredVersion(newVersion); + // If we could not agree with server on a version, fail now. + if ( newPreference == null ) { + throw new HandshakeException("Could not agree on protocol version"); + } + + negotiator.setVersion(newPreference); + + // Attempt negotiation of resource based on our new preferred version. + initiateVersionNegotiation(negotiator, dis, dos); + case ABORT: + throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); + default: + throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server"); + } + } + + private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { + final int version = dis.readInt(); + if ( negotiator.isVersionSupported(version) ) { + dos.write(RESOURCE_OK); + dos.flush(); + + negotiator.setVersion(version); + } else { + final Integer preferred = negotiator.getPreferredVersion(version); + if ( preferred == null ) { + dos.write(ABORT); + dos.flush(); + throw new HandshakeException("Unable to negotiate an acceptable version of the Distributed Cache Protocol"); + } + dos.write(DIFFERENT_RESOURCE_VERSION); + dos.writeInt(preferred); + dos.flush(); + + receiveVersionNegotiation(negotiator, dis, dos); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java new file mode 100644 index 0000000..8049d42 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/exception/HandshakeException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.protocol.exception; + +public class HandshakeException extends Exception { + public HandshakeException(final String message) { + super(message); + } + + public HandshakeException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml new file mode 100644 index 0000000..b57d284 --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/pom.xml @@ -0,0 +1,78 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-services-bundle</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>distributed-cache-server</artifactId> + + <name>Distributed Cache Server</name> + <description>Provides a Controller Service for hosting Distributed Caches</description> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>remote-communications-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>wali</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>distributed-cache-client-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>ssl-context-service</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java new file mode 100644 index 0000000..9b4e70e --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.SocketTimeoutException; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; +import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.io.socket.SocketChannelInputStream; +import org.apache.nifi.remote.io.socket.SocketChannelOutputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractCacheServer implements CacheServer { + + private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class); + + private final String identifier; + private final int port; + private final SSLContext sslContext; + protected volatile boolean stopped = false; + private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>();; + + private volatile ServerSocketChannel serverSocketChannel; + + public AbstractCacheServer(final String identifier, final SSLContext sslContext, final int port) { + this.identifier = identifier; + this.port = port; + this.sslContext = sslContext; + } + + @Override + public void start() throws IOException { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(true); + serverSocketChannel.bind(new InetSocketAddress(port)); + + final Runnable runnable = new Runnable() { + + @Override + public void run() { + while (true) { + final SocketChannel socketChannel; + try { + socketChannel = serverSocketChannel.accept(); + logger.debug("Connected to {}", new Object[] { socketChannel }); + } catch (final IOException e) { + if (!stopped) { + logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString()); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + return; + } + + final Runnable processInputRunnable = new Runnable() { + @Override + public void run() { + final InputStream rawInputStream; + final OutputStream rawOutputStream; + final String peer = socketChannel.socket().getInetAddress().getHostName(); + + try { + if (sslContext == null) { + rawInputStream = new SocketChannelInputStream(socketChannel); + rawOutputStream = new SocketChannelOutputStream(socketChannel); + } else { + final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false); + sslSocketChannel.connect(); + rawInputStream = new SSLSocketChannelInputStream(sslSocketChannel); + rawOutputStream = new SSLSocketChannelOutputStream(sslSocketChannel); + } + } catch (IOException e) { + logger.error("Cannot create input and/or output streams for {}", new Object[] { identifier }, e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + try { + socketChannel.close(); + } catch (IOException swallow) { + } + + return; + } + try (final InputStream in = new BufferedInputStream(rawInputStream); + final OutputStream out = new BufferedOutputStream(rawOutputStream)) { + + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); + + ProtocolHandshake.receiveHandshake(in, out, versionNegotiator); + + boolean continueComms = true; + while (continueComms) { + continueComms = listen(in, out, versionNegotiator.getVersion()); + } + // client has issued 'close' + logger.debug("Client issued close on {}", new Object[] { socketChannel }); + } catch (final SocketTimeoutException e) { + logger.debug("30 sec timeout reached", e); + } catch (final IOException | HandshakeException e) { + if (!stopped) { + logger.error("{} unable to communicate with remote peer {} due to {}", new Object[] { this, peer, e.toString() }); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } finally { + processInputThreads.remove(Thread.currentThread()); + } + } + }; + + final Thread processInputThread = new Thread(processInputRunnable); + processInputThread.setName("Distributed Cache Server Communications Thread: " + identifier); + processInputThread.setDaemon(true); + processInputThread.start(); + processInputThreads.add(processInputThread); + } + } + }; + + final Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName("Distributed Cache Server: " + identifier); + thread.start(); + } + + @Override + public void stop() throws IOException { + stopped = true; + logger.info("Stopping CacheServer {}", new Object[] { this.identifier }); + + if (serverSocketChannel != null) { + serverSocketChannel.close(); + } + // need to close out the created SocketChannels...this is done by interrupting + // the created threads that loop on listen(). + for (Thread processInputThread : processInputThreads) { + processInputThread.interrupt(); + int i = 0; + while (!processInputThread.isInterrupted() && i++ < 5) { + try { + Thread.sleep(50); // allow thread to gracefully terminate + } catch (InterruptedException e) { + } + } + } + processInputThreads.clear(); + } + + @Override + public String toString() { + return "CacheServer[id=" + identifier + "]"; + } + + /** + * Listens for incoming data and communicates with remote peer + * + * @param in + * @param out + * @param version + * @return <code>true</code> if communications should continue, <code>false</code> otherwise + * @throws IOException + */ + protected abstract boolean listen(InputStream in, OutputStream out, int version) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java new file mode 100644 index 0000000..71ac56d --- /dev/null +++ b/nar-bundles/standard-services/distributed-cache-services-bundle/distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheRecord.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class CacheRecord { + + private static final AtomicLong idGenerator = new AtomicLong(0L); + + private final long id; + private final long entryDate; + private volatile long lastHitDate; + private final AtomicInteger hitCount = new AtomicInteger(0); + + public CacheRecord() { + entryDate = System.currentTimeMillis(); + lastHitDate = entryDate; + id = idGenerator.getAndIncrement(); + } + + public long getEntryDate() { + return entryDate; + } + + public long getLastHitDate() { + return lastHitDate; + } + + public int getHitCount() { + return hitCount.get(); + } + + public void hit() { + hitCount.getAndIncrement(); + lastHitDate = System.currentTimeMillis(); + } + + public long getId() { + return id; + } +}