http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
new file mode 100644
index 0000000..39ca330
--- /dev/null
+++ 
b/commons/nifi-utils/src/test/java/org/apache/nifi/util/timebuffer/TestTimedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util.timebuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Tests for {@link TimedBuffer}: values added to a buffer constructed with
+ * {@code TimeUnit.SECONDS} and {@code 2} are expected to be aggregated while
+ * they are recent and to be gone from the aggregate about 2.5 seconds later.
+ */
+public class TestTimedBuffer {
+
+    /** Single value: the aggregate equals that value until it ages off. */
+    @Test
+    public void testAgesOff() throws InterruptedException {
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, new LongEntityAccess());
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+
+        assertAggregateThenAgedOff(timedBuffer, 1000000L);
+    }
+
+    /** Several values: the aggregate is their sum until they age off. */
+    @Test
+    public void testAggregation() throws InterruptedException {
+        final TimedBuffer<TimestampedLong> timedBuffer = new TimedBuffer<>(TimeUnit.SECONDS, 2, new LongEntityAccess());
+
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(1000000L));
+        timedBuffer.add(new TimestampedLong(25000L));
+
+        assertAggregateThenAgedOff(timedBuffer, 2025000L);
+    }
+
+    /**
+     * Checks the aggregate immediately and again after one second, then waits
+     * a further 1.5 seconds and expects the aggregate to be {@code null}.
+     *
+     * @param timedBuffer the already-populated buffer under test
+     * @param expectedSum the aggregate value expected while the data is fresh
+     * @throws InterruptedException if a sleep is interrupted
+     */
+    private static void assertAggregateThenAgedOff(final TimedBuffer<TimestampedLong> timedBuffer, final long expectedSum)
+            throws InterruptedException {
+        TimestampedLong total = timedBuffer.getAggregateValue(System.currentTimeMillis() - 30000L);
+        assertEquals(expectedSum, total.getValue().longValue());
+
+        Thread.sleep(1000L);
+        total = timedBuffer.getAggregateValue(System.currentTimeMillis() - 30000L);
+        assertEquals(expectedSum, total.getValue().longValue());
+
+        Thread.sleep(1500L);
+        total = timedBuffer.getAggregateValue(System.currentTimeMillis() - 30000L);
+        assertNull(total);
+    }
+
+    /** A {@code Long} paired with the wall-clock time at which it was created. */
+    private static class TimestampedLong {
+
+        private final Long value;
+        private final long timestamp = System.currentTimeMillis();
+
+        public TimestampedLong(final Long value) {
+            this.value = value;
+        }
+
+        public Long getValue() {
+            return value;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    /** {@link EntityAccess} that aggregates {@link TimestampedLong}s by addition. */
+    private static class LongEntityAccess implements EntityAccess<TimestampedLong> {
+
+        @Override
+        public TimestampedLong aggregate(TimestampedLong oldValue, TimestampedLong toAdd) {
+            if (oldValue == null) {
+                // nothing accumulated yet: use the new value, or zero when there is none either
+                return (toAdd == null) ? new TimestampedLong(0L) : toAdd;
+            }
+            if (toAdd == null) {
+                return oldValue;
+            }
+
+            final long sum = oldValue.getValue().longValue() + toAdd.getValue().longValue();
+            return new TimestampedLong(sum);
+        }
+
+        @Override
+        public TimestampedLong createNew() {
+            return new TimestampedLong(0L);
+        }
+
+        @Override
+        public long getTimestamp(TimestampedLong entity) {
+            if (entity == null) {
+                return 0L;
+            }
+            return entity.getTimestamp();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/nifi-web-utils/pom.xml b/commons/nifi-web-utils/pom.xml
new file mode 100644
index 0000000..434e1a3
--- /dev/null
+++ b/commons/nifi-web-utils/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-web-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Web Utils</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.10</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.18.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+            <version>1.18.2</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <version>3.1.0</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
new file mode 100644
index 0000000..8c0b1f4
--- /dev/null
+++ 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ClientUtils.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import java.net.URI;
+import java.util.Map;
+import javax.ws.rs.core.MediaType;
+
+/**
+ * Convenience wrapper around a Jersey {@link Client} for issuing GET, POST and
+ * HEAD requests. GET and POST requests ask for a JSON response.
+ */
+public class ClientUtils {
+
+    private final Client client;
+
+    /**
+     * @param client the Jersey client used to perform every request
+     */
+    public ClientUtils(Client client) {
+        this.client = client;
+    }
+
+    /**
+     * Gets the content at the specified URI.
+     *
+     * @param uri the URI to request
+     * @return the response to the GET request
+     * @throws ClientHandlerException if the client handler fails to process the request or response
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse get(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        return get(uri, null);
+    }
+
+    /**
+     * Gets the content at the specified URI using the given query parameters.
+     *
+     * @param uri the URI to request
+     * @param queryParams query parameters to append to the URI; may be null
+     * @return the response to the GET request
+     * @throws ClientHandlerException if the client handler fails to process the request or response
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse get(final URI uri, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException {
+        // queryParam() returns a new resource each time, so keep re-assigning
+        WebResource webResource = client.resource(uri);
+        if (queryParams != null) {
+            for (final Map.Entry<String, String> queryEntry : queryParams.entrySet()) {
+                webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue());
+            }
+        }
+
+        return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified url and entity body. The request is
+     * sent with a JSON content type.
+     *
+     * @param uri the URI to post to
+     * @param entity the request entity; may be null, in which case no entity is sent
+     * @return the response to the POST request
+     * @throws ClientHandlerException if the client handler fails to process the request or response
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse post(URI uri, Object entity) throws ClientHandlerException, UniformInterfaceException {
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri)
+                .accept(MediaType.APPLICATION_JSON)
+                .type(MediaType.APPLICATION_JSON);
+
+        // include the request entity
+        if (entity != null) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a POST using the specified url and form data. The request is
+     * sent with a form-urlencoded content type.
+     *
+     * @param uri the URI to post to
+     * @param formData the form fields to send; null is treated like an empty
+     * map, in which case no entity is sent
+     * @return the response to the POST request
+     * @throws ClientHandlerException if the client handler fails to process the request or response
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse post(URI uri, Map<String, String> formData) throws ClientHandlerException, UniformInterfaceException {
+        // convert the form data; a literal null argument binds to this overload,
+        // so tolerate it the same way the other methods tolerate null arguments
+        final MultivaluedMapImpl entity = new MultivaluedMapImpl();
+        if (formData != null) {
+            for (final Map.Entry<String, String> formEntry : formData.entrySet()) {
+                entity.add(formEntry.getKey(), formEntry.getValue());
+            }
+        }
+
+        // get the resource
+        WebResource.Builder resourceBuilder = client.resource(uri)
+                .accept(MediaType.APPLICATION_JSON)
+                .type(MediaType.APPLICATION_FORM_URLENCODED);
+
+        // add the form data if necessary
+        if (!entity.isEmpty()) {
+            resourceBuilder = resourceBuilder.entity(entity);
+        }
+
+        // perform the request
+        return resourceBuilder.post(ClientResponse.class);
+    }
+
+    /**
+     * Performs a HEAD request to the specified URI.
+     *
+     * @param uri the URI to request
+     * @return the response to the HEAD request
+     * @throws ClientHandlerException if the client handler fails to process the request or response
+     * @throws UniformInterfaceException declared by the underlying Jersey call
+     */
+    public ClientResponse head(final URI uri) throws ClientHandlerException, UniformInterfaceException {
+        // perform the request
+        final WebResource webResource = client.resource(uri);
+        return webResource.head();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
new file mode 100644
index 0000000..4e7f5b6
--- /dev/null
+++ 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/ObjectMapperResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import javax.ws.rs.ext.ContextResolver;
+import javax.ws.rs.ext.Provider;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+@Provider
+public class ObjectMapperResolver implements ContextResolver<ObjectMapper> {
+
+    private final ObjectMapper mapper;
+
+    public ObjectMapperResolver() throws Exception {
+        mapper = new ObjectMapper();
+
+        final AnnotationIntrospector jaxbIntrospector = new 
JaxbAnnotationIntrospector();
+        final SerializationConfig serializationConfig = 
mapper.getSerializationConfig();
+        final DeserializationConfig deserializationConfig = 
mapper.getDeserializationConfig();
+
+        
mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector));
+        
mapper.setDeserializationConfig(deserializationConfig.withAnnotationIntrospector(jaxbIntrospector));
+    }
+
+    @Override
+    public ObjectMapper getContext(Class<?> objectType) {
+        return mapper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
new file mode 100644
index 0000000..587b3d8
--- /dev/null
+++ 
b/commons/nifi-web-utils/src/main/java/org/apache/nifi/web/util/WebUtils.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+
+import org.apache.nifi.security.util.CertificateUtils;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+
+/**
+ * Common utilities related to web development.
+ *
+ * @author unattributed
+ */
+public final class WebUtils {
+
+    private static Logger logger = LoggerFactory.getLogger(WebUtils.class);
+
+    final static ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private WebUtils() {
+    }
+
+    /**
+     * Creates a client for non-secure requests. The client will be created
+     * using the given configuration. Additionally, the client will be
+     * automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config) {
+        return createClientHelper(config, null);
+    }
+
+    /**
+     * Creates a client for secure requests. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     * @param ctx security context
+     *
+     * @return a Client instance
+     */
+    public static Client createClient(final ClientConfig config, final 
SSLContext ctx) {
+        return createClientHelper(config, ctx);
+    }
+
+    /**
+     * A helper method for creating clients. The client will be created using
+     * the given configuration and security context. Additionally, the client
+     * will be automatically configured for JSON serialization/deserialization.
+     *
+     * @param config client configuration
+     * @param ctx security context, which may be null for non-secure client
+     * creation
+     *
+     * @return a Client instance
+     */
+    private static Client createClientHelper(final ClientConfig config, final 
SSLContext ctx) {
+
+        final ClientConfig finalConfig = (config == null) ? new 
DefaultClientConfig() : config;
+
+        if (ctx != null && StringUtils.isBlank((String) 
finalConfig.getProperty(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES))) {
+
+            // custom hostname verifier that checks subject alternative names 
against the hostname of the URI
+            final HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+                @Override
+                public boolean verify(final String hostname, final SSLSession 
ssls) {
+
+                    try {
+                        for (final Certificate peerCertificate : 
ssls.getPeerCertificates()) {
+                            if (peerCertificate instanceof X509Certificate) {
+                                final X509Certificate x509Cert = 
(X509Certificate) peerCertificate;
+                                final List<String> subjectAltNames = 
CertificateUtils.getSubjectAlternativeNames(x509Cert);
+                                if 
(subjectAltNames.contains(hostname.toLowerCase())) {
+                                    return true;
+                                }
+                            }
+                        }
+                    } catch (final SSLPeerUnverifiedException | 
CertificateParsingException ex) {
+                        logger.warn("Hostname Verification encountered 
exception verifying hostname due to: " + ex, ex);
+                    }
+
+                    return false;
+                }
+            };
+
+            
finalConfig.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new 
HTTPSProperties(hostnameVerifier, ctx));
+        }
+
+        finalConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, 
Boolean.TRUE);
+        finalConfig.getClasses().add(ObjectMapperResolver.class);
+
+        // web client for restful request
+        return Client.create(finalConfig);
+
+    }
+
+    /**
+     * Serializes the given object to hexadecimal. Serialization uses Java's
+     * native serialization mechanism, the ObjectOutputStream.
+     *
+     * @param obj an object
+     * @return the serialized object as hex
+     */
+    public static String serializeObjectToHex(final Serializable obj) {
+
+        final ByteArrayOutputStream serializedObj = new 
ByteArrayOutputStream();
+
+        // IOException can never be thrown because we are serializing to an in 
memory byte array
+        try {
+            final ObjectOutputStream oos = new 
ObjectOutputStream(serializedObj);
+            oos.writeObject(obj);
+            oos.close();
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+        logger.debug(String.format("Serialized object '%s' size: %d", obj, 
serializedObj.size()));
+
+        // hex encode the binary
+        return new String(Hex.encodeHex(serializedObj.toByteArray(), /* 
tolowercase */ true));
+    }
+
+    /**
+     * Deserializes a Java serialized, hex-encoded string into a Java object.
+     * This method is the inverse of the serializeObjectToHex method in this
+     * class.
+     *
+     * @param hexEncodedObject a string
+     * @return the object
+     * @throws ClassNotFoundException if the class could not be found
+     */
+    public static Serializable deserializeHexToObject(final String 
hexEncodedObject) throws ClassNotFoundException {
+
+        // decode the hex encoded object
+        byte[] serializedObj;
+        try {
+            serializedObj = Hex.decodeHex(hexEncodedObject.toCharArray());
+        } catch (final DecoderException de) {
+            throw new IllegalArgumentException(de);
+        }
+
+        // IOException can never be thrown because we are deserializing from 
an in memory byte array
+        try {
+            // deserialize bytes into object
+            ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(serializedObj));
+            return (Serializable) ois.readObject();
+        } catch (final IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/commons/processor-utilities/pom.xml 
b/commons/processor-utilities/pom.xml
new file mode 100644
index 0000000..0519b7f
--- /dev/null
+++ b/commons/processor-utilities/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-processor-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+    <name>NiFi Processor Utils</name>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>[0.0.1-SNAPSHOT, 1.0.0-SNAPSHOT)</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>[0.0.1-SNAPSHOT,1.0.0-SNAPSHOT)</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
----------------------------------------------------------------------
diff --git 
a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
new file mode 100644
index 0000000..1f77093
--- /dev/null
+++ 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/FlowFileFilters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.FlowFileFilter;
+
+/**
+ * Factory methods for commonly used {@link FlowFileFilter}s.
+ */
+public class FlowFileFilters {
+
+    /**
+     * Returns a new {@link FlowFileFilter} that will pull FlowFiles until the
+     * maximum total size has been reached, or the maximum FlowFile count has
+     * been reached (this is important because FlowFiles may be 0 bytes!). The
+     * first FlowFile offered is always selected, even if it alone exceeds the
+     * max size; in that case no other FlowFile will be selected.
+     *
+     * @param maxSize the maximum size of the group of FlowFiles
+     * @param unit the unit of the <code>maxSize</code> argument
+     * @param maxCount the maximum number of FlowFiles to pull
+     * @return a stateful filter that tracks how many FlowFiles and bytes it
+     * has accepted so far
+     */
+    public static FlowFileFilter newSizeBasedFilter(final double maxSize, final DataUnit unit, final int maxCount) {
+        final double byteLimit = DataUnit.B.convert(maxSize, unit);
+
+        return new FlowFileFilter() {
+            int acceptedCount = 0;
+            long acceptedBytes = 0L;
+
+            @Override
+            public FlowFileFilterResult filter(final FlowFile flowFile) {
+                // the limits are only enforced once at least one FlowFile has been taken
+                if (acceptedCount != 0) {
+                    final boolean exceedsBytes = acceptedBytes + flowFile.getSize() > byteLimit;
+                    final boolean exceedsCount = acceptedCount + 1 > maxCount;
+                    if (exceedsBytes || exceedsCount) {
+                        return FlowFileFilterResult.REJECT_AND_TERMINATE;
+                    }
+                }
+
+                acceptedCount++;
+                acceptedBytes += flowFile.getSize();
+                return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+            }
+
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
----------------------------------------------------------------------
diff --git 
a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
new file mode 100644
index 0000000..0d66df5
--- /dev/null
+++ 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/SSLProperties.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.security.util.CertificateUtils;
+import org.apache.nifi.security.util.KeystoreType;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.security.util.SslContextFactory.ClientAuth;
+
+public class SSLProperties {
+
+    /**
+     * Filename of the Truststore; checked with
+     * StandardValidators.FILE_EXISTS_VALIDATOR and not marked sensitive.
+     */
+    public static final PropertyDescriptor TRUSTSTORE = new 
PropertyDescriptor.Builder()
+            .name("Truststore Filename")
+            .description("The fully-qualified filename of the Truststore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Type of the Truststore; restricted to the allowable values "JKS" and
+     * "PKCS12".
+     */
+    public static final PropertyDescriptor TRUSTSTORE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Truststore Type")
+            .description("The Type of the Truststore. Either JKS or PKCS12")
+            .allowableValues("JKS", "PKCS12")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue(null)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Password of the Truststore; must be non-empty and is marked sensitive.
+     */
+    public static final PropertyDescriptor TRUSTSTORE_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Truststore Password")
+            .description("The password for the Truststore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    /**
+     * Filename of the Keystore; checked with
+     * StandardValidators.FILE_EXISTS_VALIDATOR and not marked sensitive.
+     */
+    public static final PropertyDescriptor KEYSTORE = new 
PropertyDescriptor.Builder()
+            .name("Keystore Filename")
+            .description("The fully-qualified filename of the Keystore")
+            .defaultValue(null)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Type of the Keystore; restricted to the allowable values "JKS" and
+     * "PKCS12". Unlike the other descriptors here, no defaultValue(...) call
+     * is made on this builder.
+     */
+    public static final PropertyDescriptor KEYSTORE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Keystore Type")
+            .description("The Type of the Keystore")
+            .allowableValues("JKS", "PKCS12")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Password of the Keystore; must be non-empty and is marked sensitive.
+     */
+    public static final PropertyDescriptor KEYSTORE_PASSWORD = new 
PropertyDescriptor.Builder()
+            .name("Keystore Password")
+            .defaultValue(null)
+            .description("The password for the Keystore")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+
+    /**
+     * Validates the Keystore properties and then the Truststore properties.
+     *
+     * @param properties the configured property values, keyed by descriptor
+     * @return the Keystore validation results followed by the Truststore
+     * validation results
+     */
+    public static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties) {
+        final Collection<ValidationResult> combined = new ArrayList<>();
+        for (final KeystoreValidationGroup group : new KeystoreValidationGroup[]{KeystoreValidationGroup.KEYSTORE, KeystoreValidationGroup.TRUSTSTORE}) {
+            combined.addAll(validateStore(properties, group));
+        }
+        return combined;
+    }
+
+    /**
+     * Validates one group of store properties (filename, password and type).
+     * Either all three values or none of them must be set. When all three are
+     * set, the file must exist and be readable, the type must be a known
+     * KeystoreType, and CertificateUtils.isStoreValid must accept the store.
+     *
+     * @param properties the configured property values, keyed by descriptor
+     * @param keyStoreOrTrustStore which group of properties to validate
+     * @return the problems found; empty when the group is valid or entirely
+     * unset
+     */
+    public static Collection<ValidationResult> validateStore(final Map<PropertyDescriptor, String> properties, final KeystoreValidationGroup keyStoreOrTrustStore) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final String filename;
+        final String password;
+        final String type;
+
+        if (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) {
+            filename = properties.get(KEYSTORE);
+            password = properties.get(KEYSTORE_PASSWORD);
+            type = properties.get(KEYSTORE_TYPE);
+        } else {
+            filename = properties.get(TRUSTSTORE);
+            password = properties.get(TRUSTSTORE_PASSWORD);
+            type = properties.get(TRUSTSTORE_TYPE);
+        }
+
+        final String keystoreDesc = (keyStoreOrTrustStore == KeystoreValidationGroup.KEYSTORE) ? "Keystore" : "Truststore";
+
+        final int nulls = countNulls(filename, password, type);
+        if (nulls != 3 && nulls != 0) {
+            results.add(new ValidationResult.Builder().valid(false).explanation("Must set either 0 or 3 properties for " + keystoreDesc).subject(keystoreDesc + " Properties").build());
+        } else if (nulls == 0) {
+            // all properties were filled in.
+            final File file = new File(filename);
+            if (!file.exists() || !file.canRead()) {
+                results.add(new ValidationResult.Builder().valid(false).subject(keystoreDesc + " Properties").explanation("Cannot access file " + file.getAbsolutePath()).build());
+            } else {
+                // Resolve the type on its own so that an unknown value becomes a
+                // validation failure instead of an uncaught IllegalArgumentException.
+                final KeystoreType storeType;
+                try {
+                    storeType = KeystoreType.valueOf(type);
+                } catch (final IllegalArgumentException e) {
+                    results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Unsupported " + keystoreDesc + " Type: " + type).build());
+                    return results;
+                }
+
+                try {
+                    final boolean storeValid = CertificateUtils.isStoreValid(file.toURI().toURL(), storeType, password.toCharArray());
+                    if (!storeValid) {
+                        results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Invalid KeyStore Password or Type specified for file " + filename).build());
+                    }
+                } catch (MalformedURLException e) {
+                    results.add(new ValidationResult.Builder().subject(keystoreDesc + " Properties").valid(false).explanation("Malformed URL from file: " + e).build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Counts how many of the given references are null.
+     *
+     * @param objects the references to inspect
+     * @return the number of null elements
+     */
+    private static int countNulls(Object... objects) {
+        int nullCount = 0;
+        for (int i = 0; i < objects.length; i++) {
+            if (objects[i] == null) {
+                nullCount++;
+            }
+        }
+        return nullCount;
+    }
+
+    /**
+     * Selects which group of store properties validateStore should check.
+     */
+    public static enum KeystoreValidationGroup {
+
+        /** The Keystore filename, password and type properties. */
+        KEYSTORE,
+        /** The Truststore filename, password and type properties. */
+        TRUSTSTORE
+    }
+
+    /**
+     * Builds copies of the Keystore descriptors with the given required flag.
+     * When required, the copy of the Keystore Type descriptor also gets "JKS"
+     * as its default value.
+     *
+     * @param required whether the returned descriptors are marked required
+     * @return newly built copies of the Keystore descriptors
+     */
+    public static List<PropertyDescriptor> getKeystoreDescriptors(final boolean required) {
+        final List<PropertyDescriptor> copies = new ArrayList<>();
+        for (final PropertyDescriptor template : KEYSTORE_DESCRIPTORS) {
+            final PropertyDescriptor.Builder copy = new PropertyDescriptor.Builder().fromPropertyDescriptor(template).required(required);
+            if (required) {
+                // Only the type property receives a default; a required store defaults to JKS.
+                if (template.getName().equals(KEYSTORE_TYPE.getName())) {
+                    copy.defaultValue("JKS");
+                }
+            }
+            copies.add(copy.build());
+        }
+
+        return copies;
+    }
+
+    /**
+     * Builds copies of the Truststore descriptors with the given required
+     * flag. When required, the copy of the Truststore Type descriptor also
+     * gets "JKS" as its default value.
+     *
+     * @param required whether the returned descriptors are marked required
+     * @return newly built copies of the Truststore descriptors
+     */
+    public static List<PropertyDescriptor> getTruststoreDescriptors(final boolean required) {
+        final List<PropertyDescriptor> copies = new ArrayList<>();
+        for (final PropertyDescriptor template : TRUSTSTORE_DESCRIPTORS) {
+            final PropertyDescriptor.Builder copy = new PropertyDescriptor.Builder().fromPropertyDescriptor(template).required(required);
+            if (required) {
+                // Only the type property receives a default; a required store defaults to JKS.
+                if (template.getName().equals(TRUSTSTORE_TYPE.getName())) {
+                    copy.defaultValue("JKS");
+                }
+            }
+            copies.add(copy.build());
+        }
+
+        return copies;
+    }
+
+    /**
+     * Creates an SSLContext from the store properties of the given context.
+     * With no Keystore filename, SslContextFactory.createTrustSslContext is
+     * called with the Truststore properties. With a Keystore but no Truststore
+     * filename, the three-argument createSslContext is called. Otherwise both
+     * stores and the client-auth setting are passed to createSslContext.
+     *
+     * @param context the context to read the store properties from
+     * @param clientAuth the client authentication setting; only used when both
+     * a Keystore and a Truststore are configured
+     * @return the SSLContext produced by SslContextFactory
+     */
+    public static SSLContext createSSLContext(final ProcessContext context, final ClientAuth clientAuth)
+            throws UnrecoverableKeyException, KeyManagementException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException {
+        // No Keystore configured: build the context from the Truststore properties alone.
+        if (context.getProperty(KEYSTORE).getValue() == null) {
+            final String trustFile = context.getProperty(TRUSTSTORE).getValue();
+            final char[] trustPassword = context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray();
+            final String trustType = context.getProperty(TRUSTSTORE_TYPE).getValue();
+            return SslContextFactory.createTrustSslContext(trustFile, trustPassword, trustType);
+        }
+
+        final boolean truststoreConfigured = context.getProperty(TRUSTSTORE).getValue() != null;
+
+        final String keyFile = context.getProperty(KEYSTORE).getValue();
+        final char[] keyPassword = context.getProperty(KEYSTORE_PASSWORD).getValue().toCharArray();
+        final String keyType = context.getProperty(KEYSTORE_TYPE).getValue();
+        if (!truststoreConfigured) {
+            return SslContextFactory.createSslContext(keyFile, keyPassword, keyType);
+        }
+
+        final String trustFile = context.getProperty(TRUSTSTORE).getValue();
+        final char[] trustPassword = context.getProperty(TRUSTSTORE_PASSWORD).getValue().toCharArray();
+        final String trustType = context.getProperty(TRUSTSTORE_TYPE).getValue();
+        return SslContextFactory.createSslContext(keyFile, keyPassword, keyType, trustFile, trustPassword, trustType, clientAuth);
+    }
+
+    // Descriptor groups iterated by getKeystoreDescriptors / getTruststoreDescriptors.
+    // NOTE(review): HashSet has no defined iteration order, so the order of the
+    // lists those methods return is not guaranteed - confirm callers do not rely on it.
+    private static final Set<PropertyDescriptor> KEYSTORE_DESCRIPTORS = new 
HashSet<>();
+    private static final Set<PropertyDescriptor> TRUSTSTORE_DESCRIPTORS = new 
HashSet<>();
+
+    // Populate each group with its filename, type and password descriptors.
+    static {
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE);
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE_TYPE);
+        KEYSTORE_DESCRIPTORS.add(KEYSTORE_PASSWORD);
+
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE);
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE_TYPE);
+        TRUSTSTORE_DESCRIPTORS.add(TRUSTSTORE_PASSWORD);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
----------------------------------------------------------------------
diff --git 
a/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
new file mode 100644
index 0000000..10748fe
--- /dev/null
+++ 
b/commons/processor-utilities/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.UnsupportedCharsetException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.util.FormatUtils;
+
+public class StandardValidators {
+
+    //
+    //
+    // STATICALLY DEFINED VALIDATORS
+    //
+    //
+    /**
+     * Validates that the property value is accepted by
+     * FlowFile.KeyValidator.validateKey. On rejection the explanation is the
+     * message of the IllegalArgumentException that was thrown.
+     */
+    public static final Validator ATTRIBUTE_KEY_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder result = new ValidationResult.Builder();
+            result.subject(subject).input(input);
+
+            try {
+                FlowFile.KeyValidator.validateKey(input);
+            } catch (final IllegalArgumentException invalidKey) {
+                result.valid(false).explanation(invalidKey.getMessage());
+                return result.build();
+            }
+
+            result.valid(true);
+            return result.build();
+        }
+    };
+
+    /**
+     * Validates the property name (the subject), not the property value,
+     * against FlowFile.KeyValidator.validateKey. The result reports
+     * "Property Name" as its subject and the name itself as its input.
+     */
+    public static final Validator ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final ValidationResult.Builder result = new ValidationResult.Builder();
+            result.subject("Property Name").input(subject);
+
+            try {
+                FlowFile.KeyValidator.validateKey(subject);
+            } catch (final IllegalArgumentException invalidKey) {
+                result.valid(false).explanation(invalidKey.getMessage());
+                return result.build();
+            }
+
+            result.valid(true);
+            return result.build();
+        }
+    };
+
+    /**
+     * Validates that the value parses as an int strictly greater than zero.
+     */
+    public static final Validator POSITIVE_INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason;
+            try {
+                reason = (Integer.parseInt(value) > 0) ? null : "not a positive value";
+            } catch (final NumberFormatException nfe) {
+                reason = "not a valid integer";
+            }
+
+            // A null reason means that no problem was found.
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .explanation(reason)
+                    .valid(reason == null)
+                    .build();
+        }
+    };
+
+    /**
+     * Validates that the value parses as a long strictly greater than zero.
+     */
+    public static final Validator POSITIVE_LONG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason;
+            try {
+                reason = (Long.parseLong(value) > 0) ? null : "not a positive value";
+            } catch (final NumberFormatException nfe) {
+                reason = "not a valid 64-bit integer";
+            }
+
+            // A null reason means that no problem was found.
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .explanation(reason)
+                    .valid(reason == null)
+                    .build();
+        }
+    };
+
+    /**
+     * Validates that the value parses as a long between 1 and 65535, with both
+     * bounds allowed.
+     */
+    public static final Validator PORT_VALIDATOR = createLongValidator(1, 
65535, true);
+
+    /**
+     * Validates that the value is neither null nor the empty string. The
+     * explanation is set on the result whether or not the value is valid.
+     */
+    public static final Validator NON_EMPTY_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final boolean hasContent = value != null && !value.isEmpty();
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .valid(hasContent)
+                    .explanation(subject + " cannot be empty")
+                    .build();
+        }
+    };
+
+    public static final Validator BOOLEAN_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            final boolean valid = "true".equalsIgnoreCase(value) || 
"false".equalsIgnoreCase(value);
+            final String explanation = valid ? null : "Value must be 'true' or 
'false'";
+            return new 
ValidationResult.Builder().subject(subject).input(value).valid(valid).explanation(explanation).build();
+        }
+    };
+
+    /**
+     * Validates that the value parses with Integer.parseInt.
+     */
+    public static final Validator INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason;
+            try {
+                Integer.parseInt(value);
+                reason = null;
+            } catch (final NumberFormatException nfe) {
+                reason = "not a valid integer";
+            }
+
+            // A null reason means that no problem was found.
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .explanation(reason)
+                    .valid(reason == null)
+                    .build();
+        }
+    };
+
+    /**
+     * Validates that the value parses with Long.parseLong.
+     */
+    public static final Validator LONG_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason;
+            try {
+                Long.parseLong(value);
+                reason = null;
+            } catch (final NumberFormatException nfe) {
+                reason = "not a valid Long";
+            }
+
+            // A null reason means that no problem was found.
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .explanation(reason)
+                    .valid(reason == null)
+                    .build();
+        }
+    };
+
+    /**
+     * Validates that the value parses as an int that is zero or greater.
+     */
+    public static final Validator NON_NEGATIVE_INTEGER_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason;
+            try {
+                reason = (Integer.parseInt(value) >= 0) ? null : "value is negative";
+            } catch (final NumberFormatException nfe) {
+                reason = "value is not a valid integer";
+            }
+
+            // A null reason means that no problem was found.
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .explanation(reason)
+                    .valid(reason == null)
+                    .build();
+        }
+    };
+
+    /**
+     * Validates that the value names a character set supported by this JVM.
+     * A null value, a syntactically illegal charset name, and a legal but
+     * unsupported name are each reported with their own explanation.
+     */
+    public static final Validator CHARACTER_SET_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String reason = null;
+            if (value == null) {
+                // Checked up front so the catch below only ever sees illegal names.
+                reason = "Character Set value cannot be null.";
+            } else {
+                try {
+                    if (!Charset.isSupported(value)) {
+                        reason = "Character Set is not supported by this JVM.";
+                    }
+                } catch (final UnsupportedCharsetException uce) {
+                    reason = "Character Set is not supported by this JVM.";
+                } catch (final IllegalArgumentException iae) {
+                    // Charset.isSupported throws IllegalCharsetNameException, a subclass of
+                    // IllegalArgumentException, when the name is not a legal charset name.
+                    reason = "Character Set name is not a legal charset name.";
+                }
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    /**
+     * URL Validator produced by createURLValidator().
+     * NOTE(review): this comment used to say the Expression Language is not
+     * allowed, but createURLValidator() calls evaluateAttributeExpressions()
+     * on the input before parsing it as a URL - confirm which is intended.
+     */
+    public static final Validator URL_VALIDATOR = createURLValidator();
+
+    /**
+     * Validates that the value can be parsed as a java.net.URI. Any exception
+     * raised while parsing marks the value invalid.
+     */
+    public static final Validator URI_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            try {
+                // URI.create applies the same parsing as the URI(String) constructor.
+                URI.create(input);
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .explanation("Valid URI")
+                        .valid(true)
+                        .build();
+            } catch (final Exception parseFailure) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .explanation("Not a valid URI")
+                        .valid(false)
+                        .build();
+            }
+        }
+    };
+
+    /**
+     * Validates that the value is a compilable Java regular expression with
+     * between 0 and Integer.MAX_VALUE capturing groups. Expression Language
+     * support is switched off.
+     */
+    public static final Validator REGULAR_EXPRESSION_VALIDATOR = 
createRegexValidator(0, Integer.MAX_VALUE, false);
+
+    /**
+     * Validates that the value compiles with the Expression Language compiler
+     * obtained from the ValidationContext. On failure the explanation is the
+     * message of the exception that was thrown.
+     */
+    public static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            try {
+                context.newExpressionLanguageCompiler().compile(input);
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(true)
+                        .build();
+            } catch (final Exception compileFailure) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation(compileFailure.getMessage())
+                        .build();
+            }
+        }
+
+    };
+
+    /**
+     * Validates that the lower-cased value matches
+     * FormatUtils.TIME_DURATION_REGEX. A null value is rejected.
+     */
+    public static final Validator TIME_PERIOD_VALIDATOR = new Validator() {
+        // Compiled once instead of on every validate() call.
+        private final Pattern timeDurationPattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+            // Locale.ROOT keeps the case conversion independent of the JVM's default locale
+            // (for example, the Turkish locale lower-cases 'I' to a dotless i).
+            if (timeDurationPattern.matcher(input.toLowerCase(java.util.Locale.ROOT)).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days").build();
+            }
+        }
+    };
+
+    /**
+     * Validates that the upper-cased value matches DataUnit.DATA_SIZE_REGEX.
+     * A null value is rejected.
+     */
+    public static final Validator DATA_SIZE_VALIDATOR = new Validator() {
+        // Compiled once instead of on every validate() call.
+        private final Pattern dataSizePattern = Pattern.compile(DataUnit.DATA_SIZE_REGEX);
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Data Size cannot be null").build();
+            }
+            // Locale.ROOT keeps the case conversion independent of the JVM's default locale
+            // (for example, the Turkish locale upper-cases 'i' to a dotted capital I).
+            if (dataSizePattern.matcher(input.toUpperCase(java.util.Locale.ROOT)).matches()) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } else {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Must be of format <Data Size> <Data Unit> where <Data Size> is a non-negative integer and <Data Unit> is a supported Data Unit, such as: B, KB, MB, GB, TB").build();
+            }
+        }
+    };
+
+    /**
+     * Shared FileExistsValidator instance.
+     * NOTE(review): the meaning of the boolean constructor argument is not
+     * visible here; presumably it enables Expression Language support -
+     * confirm against FileExistsValidator.
+     */
+    public static final Validator FILE_EXISTS_VALIDATOR = new 
FileExistsValidator(true);
+
+    //
+    //
+    // FACTORY METHODS FOR VALIDATORS
+    //
+    //
+    /**
+     * Creates a DirectoryExistsValidator configured with the given flags.
+     *
+     * @param allowExpressionLanguage passed through to the validator
+     * @param createDirectoryIfMissing passed through to the validator
+     * @return the new validator
+     */
+    public static Validator createDirectoryExistsValidator(final boolean allowExpressionLanguage, final boolean createDirectoryIfMissing) {
+        final Validator directoryValidator = new DirectoryExistsValidator(allowExpressionLanguage, createDirectoryIfMissing);
+        return directoryValidator;
+    }
+
+    /**
+     * Creates a Validator that evaluates any attribute expressions in the
+     * input and then checks that the result can be parsed as a java.net.URL.
+     * Any exception raised along the way marks the value invalid.
+     *
+     * @return the new validator
+     */
+    private static Validator createURLValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                try {
+                    final String evaluated = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+                    new URL(evaluated);
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .explanation("Valid URL")
+                            .valid(true)
+                            .build();
+                } catch (final Exception failure) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .explanation("Not a valid URL")
+                            .valid(false)
+                            .build();
+                }
+            }
+        };
+    }
+
+    /**
+     * Creates a TimePeriodValidator for the given minimum and maximum.
+     *
+     * @param minTime the minimum duration, expressed in <code>minTimeUnit</code>
+     * @param minTimeUnit the unit of <code>minTime</code>
+     * @param maxTime the maximum duration, expressed in <code>maxTimeUnit</code>
+     * @param maxTimeUnit the unit of <code>maxTime</code>
+     * @return the new validator
+     */
+    public static Validator createTimePeriodValidator(final long minTime, final TimeUnit minTimeUnit, final long maxTime, final TimeUnit maxTimeUnit) {
+        final Validator periodValidator = new TimePeriodValidator(minTime, minTimeUnit, maxTime, maxTimeUnit);
+        return periodValidator;
+    }
+
+    /**
+     * Creates an Expression Language validator for the expected result type,
+     * with extra characters around the expression allowed.
+     *
+     * @param expectedResultType the result type the expression must produce
+     * @return the new validator
+     */
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType) {
+        final boolean allowExtraCharacters = true;
+        return createAttributeExpressionLanguageValidator(expectedResultType, allowExtraCharacters);
+    }
+
+    /**
+     * Creates a Validator that accepts a value only if the whole value matches
+     * the given pattern.
+     *
+     * @param pattern the pattern the entire input must match
+     * @return the new validator
+     */
+    public static Validator createRegexMatchingValidator(final Pattern pattern) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                final boolean matches = pattern.matcher(input).matches();
+
+                // The explanation is only populated for a mismatch.
+                final String explanation;
+                if (matches) {
+                    explanation = null;
+                } else {
+                    explanation = "Value does not match regular expression: " + pattern.pattern();
+                }
+
+                final ValidationResult.Builder result = new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(matches)
+                        .explanation(explanation);
+                return result.build();
+            }
+        };
+    }
+
+    /**
+     * Creates a @{link Validator} that ensure that a value is a valid Java
+     * Regular Expression with at least <code>minCapturingGroups</code>
+     * capturing groups and at most <code>maxCapturingGroups</code> capturing
+     * groups. If <code>supportAttributeExpressionLanguage</code> is set to
+     * <code>true</code>, the value may also include the Expression Language,
+     * but the result of evaluating the Expression Language will be applied
+     * before the Regular Expression is performed. In this case, the Expression
+     * Language will not support FlowFile Attributes but only System/JVM
+     * Properties
+     *
+     * @param minCapturingGroups
+     * @param maxCapturingGroups
+     * @param supportAttributeExpressionLanguage
+     * @return
+     */
+    public static Validator createRegexValidator(final int minCapturingGroups, 
final int maxCapturingGroups, final boolean supportAttributeExpressionLanguage) 
{
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final 
String value, final ValidationContext context) {
+                try {
+                    final String substituted;
+                    if (supportAttributeExpressionLanguage) {
+                        try {
+                            substituted = 
context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                        } catch (final Exception e) {
+                            return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Failed
 to evaluate the Attribute Expression Language due to " + e.toString()).build();
+                        }
+                    } else {
+                        substituted = value;
+                    }
+
+                    final Pattern pattern = Pattern.compile(substituted);
+                    final int numGroups = pattern.matcher("").groupCount();
+                    if (numGroups < minCapturingGroups || numGroups > 
maxCapturingGroups) {
+                        return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("RegEx
 is required to have between " + minCapturingGroups + " and " + 
maxCapturingGroups + " Capturing Groups but has " + numGroups).build();
+                    }
+
+                    return new 
ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                } catch (final Exception e) {
+                    return new 
ValidationResult.Builder().subject(subject).input(value).valid(false).explanation("Not
 a valid Java Regular Expression").build();
+                }
+
+            }
+        };
+    }
+
+    /**
+     * Creates a Validator that checks the syntax of an Expression Language
+     * value and the type it returns. When <code>allowExtraCharacters</code>
+     * is <code>true</code> the result type is taken to be STRING; otherwise it
+     * is obtained from the Expression Language compiler.
+     *
+     * @param expectedResultType the result type the expression must produce
+     * @param allowExtraCharacters passed to the compiler's syntax validation
+     * @return the new validator
+     */
+    public static Validator createAttributeExpressionLanguageValidator(final ResultType expectedResultType, final boolean allowExtraCharacters) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                final String syntaxError = context.newExpressionLanguageCompiler().validateExpression(input, allowExtraCharacters);
+                if (syntaxError != null) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(false)
+                            .explanation(syntaxError)
+                            .build();
+                }
+
+                final ResultType actualType;
+                if (allowExtraCharacters) {
+                    actualType = ResultType.STRING;
+                } else {
+                    actualType = context.newExpressionLanguageCompiler().getResultType(input);
+                }
+
+                if (actualType.equals(expectedResultType)) {
+                    return new ValidationResult.Builder()
+                            .subject(subject)
+                            .input(input)
+                            .valid(true)
+                            .build();
+                }
+
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Expected Attribute Query to return type " + expectedResultType + " but query returns type " + actualType)
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Creates a Validator that accepts only input that parses as a {@code long}
+     * and lies within the given range.
+     *
+     * @param minimum the lower bound of the range
+     * @param maximum the upper bound of the range
+     * @param inclusive if {@code true} a value equal to either bound is accepted;
+     *            if {@code false} a value equal to either bound is rejected
+     * @return a Validator enforcing the range
+     */
+    public static Validator createLongValidator(final long minimum, final long maximum, final boolean inclusive) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                String reason = null;
+                try {
+                    final long longVal = Long.parseLong(input);
+                    final boolean outsideBounds = longVal < minimum || longVal > maximum;
+                    // With exclusive bounds, sitting exactly on a bound is a violation too.
+                    final boolean onExcludedBound = !inclusive && (longVal == minimum || longVal == maximum);
+                    if (outsideBounds || onExcludedBound) {
+                        reason = "Value must be between " + minimum + " and " + maximum + " (" + (inclusive ? "inclusive" : "exclusive") + ")";
+                    }
+                } catch (final NumberFormatException e) {
+                    // Also reached for null input: Long.parseLong(null) throws NumberFormatException.
+                    reason = "not a valid integer";
+                }
+
+                // A null reason means every check passed.
+                return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build();
+            }
+
+        };
+    }
+
+    //
+    //
+    // SPECIFIC VALIDATOR IMPLEMENTATIONS THAT CANNOT BE ANONYMOUS CLASSES
+    //
+    //
+    /**
+     * Validator that accepts a time period such as "5 secs" when it matches
+     * {@code FormatUtils.TIME_DURATION_REGEX} and its duration, converted to
+     * nanoseconds, lies between the configured minimum and maximum (both inclusive).
+     */
+    static class TimePeriodValidator implements Validator {
+
+        private final Pattern pattern;
+
+        private final long minNanos;
+        private final long maxNanos;
+
+        // Human-readable forms of the bounds, used only in the explanation text.
+        private final String minValueEnglish;
+        private final String maxValueEnglish;
+
+        /**
+         * @param minValue the smallest acceptable duration, expressed in {@code minTimeUnit}
+         * @param minTimeUnit the unit of {@code minValue}
+         * @param maxValue the largest acceptable duration, expressed in {@code maxTimeUnit}
+         * @param maxTimeUnit the unit of {@code maxValue}
+         */
+        public TimePeriodValidator(final long minValue, final TimeUnit minTimeUnit, final long maxValue, final TimeUnit maxTimeUnit) {
+            pattern = Pattern.compile(FormatUtils.TIME_DURATION_REGEX);
+
+            this.minNanos = TimeUnit.NANOSECONDS.convert(minValue, minTimeUnit);
+            this.maxNanos = TimeUnit.NANOSECONDS.convert(maxValue, maxTimeUnit);
+            this.minValueEnglish = minValue + " " + minTimeUnit.toString();
+            this.maxValueEnglish = maxValue + " " + maxTimeUnit.toString();
+        }
+
+        /**
+         * Validates the given time period.
+         *
+         * @param subject the name of what is being validated, copied into the result
+         * @param input the time period text; {@code null} is reported as invalid
+         * @param context not used by this validator
+         * @return a result that is valid only if the syntax matches and the duration is in range
+         */
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (input == null) {
+                return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation("Time Period cannot be null").build();
+            }
+            // Lower-case with a fixed locale: the default-locale variant maps 'I' to a
+            // dotless 'ı' under e.g. a Turkish locale, which would make "MILLIS"/"MINS" unmatchable.
+            final String lowerCase = input.toLowerCase(java.util.Locale.ROOT);
+            final boolean validSyntax = pattern.matcher(lowerCase).matches();
+            final ValidationResult.Builder builder = new ValidationResult.Builder();
+            if (validSyntax) {
+                final long nanos = FormatUtils.getTimeDuration(lowerCase, TimeUnit.NANOSECONDS);
+
+                if (nanos < minNanos || nanos > maxNanos) {
+                    builder.subject(subject).input(input).valid(false)
+                            .explanation("Must be in the range of " + minValueEnglish + " to " + maxValueEnglish);
+                } else {
+                    builder.subject(subject).input(input).valid(true);
+                }
+            } else {
+                builder.subject(subject).input(input).valid(false)
+                        .explanation("Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days");
+            }
+            return builder.build();
+        }
+    }
+
+    /**
+     * Validator that reports a value as valid when it names a path that exists
+     * on the file system. When Expression Language is allowed, the value is
+     * evaluated first and the evaluated text is used as the path.
+     */
+    public static class FileExistsValidator implements Validator {
+
+        private final boolean allowEL;
+
+        /**
+         * @param allowExpressionLanguage whether the value should be evaluated as
+         *            Expression Language before the existence check
+         */
+        public FileExistsValidator(final boolean allowExpressionLanguage) {
+            this.allowEL = allowExpressionLanguage;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            String path = value;
+            if (allowEL) {
+                try {
+                    path = context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception e) {
+                    // Any failure while evaluating the expression is reported as an invalid value.
+                    final ValidationResult.Builder failed = new ValidationResult.Builder();
+                    failed.subject(subject).input(value).valid(false);
+                    failed.explanation("Not a valid Expression Language value: " + e.getMessage());
+                    return failed.build();
+                }
+            }
+
+            final File target = new File(path);
+            final ValidationResult.Builder outcome = new ValidationResult.Builder().subject(subject).input(value);
+            if (target.exists()) {
+                return outcome.valid(true).explanation(null).build();
+            }
+            return outcome.valid(false).explanation("File " + target + " does not exist").build();
+        }
+    }
+
+    /**
+     * Validator that checks that a value names an existing directory and,
+     * optionally, creates the directory when it is missing.
+     * <p>
+     * Note that with {@code create} set, validating has a side effect on the
+     * file system: a missing directory is created via {@code File.mkdirs()}.
+     * </p>
+     */
+    public static class DirectoryExistsValidator implements Validator {
+
+        private final boolean allowEL;
+        private final boolean create;
+
+        /**
+         * @param allowExpressionLanguage whether the value is evaluated as
+         *            Expression Language before being used as a path
+         * @param create whether a missing directory should be created during validation
+         */
+        public DirectoryExistsValidator(final boolean allowExpressionLanguage, 
final boolean create) {
+            this.allowEL = allowExpressionLanguage;
+            this.create = create;
+        }
+
+        /**
+         * @param subject the name of what is being validated, copied into the result
+         * @param value the raw property value (possibly containing Expression Language)
+         * @param context used to evaluate the value when Expression Language is allowed
+         * @return a result that is valid when the (evaluated) path is an existing
+         *         directory, or could be created when creation is enabled
+         */
+        @Override
+        public ValidationResult validate(final String subject, final String 
value, final ValidationContext context) {
+            final String substituted;
+            if (allowEL) {
+                try {
+                    substituted = 
context.newPropertyValue(value).evaluateAttributeExpressions().getValue();
+                } catch (final Exception e) {
+                    return new 
ValidationResult.Builder().subject(subject).input(value).valid(false)
+                            .explanation("Not a valid Expression Language 
value: " + e.getMessage()).build();
+                }
+
+                // A non-blank value that evaluates to blank cannot be checked against
+                // the file system here, so it is accepted as-is.
+                if (substituted.trim().isEmpty() && !value.trim().isEmpty()) {
+                    // User specified an Expression and nothing more... assume 
valid.
+                    return new 
ValidationResult.Builder().subject(subject).input(value).valid(true).build();
+                }
+            } else {
+                substituted = value;
+            }
+
+            // reason stays null when every check passes; it doubles as the validity flag below.
+            String reason = null;
+            try {
+                final File file = new File(substituted);
+                if (!file.exists()) {
+                    if (!create) {
+                        reason = "Directory does not exist";
+                    } else if (!file.mkdirs()) {
+                        // mkdirs() is only attempted when creation was requested.
+                        reason = "Directory does not exist and could not be 
created";
+                    }
+                } else if (!file.isDirectory()) {
+                    reason = "Path does not point to a directory";
+                }
+            } catch (final Exception e) {
+                // Any exception from the File operations above is reported as a bad directory name.
+                reason = "Value is not a valid directory name";
+            }
+
+            return new 
ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason
 == null).build();
+        }
+    }
+
+    /**
+     * Creates a Validator that treats the input as a Controller Service identifier
+     * and checks, in order, that a service with that identifier exists, that it is
+     * an instance of the given class, and that the service itself reports no
+     * invalid validation results.
+     *
+     * @param serviceClass the type the referenced Controller Service must be assignable to
+     * @return the Validator
+     */
+    public static Validator createControllerServiceExistsValidator(final Class<? extends ControllerService> serviceClass) {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                final ControllerService service = context.getControllerServiceLookup().getControllerService(input);
+                if (service == null) {
+                    final String missing = "No Controller Service exists with this ID";
+                    return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation(missing).build();
+                }
+
+                if (!serviceClass.isInstance(service)) {
+                    final String wrongType = "Controller Service with this ID is of type " + service.getClass().getName()
+                            + " but is expected to be of type " + serviceClass.getName();
+                    return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation(wrongType).build();
+                }
+
+                // The first invalid result reported by the service decides the outcome.
+                final ValidationContext serviceContext = context.getControllerServiceValidationContext(service);
+                for (final ValidationResult serviceResult : service.validate(serviceContext)) {
+                    if (serviceResult.isValid()) {
+                        continue;
+                    }
+                    final String invalidService = "Controller Service " + input + " is not valid: " + serviceResult.getExplanation();
+                    return new ValidationResult.Builder().valid(false).input(input).subject(subject).explanation(invalidService).build();
+                }
+
+                return new ValidationResult.Builder().input(input).subject(subject).valid(true).build();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java
----------------------------------------------------------------------
diff --git 
a/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java
 
b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java
new file mode 100644
index 0000000..359def2
--- /dev/null
+++ 
b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/TestFormatUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+
+import org.junit.Test;
+
+public class TestFormatUtils {
+
+    /**
+     * Checks FormatUtils.getTimeDuration against a mix of unit spellings, letter
+     * cases and target units, including a conversion that truncates to zero.
+     */
+    @Test
+    public void testParse() {
+        // into seconds
+        assertDuration(3, "3000 ms", TimeUnit.SECONDS);
+        assertDuration(3000, "3000 s", TimeUnit.SECONDS);
+        assertDuration(0, "999 millis", TimeUnit.SECONDS);
+
+        // into nanoseconds
+        assertDuration(TimeUnit.DAYS.toNanos(4), "4 days", TimeUnit.NANOSECONDS);
+
+        // upper-case and mixed-case unit names
+        assertDuration(24, "1 DAY", TimeUnit.HOURS);
+        assertDuration(60, "1 hr", TimeUnit.MINUTES);
+        assertDuration(60, "1 Hrs", TimeUnit.MINUTES);
+    }
+
+    private static void assertDuration(final long expected, final String text, final TimeUnit targetUnit) {
+        assertEquals(expected, FormatUtils.getTimeDuration(text, targetUnit));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
----------------------------------------------------------------------
diff --git 
a/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
 
b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
new file mode 100644
index 0000000..2ae50c9
--- /dev/null
+++ 
b/commons/processor-utilities/src/test/java/org/apache/nifi/processor/util/TestStandardValidators.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import org.junit.Test;
+
+public class TestStandardValidators {
+
+    /**
+     * A validator with a one-second minimum must reject malformed input, null
+     * and anything shorter than one second, and must accept exactly one second.
+     */
+    @Test
+    public void testTimePeriodValidator() {
+        final String subject = "TimePeriodTest";
+        final Validator validator = StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS,
+                Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+
+        final String[] rejectedInputs = {"0 sense made", null, "0 secs", "999 millis", "999999999 nanos"};
+        for (final String candidate : rejectedInputs) {
+            final ValidationResult rejected = validator.validate(subject, candidate, null);
+            assertFalse(rejected.isValid());
+        }
+
+        final ValidationResult accepted = validator.validate(subject, "1 sec", null);
+        assertTrue(accepted.isValid());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/pom.xml
----------------------------------------------------------------------
diff --git a/commons/remote-communications-utils/pom.xml 
b/commons/remote-communications-utils/pom.xml
new file mode 100644
index 0000000..5e5ebc1
--- /dev/null
+++ b/commons/remote-communications-utils/pom.xml
@@ -0,0 +1,29 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>remote-communications-utils</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>remote-communications-utils</name>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
----------------------------------------------------------------------
diff --git 
a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
new file mode 100644
index 0000000..77c34c9
--- /dev/null
+++ 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/StandardVersionNegotiator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * VersionNegotiator backed by a fixed list of supported versions. The list is
+ * kept in the order given to the constructor; the first entry is both the
+ * preferred version and the initially selected one.
+ */
+public class StandardVersionNegotiator implements VersionNegotiator {
+
+    private final List<Integer> versions;
+    private int curVersion;
+
+    /**
+     * @param supportedVersions the supported versions, in order of preference
+     * @throws NullPointerException if the array is null
+     * @throws IllegalArgumentException if the array is empty
+     */
+    public StandardVersionNegotiator(final int... supportedVersions) {
+        Objects.requireNonNull(supportedVersions);
+        if (supportedVersions.length < 1) {
+            throw new IllegalArgumentException("At least one version must be supported");
+        }
+
+        final List<Integer> boxed = new ArrayList<>();
+        for (int i = 0; i < supportedVersions.length; i++) {
+            boxed.add(supportedVersions[i]);
+        }
+        versions = Collections.unmodifiableList(boxed);
+        curVersion = supportedVersions[0];
+    }
+
+    @Override
+    public int getVersion() {
+        return this.curVersion;
+    }
+
+    @Override
+    public void setVersion(final int version) throws IllegalArgumentException {
+        if (isVersionSupported(version)) {
+            curVersion = version;
+            return;
+        }
+
+        throw new IllegalArgumentException("Version " + version + " is not supported");
+    }
+
+    @Override
+    public int getPreferredVersion() {
+        final Integer first = versions.get(0);
+        return first;
+    }
+
+    @Override
+    public Integer getPreferredVersion(final int maxVersion) {
+        // First entry, in list order, that does not exceed maxVersion.
+        for (int i = 0; i < versions.size(); i++) {
+            final Integer candidate = versions.get(i);
+            if (candidate <= maxVersion) {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isVersionSupported(final int version) {
+        return versions.contains(Integer.valueOf(version));
+    }
+
+    @Override
+    public List<Integer> getSupportedVersions() {
+        return this.versions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
----------------------------------------------------------------------
diff --git 
a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
new file mode 100644
index 0000000..74f9b3d
--- /dev/null
+++ 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/VersionNegotiator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.util.List;
+
+/**
+ * Tracks which version of a resource is currently selected and which versions
+ * the resource supports.
+ */
+public interface VersionNegotiator {
+
+    /**
+     * @return the currently configured Version of this resource
+     */
+    int getVersion();
+
+    /**
+     * Sets the version of this resource to the specified version. Only the
+     * lower byte of the version is relevant.
+     *
+     * @param version the version to select
+     * @throws IllegalArgumentException if the given Version is not supported
+     * by this resource, as is indicated by the {@link #isVersionSupported(int)}
+     * method
+     */
+    void setVersion(int version) throws IllegalArgumentException;
+
+    /**
+     *
+     * @return the Version of this resource that is preferred
+     */
+    int getPreferredVersion();
+
+    /**
+     * Gets the preferred version of this resource that is no greater than the
+     * given maxVersion. If no supported version is less than or equal to
+     * <code>maxVersion</code>, then <code>null</code> is returned
+     *
+     * @param maxVersion the highest version that is acceptable to the caller
+     * @return the preferred version that does not exceed
+     * <code>maxVersion</code>, or <code>null</code> if there is none
+     */
+    Integer getPreferredVersion(int maxVersion);
+
+    /**
+     * Indicates whether or not the specified version is supported by this
+     * resource
+     *
+     * @param version the version to check
+     * @return <code>true</code> if the version is supported,
+     * <code>false</code> otherwise
+     */
+    boolean isVersionSupported(int version);
+
+    /**
+     * @return the versions supported by this resource
+     */
+    List<Integer> getSupportedVersions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
----------------------------------------------------------------------
diff --git 
a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
new file mode 100644
index 0000000..05fd915
--- /dev/null
+++ 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/exception/TransmissionDisabledException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Indicates that the user disabled transmission while communications were
+ * taking place with a peer.
+ * <p>
+ * This is an unchecked exception. It declares no constructors of its own, so
+ * instances carry neither a detail message nor a cause.
+ * </p>
+ */
+public class TransmissionDisabledException extends RuntimeException {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git 
a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
new file mode 100644
index 0000000..71cf894
--- /dev/null
+++ 
b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+
+/**
+ * InputStream that reads data framed as a sequence of compressed chunks and
+ * hands out the decompressed bytes.
+ * <p>
+ * As read by this class, each chunk consists of: the 4 bytes of
+ * {@code CompressionOutputStream.SYNC_BYTES}, a 4-byte big-endian length of the
+ * decompressed data, a 4-byte big-endian length of the compressed data, the
+ * compressed data itself (inflated with {@link Inflater}), and one trailing
+ * indicator byte: 1 means another chunk follows, 0 or end-of-stream means this
+ * was the last chunk.
+ * </p>
+ */
+public class CompressionInputStream extends InputStream {
+
+    private final InputStream in;
+    private final Inflater inflater;
+
+    private byte[] compressedBuffer;
+    private byte[] buffer;
+
+    private int bufferIndex;
+    private boolean eos = false;    // whether or not we've reached the end of stream
+    private boolean allDataRead = false;    // different from eos b/c eos means allDataRead == true && buffer is empty
+
+    private final byte[] fourByteBuffer = new byte[4];
+
+    /**
+     * @param in the stream to read compressed chunks from; it is not closed by {@link #close()}
+     */
+    public CompressionInputStream(final InputStream in) {
+        this.in = in;
+        inflater = new Inflater();
+
+        // Start with an "empty" buffer (index past the end) so the first read loads a chunk.
+        buffer = new byte[0];
+        compressedBuffer = new byte[0];
+        bufferIndex = 1;
+    }
+
+    /**
+     * Renders the given bytes as an upper-case hex string prefixed with "0x",
+     * two digits per byte.
+     */
+    private String toHex(final byte[] array) {
+        final StringBuilder sb = new StringBuilder("0x");
+        for (final byte b : array) {
+            // Mask to 0..255 first; otherwise a negative byte is sign-extended and prints as "FFFFFFxx".
+            final String hex = Integer.toHexString(b & 0xFF).toUpperCase();
+            if (hex.length() == 1) {
+                sb.append("0");
+            }
+            sb.append(hex);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Reads the chunk header: verifies the SYNC bytes and allocates the
+     * decompressed and compressed buffers to the sizes announced in the header.
+     *
+     * @throws IOException if the SYNC bytes do not match or the stream ends early
+     */
+    protected void readChunkHeader() throws IOException {
+        // Ensure that we have a valid SYNC chunk
+        fillBuffer(fourByteBuffer);
+        if (!Arrays.equals(CompressionOutputStream.SYNC_BYTES, fourByteBuffer)) {
+            throw new IOException("Invalid CompressionInputStream. Expected first 4 bytes to be 'SYNC' but were " + toHex(fourByteBuffer));
+        }
+
+        // determine the size of the decompressed buffer
+        fillBuffer(fourByteBuffer);
+        buffer = new byte[toInt(fourByteBuffer)];
+
+        // determine the size of the compressed buffer
+        fillBuffer(fourByteBuffer);
+        compressedBuffer = new byte[toInt(fourByteBuffer)];
+
+        bufferIndex = buffer.length;   // indicate that buffer is empty
+    }
+
+    /**
+     * Interprets the first four bytes of the array as a big-endian int.
+     */
+    private int toInt(final byte[] data) {
+        return ((data[0] & 0xFF) << 24)
+                | ((data[1] & 0xFF) << 16)
+                | ((data[2] & 0xFF) << 8)
+                | (data[3] & 0xFF);
+    }
+
+    /**
+     * Loads the next chunk into {@code buffer}, or marks end-of-stream when
+     * the previous chunk was flagged as the last one.
+     *
+     * @throws IOException if the chunk is malformed, cannot be inflated, or the stream ends early
+     */
+    protected void bufferAndDecompress() throws IOException {
+        if (allDataRead) {
+            eos = true;
+            return;
+        }
+
+        readChunkHeader();
+        fillBuffer(compressedBuffer);
+
+        inflater.setInput(compressedBuffer);
+        try {
+            inflater.inflate(buffer);
+        } catch (final DataFormatException e) {
+            throw new IOException(e);
+        }
+        // Reset so the same Inflater can be reused for the next chunk.
+        inflater.reset();
+
+        bufferIndex = 0;
+        // Trailing indicator: 1 = more chunks follow; 0 or -1 (EOF) = this was the last chunk.
+        final int moreDataByte = in.read();
+        if (moreDataByte < 1) {
+            allDataRead = true;
+        } else if (moreDataByte > 1) {
+            throw new IOException("Expected indicator of whether or not more data was to come (-1, 0, or 1) but got " + moreDataByte);
+        }
+    }
+
+    /**
+     * Reads from the underlying stream until the given array is completely filled.
+     *
+     * @throws EOFException if the stream ends before the array is full
+     */
+    private void fillBuffer(final byte[] buffer) throws IOException {
+        int len;
+        int bytesLeft = buffer.length;
+        int bytesRead = 0;
+        while (bytesLeft > 0 && (len = in.read(buffer, bytesRead, bytesLeft)) > 0) {
+            bytesLeft -= len;
+            bytesRead += len;
+        }
+
+        if (bytesRead < buffer.length) {
+            throw new EOFException();
+        }
+    }
+
+    private boolean isBufferEmpty() {
+        return bufferIndex >= buffer.length;
+    }
+
+    /**
+     * Reads the next decompressed byte.
+     *
+     * @return the byte as a value in the range 0-255, or -1 at end of stream
+     */
+    @Override
+    public int read() throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        // Mask to an unsigned value: returning the raw (signed) byte would yield negative
+        // numbers for data >= 0x80, and 0xFF would be mistaken for end-of-stream (-1).
+        return buffer[bufferIndex++] & 0xFF;
+    }
+
+    @Override
+    public int read(final byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Copies up to {@code len} decompressed bytes into {@code b}. At most the
+     * remainder of the current chunk is returned per call.
+     *
+     * @return the number of bytes copied, or -1 at end of stream
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        if (eos) {
+            return -1;
+        }
+
+        if (isBufferEmpty()) {
+            bufferAndDecompress();
+        }
+
+        if (isBufferEmpty()) {
+            eos = true;
+            return -1;
+        }
+
+        final int free = buffer.length - bufferIndex;
+        final int bytesToTransfer = Math.min(len, free);
+        System.arraycopy(buffer, bufferIndex, b, off, bytesToTransfer);
+        bufferIndex += bytesToTransfer;
+
+        return bytesToTransfer;
+    }
+
+    /**
+     * Does nothing. Does NOT close underlying InputStream
+     * @throws java.io.IOException
+     */
+    @Override
+    public void close() throws IOException {
+        // NOTE(review): the Inflater is never end()ed anywhere in this class — confirm whether that is intended.
+    }
+}

Reply via email to