http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
index 1d52b5c..6901b4f 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
@@ -18,11 +18,20 @@
 
 package org.apache.streams.components.http.processor;
 
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.ActivityObject;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -32,14 +41,6 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -244,7 +245,7 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor {
   public HttpPost prepareHttpPost(URI uri, HttpEntity entity) {
     HttpPost httpPost = new HttpPost(uri);
     httpPost.addHeader("content-type", this.configuration.getContentType());
-    if ( !Strings.isNullOrEmpty(authHeader)) {
+    if (StringUtils.isNotBlank(authHeader)) {
       httpPost.addHeader("Authorization", String.format("Basic %s", authHeader));
     }
     httpPost.setEntity(entity);
@@ -261,11 +262,11 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor {
         .setHost(this.configuration.getHostname())
         .setPath(this.configuration.getResourcePath());
 
-    if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) {
+    if (StringUtils.isNotBlank(configuration.getAccessToken()) ) {
       uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken());
     }
-    if ( !Strings.isNullOrEmpty(configuration.getUsername())
-        && !Strings.isNullOrEmpty(configuration.getPassword())) {
+    if (StringUtils.isNotBlank(configuration.getUsername())
+        && StringUtils.isNotBlank(configuration.getPassword())) {
       String string = configuration.getUsername() + ":" + configuration.getPassword();
       authHeader = Base64.encodeBase64String(string.getBytes());
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
index ab11a68..e46342e 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
@@ -18,12 +18,19 @@
 
 package org.apache.streams.components.http.provider;
 
+import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -37,13 +44,6 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpProviderConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +60,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -236,7 +237,7 @@ public class SimpleHttpProvider implements StreamsProvider {
 
   protected List<ObjectNode> execute(URI uri) {
 
-    Preconditions.checkNotNull(uri);
+    Objects.requireNonNull(uri);
 
     List<ObjectNode> results = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
 
b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
index 2333c4b..a74e6bc 100644
--- 
a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
+++ 
b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
@@ -23,7 +23,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.impl.client.CloseableHttpClient;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
 
b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
index 5eea60e..ce88a23 100644
--- 
a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
+++ 
b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
@@ -21,7 +21,6 @@ package org.apache.streams.config;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
index 319b32a..83e98b5 100644
--- 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
+++ 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
 
b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
index 82cc6bc..80fa7de 100644
--- 
a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
+++ 
b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
@@ -18,13 +18,14 @@
 
 package org.apache.streams.config.test;
 
+import org.apache.streams.config.ComponentConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
-import org.apache.streams.config.ComponentConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
 
b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
index a29d8c7..64010c1 100644
--- 
a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
+++ 
b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
@@ -18,34 +18,19 @@
 
 package org.apache.streams.config.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.streams.config.ComponentConfiguration;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Scanner;
-
 /**
- * Test for
- * @see {@link org.apache.streams.config.StreamsConfigurator}
+ * Test for {@link org.apache.streams.config.StreamsConfigurator}
  */
 public class StreamsConfiguratorTest {
 
-    private static final ObjectMapper mapper = new ObjectMapper();
-
     @Test
     public void testDetectConfiguration() throws Exception {
 
@@ -53,7 +38,7 @@ public class StreamsConfiguratorTest {
 
         Config detected = StreamsConfigurator.getConfig();
 
-        junit.framework.Assert.assertEquals(config, detected);
+        Assert.assertEquals(config, detected);
 
         StreamsConfiguration defaultPojo = StreamsConfigurator.detectConfiguration();
 
@@ -63,7 +48,7 @@ public class StreamsConfiguratorTest {
 
         assert(configuredPojo != null);
 
-        junit.framework.Assert.assertEquals(configuredPojo, defaultPojo);
+        Assert.assertEquals(configuredPojo, defaultPojo);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
index e3bfe70..42cce8a 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -37,7 +37,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
 import com.typesafe.config.Config;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +79,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable
   public KinesisPersistReader() {
     Config config = StreamsConfigurator.config.getConfig("kinesis");
     this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
-    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
   /**
@@ -88,7 +87,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable
    */
   public KinesisPersistReader(KinesisReaderConfiguration config) {
     this.config = config;
-    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
   public void setConfig(KinesisReaderConfiguration config) {
@@ -176,7 +175,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable
     while( !executor.isTerminated()) {
       try {
         executor.awaitTermination(5, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException ignored) {}
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
index a93fda8..37891be 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
 import com.google.common.collect.Maps;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
index b61a364..6e2db0f 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -37,7 +37,6 @@ import com.amazonaws.services.kinesis.model.PutRecordResult;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +76,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter {
   public KinesisPersistWriter() {
     Config config = StreamsConfigurator.config.getConfig("kinesis");
     this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
-    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
   /**
@@ -85,7 +84,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter {
    */
   public KinesisPersistWriter(KinesisWriterConfiguration config) {
     this.config = config;
-    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
   }
 
   public void setConfig(KinesisWriterConfiguration config) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index f34782a..e70c361 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -20,7 +20,6 @@ package org.apache.streams.s3;
 
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index e8ca0c7..87bd8c8 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -22,7 +22,6 @@ import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 753b439..da1a00e 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -39,7 +39,6 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,15 +146,15 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     ObjectListing listing = this.amazonS3Client.listObjects(request);
 
-    this.files = new ArrayList<String>();
+    this.files = new ArrayList<>();
 
-    /**
+    /*
      * If you can list files that are in this path, then you must be dealing with a directory
      * if you cannot list files that are in this path, then you are most likely dealing with
      * a simple file.
      */
-    boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false;
-    boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false;
+    boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0;
+    boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0;
 
     if (hasCommonPrefixes || hasObjectSummaries) {
       // Handle the 'directory' use case

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index f0e9626..82bcba7 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.util.ComponentUtils;
 
 import com.google.common.base.Strings;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +54,7 @@ public class S3PersistReaderTask implements Runnable {
       BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
       LOGGER.info("Reading: {} ", file);
 
-      String line = "";
+      String line;
       try {
         while ((line = bufferedReader.readLine()) != null) {
           if ( !Strings.isNullOrEmpty(line) ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index ef6e831..34e4b67 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -39,7 +39,6 @@ import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +69,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
   private ObjectMapper objectMapper;
   private AmazonS3Client amazonS3Client;
   private S3WriterConfiguration s3WriterConfiguration;
-  private final List<String> writtenFiles = new ArrayList<String>();
+  private final List<String> writtenFiles = new ArrayList<>();
   protected LineReadWriteUtil lineWriterUtil;
 
   private final AtomicLong totalBytesWritten = new AtomicLong();
@@ -79,7 +78,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
   private final AtomicInteger totalRecordsWritten = new AtomicInteger();
   private AtomicInteger fileLineCounter = new AtomicInteger();
 
-  private static Map<String, String> objectMetaData = new HashMap<String, String>();
+  private static Map<String, String> objectMetaData = new HashMap<>();
 
   static {
     objectMetaData.put("line[0]", "id");
@@ -103,7 +102,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
   }
 
   public Map<String, String> getObjectMetaData() {
-    return this.objectMetaData;
+    return objectMetaData;
   }
 
   public ObjectMapper getObjectMapper() {
@@ -115,7 +114,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
   }
 
   public void setObjectMetaData(Map<String, String> val) {
-    this.objectMetaData = val;
+    objectMetaData = val;
   }
 
   public S3PersistWriter() {
@@ -203,7 +202,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
           this.s3WriterConfiguration.getBucket(),
           this.s3WriterConfiguration.getWriterPath(),
           fileName,
-          this.objectMetaData);
+          objectMetaData);
 
       // reset the counter
       this.fileLineCounter = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
 
b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 6358071..dfbd91e 100644
--- 
a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ 
b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -24,12 +24,11 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.PrintStream;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -42,14 +41,14 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
 
-  protected PrintStream printStream = System.out;
+  private PrintStream printStream = System.out;
 
   protected volatile Queue<StreamsDatum> persistQueue;
 
   private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
   public ConsolePersistWriter() {
-    this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+    this.persistQueue = new ConcurrentLinkedQueue<>();
   }
 
   public ConsolePersistWriter(PrintStream printStream) {
@@ -63,7 +62,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter {
   }
 
   public void prepare(Object configuration) {
-    Preconditions.checkNotNull(persistQueue);
+    Objects.requireNonNull(persistQueue);
   }
 
   public void cleanUp() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index bdff9aa..f488263 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -19,7 +19,6 @@
 package org.apache.streams.elasticsearch;
 
 import com.google.common.net.InetAddresses;
-
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang.builder.ToStringBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 789b62f..3c4ebc9 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -21,20 +21,19 @@ package org.apache.streams.elasticsearch;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 
-import com.google.common.base.Preconditions;
-
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * ElasticsearchPersistDeleter deletes documents from elasticsearch.
  */
 public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
-  public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName();
+  private static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
 
@@ -84,9 +83,9 @@ public class ElasticsearchPersistDeleter extends 
ElasticsearchPersistWriter impl
   public void delete(String index, String type, String id) {
     DeleteRequest deleteRequest;
 
-    Preconditions.checkNotNull(index);
-    Preconditions.checkNotNull(id);
-    Preconditions.checkNotNull(type);
+    Objects.requireNonNull(index);
+    Objects.requireNonNull(id);
+    Objects.requireNonNull(type);
 
     // They didn't specify an ID, so we will create one for them.
     deleteRequest = new DeleteRequest()
@@ -104,8 +103,8 @@ public class ElasticsearchPersistDeleter extends 
ElasticsearchPersistWriter impl
    */
   public void add(DeleteRequest request) {
 
-    Preconditions.checkNotNull(request);
-    Preconditions.checkNotNull(request.index());
+    Objects.requireNonNull(request);
+    Objects.requireNonNull(request.index());
 
     // If our queue is larger than our flush threshold, then we should flush the queue.
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 388497e..27285f9 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -27,7 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Queues;
-
 import org.elasticsearch.search.SearchHit;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -200,7 +199,7 @@ public class ElasticsearchPersistReader implements 
StreamsPersistReader, Seriali
       StreamsDatum item;
       while (query.hasNext()) {
         SearchHit hit = query.next();
-        ObjectNode jsonObject = null;
+        ObjectNode jsonObject;
         try {
           jsonObject = mapper.readValue(hit.getSourceAsString(), 
ObjectNode.class);
           item = new StreamsDatum(jsonObject, hit.getId());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index f4da436..c8b95d6 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -21,21 +21,20 @@ package org.apache.streams.elasticsearch;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
+import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * ElasticsearchPersistUpdater updates documents to elasticsearch.
  */
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
-  public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName();
+  private static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName();
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
 
@@ -97,8 +96,8 @@ public class ElasticsearchPersistUpdater extends 
ElasticsearchPersistWriter impl
   public void update(String indexName, String type, String id, String parent, 
String routing, String json) {
     UpdateRequest updateRequest;
 
-    Preconditions.checkNotNull(id);
-    Preconditions.checkNotNull(json);
+    Objects.requireNonNull(id);
+    Objects.requireNonNull(json);
 
     // They didn't specify an ID, so we will create one for them.
     updateRequest = new UpdateRequest()
@@ -107,11 +106,11 @@ public class ElasticsearchPersistUpdater extends 
ElasticsearchPersistWriter impl
         .id(id)
         .doc(json);
 
-    if (!Strings.isNullOrEmpty(parent)) {
+    if (StringUtils.isNotBlank(parent)) {
       updateRequest = updateRequest.parent(parent);
     }
 
-    if (!Strings.isNullOrEmpty(routing)) {
+    if (StringUtils.isNotBlank(routing)) {
       updateRequest = updateRequest.routing(routing);
     }
 
@@ -128,8 +127,8 @@ public class ElasticsearchPersistUpdater extends 
ElasticsearchPersistWriter impl
    */
   public void add(UpdateRequest request) {
 
-    Preconditions.checkNotNull(request);
-    Preconditions.checkNotNull(request.index());
+    Objects.requireNonNull(request);
+    Objects.requireNonNull(request.index());
 
     // If our queue is larger than our flush threshold, then we should flush the queue.
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 07ab734..bc60197 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -27,8 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -52,6 +50,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -425,9 +424,9 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Seriali
   public void add(String indexName, String type, String id, String parent, 
String routing, String ts, String json) {
 
     // make sure that these are not null
-    Preconditions.checkNotNull(indexName);
-    Preconditions.checkNotNull(type);
-    Preconditions.checkNotNull(json);
+    Objects.requireNonNull(indexName);
+    Objects.requireNonNull(type);
+    Objects.requireNonNull(json);
 
     IndexRequestBuilder indexRequestBuilder = manager.getClient()
         .prepareIndex(indexName, type)
@@ -451,8 +450,8 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Seriali
 
   protected void add(IndexRequest request) {
 
-    Preconditions.checkNotNull(request);
-    Preconditions.checkNotNull(request.index());
+    Objects.requireNonNull(request);
+    Objects.requireNonNull(request.index());
 
     // If our queue is larger than our flush threshold, then we should flush the queue.
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 06a6dc8..a77101b 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -23,7 +23,6 @@ import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 6ce15d4..28f8b38 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.typesafe.config.Config;
-
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index bef190e..cc35b05 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -27,7 +27,6 @@ import 
org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 
 import com.typesafe.config.Config;
-
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index 2a64fbc..105c453 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
index 721ad42..e5efbb9 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
@@ -28,7 +28,6 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index 69394ee..ab6341c 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -32,9 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -53,9 +50,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 
@@ -124,7 +122,7 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
   @Override
   public List<StreamsDatum> process(StreamsDatum entry) {
 
-    List<StreamsDatum> result = Lists.newArrayList();
+    List<StreamsDatum> result = new ArrayList<>();
 
     String json;
     ObjectNode node;
@@ -172,9 +170,8 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
 
     ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
-    Iterator<PercolateResponse.Match> matchIterator = response.iterator();
-    while (matchIterator.hasNext()) {
-      tagArray.add(matchIterator.next().getId().string());
+    for (PercolateResponse.Match aResponse : response) {
+      tagArray.add(aResponse.getId().string());
     }
 
     LOGGER.trace("Percolate matches: {}", tagArray);
@@ -202,14 +199,14 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
 
     mapper = StreamsJacksonMapper.getInstance();
 
-    Preconditions.checkNotNull(config);
+    Objects.requireNonNull(config);
 
     manager = new ElasticsearchClientManager(config);
 
     if ( config.getTags() != null && 
config.getTags().getAdditionalProperties().size() > 0) {
       // initial write tags to index
       createIndexIfMissing(config.getIndex());
-      if ( config.getReplaceTags() == true ) {
+      if (config.getReplaceTags()) {
         deleteOldQueries(config.getIndex());
       }
       for (String tag : config.getTags().getAdditionalProperties().keySet()) {
@@ -219,7 +216,7 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
       }
       bulkBuilder = manager.getClient().prepareBulk();
 
-      if (writePercolateRules() == true) {
+      if (writePercolateRules()) {
         LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
       } else {
         LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator");
@@ -230,7 +227,7 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
 
   @Override
   public void cleanUp() {
-    if ( config.getCleanupTags() == true ) {
+    if (config.getCleanupTags()) {
       deleteOldQueries(config.getIndex());
     }
     manager.getClient().close();
@@ -311,7 +308,7 @@ public class PercolateTagProcessor implements 
StreamsProcessor {
    * @return result
    */
   public Set<String> getActivePercolateTags(String index) {
-    Set<String> tags = new HashSet<String>();
+    Set<String> tags = new HashSet<>();
     SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
     SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
     SearchHits hits = response.getHits();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
index caa0b8d..91afef6 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
@@ -24,11 +24,9 @@ import 
org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
 
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.apache.commons.lang.SerializationUtils;
 import org.elasticsearch.client.Client;
 import org.junit.Assert;
@@ -36,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -70,7 +69,7 @@ public class DatumFromMetadataProcessorIT {
   @Test
   public void testDatumFromMetadataProcessor() {
 
-    Map<String, Object> metadata = Maps.newHashMap();
+    Map<String, Object> metadata = new HashMap<>();
 
     metadata.put("index", testConfiguration.getIndexes().get(0));
     metadata.put("type", testConfiguration.getTypes().get(0));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
index 553a711..d3b1fd3 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
@@ -18,34 +18,26 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import org.elasticsearch.action.count.CountRequest;
-import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
@@ -62,14 +54,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
-import java.net.URL;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
index 6b52ce5..d35b8a2 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
@@ -18,14 +18,6 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
@@ -34,6 +26,15 @@ import 
org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -61,7 +62,7 @@ import java.net.URL;
 import java.util.List;
 import java.util.Set;
 
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -104,7 +105,7 @@ public class ElasticsearchParentChildWriterIT {
             DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
             DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
             assertTrue(deleteIndexResponse.isAcknowledged());
-        };
+        }
 
         PutIndexTemplateRequestBuilder putTemplateRequestBuilder = 
testClient.admin().indices().preparePutTemplate("mappings");
         URL templateURL = 
ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json");
@@ -133,7 +134,7 @@ public class ElasticsearchParentChildWriterIT {
         if(indicesExistsResponse.isExists()) {
             DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
             DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-        };
+        }
 
         ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
         testPersistWriter.prepare(null);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
index e356aff..603ec5f 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
@@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -53,7 +52,7 @@ import java.io.File;
 import java.io.InputStream;
 import java.util.List;
 
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -120,7 +119,7 @@ public class ElasticsearchPersistUpdaterIT {
       Activity update = new Activity();
       update.setAdditionalProperty("updated", Boolean.TRUE);
       update.setAdditionalProperty("str", "str");
-      update.setAdditionalProperty("long", 10l);
+      update.setAdditionalProperty("long", 10L);
       update.setActor(
           new ActivityObject()
               .withAdditionalProperty("updated", Boolean.TRUE)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
index f290971..3fec9f3 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -53,7 +52,7 @@ import java.io.File;
 import java.io.InputStream;
 import java.util.List;
 
-import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -91,7 +90,7 @@ public class ElasticsearchPersistWriterIT {
       DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
       DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
       assertTrue(deleteIndexResponse.isAcknowledged());
-    };
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
index 8900ef3..3e9f3e1 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java
@@ -18,24 +18,17 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
-
-/**
- * Created by sblackmon on 10/20/14.
- */
 public class TestDocumentToMetadataProcessor {
 
     private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
index 76f10b1..7bd4341 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.ActivityObject;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
-
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.SerializationUtils;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
index b921ba5..32d6032 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
@@ -26,9 +26,7 @@ import org.apache.streams.core.StreamsResultSet;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
 import com.squareup.tape.QueueFile;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +38,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -50,11 +49,11 @@ import java.util.concurrent.Executors;
  */
 public class FileBufferPersistReader implements StreamsPersistReader, 
Serializable {
 
-  public static final String STREAMS_ID = "FileBufferPersistReader";
+  private static final String STREAMS_ID = "FileBufferPersistReader";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistReader.class);
 
-  protected volatile Queue<StreamsDatum> persistQueue;
+  private volatile Queue<StreamsDatum> persistQueue;
 
   private ObjectMapper mapper;
 
@@ -109,7 +108,7 @@ public class FileBufferPersistReader implements 
StreamsPersistReader, Serializab
     }
 
     StreamsResultSet current;
-    current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+    current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue));
     persistQueue.clear();
 
     return current;
@@ -164,7 +163,7 @@ public class FileBufferPersistReader implements 
StreamsPersistReader, Serializab
       LOGGER.error(ex.getMessage());
     }
 
-    Preconditions.checkNotNull(queueFile);
+    Objects.requireNonNull(queueFile);
 
     this.persistQueue = new ConcurrentLinkedQueue<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
index 76dfafc..a4348f6 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
@@ -26,15 +26,15 @@ import org.apache.streams.util.GuidUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.squareup.tape.QueueFile;
-
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -43,11 +43,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class FileBufferPersistWriter implements StreamsPersistWriter, 
Serializable {
 
-  public static final String STREAMS_ID = "FileBufferPersistWriter";
+  private static final String STREAMS_ID = "FileBufferPersistWriter";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistWriter.class);
 
-  protected volatile Queue<StreamsDatum> persistQueue;
+  private volatile Queue<StreamsDatum> persistQueue;
 
   private ObjectMapper mapper;
 
@@ -74,9 +74,9 @@ public class FileBufferPersistWriter implements 
StreamsPersistWriter, Serializab
 
     String key = entry.getId() != null ? entry.getId() : 
GuidUtils.generateGuid("filewriter");
 
-    Preconditions.checkArgument(!Strings.isNullOrEmpty(key));
+    Preconditions.checkArgument(StringUtils.isNotBlank(key));
     Preconditions.checkArgument(entry.getDocument() instanceof String);
-    Preconditions.checkArgument(!Strings.isNullOrEmpty((String) 
entry.getDocument()));
+    Preconditions.checkArgument(StringUtils.isNotBlank((String) 
entry.getDocument()));
 
     byte[] item = ((String)entry.getDocument()).getBytes();
     try {
@@ -102,7 +102,7 @@ public class FileBufferPersistWriter implements 
StreamsPersistWriter, Serializab
     Preconditions.checkArgument(file.exists());
     Preconditions.checkArgument(file.canWrite());
 
-    Preconditions.checkNotNull(queueFile);
+    Objects.requireNonNull(queueFile);
 
     this.persistQueue  = new ConcurrentLinkedQueue<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java
 
b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java
index 7c3e019..de8419d 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java
@@ -18,9 +18,6 @@
 
 package org.apache.streams.filebuffer.test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
 import org.apache.streams.console.ConsolePersistReader;
 import org.apache.streams.console.ConsolePersistWriter;
 import org.apache.streams.core.StreamBuilder;
@@ -30,19 +27,18 @@ import 
org.apache.streams.filebuffer.FileBufferConfiguration;
 import org.apache.streams.filebuffer.FileBufferPersistReader;
 import org.apache.streams.filebuffer.FileBufferPersistWriter;
 import org.apache.streams.local.builders.LocalStreamBuilder;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 
-import static org.mockito.Mockito.mock;
-
 import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
-
-import static org.mockito.Mockito.mock;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Tests {@link org.apache.streams.filebuffer.FileBufferPersistWriter }
@@ -52,12 +48,12 @@ public class FileBufferPersistIT {
 
     private FileBufferConfiguration testConfiguration;
 
-    ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class);
-    ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class);
+    private ConsolePersistReader reader = 
Mockito.mock(ConsolePersistReader.class);
+    private ConsolePersistWriter writer = 
Mockito.mock(ConsolePersistWriter.class);
 
-    StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}");
-    StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}");
-    StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}");
+    private StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}");
+    private StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}");
+    private StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}");
 
     @Before
     public void prepareTest() {
@@ -71,8 +67,8 @@ public class FileBufferPersistIT {
 
         PowerMockito.when(reader.readCurrent())
                 .thenReturn(
-                        new StreamsResultSet(Queues.newConcurrentLinkedQueue(
-                                Lists.newArrayList(testDatum1, testDatum2, 
testDatum3)))
+                        new StreamsResultSet(new ConcurrentLinkedQueue<>(
+                            Arrays.asList(testDatum1, testDatum2, testDatum3)))
                 ).thenReturn(null);
     }
 
@@ -81,7 +77,7 @@ public class FileBufferPersistIT {
 
         assert(testConfiguration != null);
 
-        Map<String, Object> streamConfig = Maps.newHashMap();
+        Map<String, Object> streamConfig = new HashMap<>();
         streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000);
 
         StreamBuilder builder = new LocalStreamBuilder(1, streamConfig);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java
 
b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java
index 728e368..391e445 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java
@@ -23,6 +23,7 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.filebuffer.FileBufferConfiguration;
 import org.apache.streams.filebuffer.FileBufferPersistReader;
 import org.apache.streams.filebuffer.FileBufferPersistWriter;
+
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
index fee2f6f..614dc95 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java
@@ -18,24 +18,25 @@
 
 package org.apache.streams.graph;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 /**
  * Interface for methods allowing persistance to a graph database wrapped with
  * a rest API.  CypherGraphHelper is a good example, for neo4j.
  */
 public interface GraphHelper {
 
-    public ObjectNode getVertexRequest(String streamsId);
+    ObjectNode getVertexRequest(String streamsId);
 
-    public ObjectNode getVertexRequest(Long vertexId);
+    ObjectNode getVertexRequest(Long vertexId);
 
-    public ObjectNode createVertexRequest(ActivityObject activityObject);
+    ObjectNode createVertexRequest(ActivityObject activityObject);
 
-    public ObjectNode mergeVertexRequest(ActivityObject activityObject);
+    ObjectNode mergeVertexRequest(ActivityObject activityObject);
 
-    public ObjectNode createEdgeRequest(Activity activity, ActivityObject 
source, ActivityObject destination);
+    ObjectNode createEdgeRequest(Activity activity, ActivityObject source, 
ActivityObject destination);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
index 847328a..5b2dec6 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -34,9 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -45,7 +43,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -60,10 +60,10 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GraphHttpPersistWriter.class);
   private static final long MAX_WRITE_LATENCY = 1000;
 
-  protected GraphHttpConfiguration configuration;
+  private GraphHttpConfiguration configuration;
 
-  protected QueryGraphHelper queryGraphHelper;
-  protected HttpGraphHelper httpGraphHelper;
+  private QueryGraphHelper queryGraphHelper;
+  private HttpGraphHelper httpGraphHelper;
 
   private static ObjectMapper mapper;
 
@@ -73,7 +73,7 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
    * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from 
JVM 'graph'.
    */
   public GraphHttpPersistWriter() {
-    this(new 
ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    this(new 
ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
   }
 
   /**
@@ -94,7 +94,7 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
   protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
 
     Activity activity = null;
-    ActivityObject activityObject = null;
+    ActivityObject activityObject;
     Object document = entry.getDocument();
 
     if (document instanceof Activity) {
@@ -137,7 +137,7 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
 
     // always add vertices first
 
-    List<String> labels = Lists.newArrayList("streams");
+    List<String> labels = new java.util.ArrayList<>(Collections.singletonList("streams")); // must stay mutable: labels.add(...) is called below
 
     if ( activityObject != null ) {
       if ( activityObject.getObjectType() != null ) {
@@ -151,20 +151,17 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
       ActivityObject actor = activity.getActor();
       Provider provider = activity.getProvider();
 
-      if ( provider != null
-          && !Strings.isNullOrEmpty(provider.getId()) ) {
+      if (provider != null && StringUtils.isNotBlank(provider.getId())) {
         labels.add(provider.getId());
       }
-      if (actor != null
-          && !Strings.isNullOrEmpty(actor.getId())) {
+      if (actor != null && StringUtils.isNotBlank(actor.getId())) {
         if (actor.getObjectType() != null) {
           labels.add(actor.getObjectType());
         }
         
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
       }
 
-      if (activityObject != null
-          && !Strings.isNullOrEmpty(activityObject.getId())) {
+      if (activityObject != null && 
StringUtils.isNotBlank(activityObject.getId())) {
         if (activityObject.getObjectType() != null) {
           labels.add(activityObject.getObjectType());
         }
@@ -173,7 +170,7 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
 
       // then add edge
 
-      if (!Strings.isNullOrEmpty(activity.getVerb())) {
+      if (StringUtils.isNotBlank(activity.getVerb())) {
         
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
       }
     }
@@ -186,7 +183,7 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
   @Override
   protected ObjectNode executePost(HttpPost httpPost) {
 
-    Preconditions.checkNotNull(httpPost);
+    Objects.requireNonNull(httpPost);
 
     ObjectNode result = null;
 
@@ -239,8 +236,8 @@ public class GraphHttpPersistWriter extends 
SimpleHTTPPostPersistWriter {
       httpGraphHelper = new Neo4jHttpGraphHelper();
     }
 
-    Preconditions.checkNotNull(queryGraphHelper);
-    Preconditions.checkNotNull(httpGraphHelper);
+    Objects.requireNonNull(queryGraphHelper);
+    Objects.requireNonNull(httpGraphHelper);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
index 7c6e341..9560083 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
@@ -33,11 +33,10 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -58,7 +57,7 @@ public class GraphVertexReader extends SimpleHttpProvider 
implements StreamsPers
    * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 
'graph'.
    */
   public GraphVertexReader() {
-    this(new 
ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+    this(new 
ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
   }
 
   /**
@@ -83,7 +82,7 @@ public class GraphVertexReader extends SimpleHttpProvider 
implements StreamsPers
    * @return result
    */
   public List<ObjectNode> parse(JsonNode jsonNode) {
-    List<ObjectNode> results = Lists.newArrayList();
+    List<ObjectNode> results = new ArrayList<>();
 
     ObjectNode root = (ObjectNode) jsonNode;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
index 17b8840..ca1f4e4 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -19,7 +19,6 @@
 package org.apache.streams.graph;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.javatuples.Pair;
 
 import java.util.Map;
@@ -30,6 +29,6 @@ import java.util.Map;
  */
 public interface HttpGraphHelper {
 
-  public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters);
+  ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters);
 
 }

Reply via email to