http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
index e322990..71cb055 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -27,17 +27,17 @@ import org.apache.streams.pojo.json.ActivityObject;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.javatuples.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Supporting class for interacting with neo4j via rest API
@@ -48,10 +48,10 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
 
-  public static final String getVertexLongIdStatementTemplate = "MATCH (v) 
WHERE ID(v) = <id> RETURN v";
-  public static final String getVertexStringIdStatementTemplate = "MATCH (v 
{id: '<id>'} ) RETURN v";
+  private static final String getVertexLongIdStatementTemplate = "MATCH (v) 
WHERE ID(v) = <id> RETURN v";
+  private static final String getVertexStringIdStatementTemplate = "MATCH (v 
{id: '<id>'} ) RETURN v";
 
-  public static final String createVertexStatementTemplate =
+  private static final String createVertexStatementTemplate =
       "MATCH (x {id: '<id>'}) "
           + "CREATE UNIQUE (v:<type> { props }) "
           + "ON CREATE SET v <labels> "
@@ -59,13 +59,13 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
 
 
 
-  public static final String mergeVertexStatementTemplate =
+  private static final String mergeVertexStatementTemplate =
       "MERGE (v:<type> {id: '<id>'}) "
           + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
           + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
           + "RETURN v";
 
-  public static final String createEdgeStatementTemplate =
+  private static final String createEdgeStatementTemplate =
       "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "
           + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "
           + "RETURN r";
@@ -113,7 +113,7 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
    */
   public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject 
activityObject) {
 
-    Preconditions.checkNotNull(activityObject.getObjectType());
+    Objects.requireNonNull(activityObject.getObjectType());
 
     List<String> labels = getLabels(activityObject);
 
@@ -144,9 +144,9 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
    */
   public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject) {
 
-    Preconditions.checkNotNull(activityObject.getObjectType());
+    Objects.requireNonNull(activityObject.getObjectType());
 
-    Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
 
     List<String> labels = getLabels(activityObject);
 
@@ -176,7 +176,7 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
    */
   public Pair<String, Map<String, Object>> createEdgeRequest(Activity 
activity) {
 
-    Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
+    Pair queryPlusParameters = new Pair(null, new HashMap<>());
 
     ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
     Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
@@ -213,7 +213,7 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
   public static String getPropertyCreater(Map<String, Object> map) {
     StringBuilder builder = new StringBuilder();
     builder.append("{");
-    List<String> parts = Lists.newArrayList();
+    List<String> parts = new ArrayList<>();
     for ( Map.Entry<String, Object> entry : map.entrySet()) {
       if ( entry.getValue() instanceof String ) {
         String propVal = (String) (entry.getValue());
@@ -226,7 +226,7 @@ public class CypherQueryGraphHelper implements 
QueryGraphHelper {
   }
 
   private List<String> getLabels(ActivityObject activityObject) {
-    List<String> labels = Lists.newArrayList(":streams");
+    List<String> labels = new ArrayList<>(Collections.singletonList(":streams")); // stays mutable
     if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
       List<String> extraLabels = 
(List<String>)activityObject.getAdditionalProperties().get("labels");
       for ( String extraLabel : extraLabels ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
index 72e668f..9f47058 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
@@ -23,13 +23,12 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-
 import org.javatuples.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Supporting class for interacting with neo4j via rest API.
@@ -40,9 +39,9 @@ public class Neo4jHttpGraphHelper implements HttpGraphHelper {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
 
-  public static final String statementKey = "statement";
-  public static final String paramsKey = "parameters";
-  public static final String propsKey = "props";
+  private static final String statementKey = "statement";
+  private static final String paramsKey = "parameters";
+  private static final String propsKey = "props";
 
   /**
    * createHttpRequest neo4j rest json payload.
@@ -54,9 +53,9 @@ public class Neo4jHttpGraphHelper implements HttpGraphHelper {
 
     LOGGER.debug("createHttpRequest: ", queryPlusParameters);
 
-    Preconditions.checkNotNull(queryPlusParameters);
-    Preconditions.checkNotNull(queryPlusParameters.getValue0());
-    Preconditions.checkNotNull(queryPlusParameters.getValue1());
+    Objects.requireNonNull(queryPlusParameters);
+    Objects.requireNonNull(queryPlusParameters.getValue0());
+    Objects.requireNonNull(queryPlusParameters.getValue1());
 
     ObjectNode request = MAPPER.createObjectNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
 
b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
index 673b402..24ddd65 100644
--- 
a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
+++ 
b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
@@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,8 +37,7 @@ import java.io.InputStream;
 import java.util.List;
 
 /**
- * Unit test for
- * @see {@link org.apache.streams.graph.GraphVertexReader}
+ * Unit test for {@link org.apache.streams.graph.GraphVertexReader}
  *
  * Test that graph db responses can be converted to streams data.
  */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
index c5a06fc..6102db0 100644
--- 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
+++ 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
@@ -26,7 +26,6 @@ import org.apache.streams.util.GuidUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HConnection;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index d254abb..64b7284 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -30,15 +30,15 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -49,6 +49,7 @@ import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
@@ -62,7 +63,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 public class WebHdfsPersistReader implements StreamsPersistReader, 
DatumStatusCountable {
 
-  public static final String STREAMS_ID = "WebHdfsPersistReader";
+  private static final String STREAMS_ID = "WebHdfsPersistReader";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WebHdfsPersistReader.class);
 
@@ -110,7 +111,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
     StringBuilder uriBuilder = new StringBuilder();
     uriBuilder.append(hdfsConfiguration.getScheme());
     uriBuilder.append("://");
-    if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+    if (StringUtils.isNotBlank(hdfsConfiguration.getHost())) {
       uriBuilder.append(hdfsConfiguration.getHost());
       if (hdfsConfiguration.getPort() != null) {
         uriBuilder.append(":" + hdfsConfiguration.getPort());
@@ -148,36 +149,34 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
       UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
       
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
 
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          Configuration conf = new Configuration();
-          
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
-          conf.set("fs.hdfs.impl", 
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-          conf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
-          LOGGER.info("WebURI : {}", getURI().toString());
-          client = FileSystem.get(getURI(), conf);
-          LOGGER.info("Connected to WebHDFS");
-
-          /*
-          * 
************************************************************************************************
-          * This code is an example of how you would work with HDFS and you 
weren't going over
-          * the webHDFS protocol.
-          *
-          * Smashew: 2013-10-01
-          * 
************************************************************************************************
-          conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" 
+ userName);
-          conf.set("namenode.host","0.0.0.0");
-          conf.set("hadoop.job.ugi", userName);
-          conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
-          fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
-          FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
-          for(int i=0;i<status.length;i++)
-          {
-              LOGGER.info("Directory: {}", status[i].getPath());
-          }
-          */
-          return null;
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        Configuration conf = new Configuration();
+        conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
+        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
+        conf.set("fs.file.impl", LocalFileSystem.class.getName());
+        LOGGER.info("WebURI : {}", getURI().toString());
+        client = FileSystem.get(getURI(), conf);
+        LOGGER.info("Connected to WebHDFS");
+
+        /*
+        * 
************************************************************************************************
+        * This code is an example of how you would work with HDFS and you 
weren't going over
+        * the webHDFS protocol.
+        *
+        * Smashew: 2013-10-01
+        * 
************************************************************************************************
+        conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + 
userName);
+        conf.set("namenode.host","0.0.0.0");
+        conf.set("hadoop.job.ugi", userName);
+        conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
+        fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
+        FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
+        for(int i=0;i<status.length;i++)
+        {
+            LOGGER.info("Directory: {}", status[i].getPath());
         }
+        */
+        return null;
       });
     } catch (Exception ex) {
       LOGGER.error("There was an error connecting to WebHDFS, please check 
your settings and try again");
@@ -206,7 +205,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
         status[0] = fileStatus;
       } else if ( client.isDirectory(path)) {
         status = client.listStatus(path);
-        List<FileStatus> statusList = Lists.newArrayList(status);
+        List<FileStatus> statusList = Arrays.asList(status);
         Collections.sort(statusList);
         status = statusList.toArray(new FileStatus[0]);
         LOGGER.info("Found Directory : {} files", status.length);
@@ -287,11 +286,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
 
   @Override
   public boolean isRunning() {
-    if ( task != null) {
-      return !task.isDone() && !task.isCancelled();
-    } else {
-      return true;
-    }
+    return task == null || !task.isDone() && !task.isCancelled();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 5bff080..d18bda9 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum;
 
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.Uninterruptibles;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 4554c0f..29a6b73 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,7 +30,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -65,7 +64,7 @@ public class WebHdfsPersistWriter implements 
StreamsPersistWriter, Flushable, Cl
   private Path path;
   private int linesPerFile;
   private int totalRecordsWritten = 0;
-  private final List<Path> writtenFiles = new ArrayList<Path>();
+  private final List<Path> writtenFiles = new ArrayList<>();
   private int fileLineCounter = 0;
   private OutputStreamWriter currentWriter = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
 
b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
index a35f124..6e5f351 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
@@ -19,14 +19,21 @@
 
 package org.apache.streams.hdfs.test;
 
-import org.apache.streams.hdfs.*;
+import org.apache.streams.hdfs.HdfsConfiguration;
+import org.apache.streams.hdfs.HdfsReaderConfiguration;
+import org.apache.streams.hdfs.HdfsWriterConfiguration;
+import org.apache.streams.hdfs.WebHdfsPersistReader;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test for checking that strings append to FS paths as expected
@@ -63,7 +70,7 @@ public class HdfsPersistConfigTest {
         HdfsWriterConfiguration writerConfiguration = new 
HdfsWriterConfiguration();
         writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
         writerConfiguration.setHost("localhost");
-        writerConfiguration.setPort(9000l);
+        writerConfiguration.setPort(9000L);
         writerConfiguration.setPath("path");
         writerConfiguration.setWriterPath("writerPath");
         writerConfiguration.setUser("cloudera");
@@ -86,7 +93,7 @@ public class HdfsPersistConfigTest {
         HdfsWriterConfiguration writerConfiguration = new 
HdfsWriterConfiguration();
         writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
         writerConfiguration.setHost("localhost");
-        writerConfiguration.setPort(57000l);
+        writerConfiguration.setPort(57000L);
         writerConfiguration.setPath("path");
         writerConfiguration.setWriterPath("writerPath");
         writerConfiguration.setUser("cloudera");
@@ -128,7 +135,7 @@ public class HdfsPersistConfigTest {
         HdfsReaderConfiguration readerConfiguration = new 
HdfsReaderConfiguration();
         readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
         readerConfiguration.setHost("localhost");
-        readerConfiguration.setPort(9000l);
+        readerConfiguration.setPort(9000L);
         readerConfiguration.setPath("path");
         readerConfiguration.setReaderPath("readerPath");
 
@@ -150,7 +157,7 @@ public class HdfsPersistConfigTest {
         HdfsReaderConfiguration readerConfiguration = new 
HdfsReaderConfiguration();
         readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
         readerConfiguration.setHost("localhost");
-        readerConfiguration.setPort(57000l);
+        readerConfiguration.setPort(57000L);
         readerConfiguration.setPath("path");
         readerConfiguration.setReaderPath("readerPath");
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
 
b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
index 7191d9a..1ab1693 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -18,10 +18,6 @@
 
 package org.apache.streams.hdfs.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.hdfs.HdfsConfiguration;
@@ -31,6 +27,11 @@ import org.apache.streams.hdfs.WebHdfsPersistReader;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -48,7 +51,7 @@ public class TestHdfsPersist {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TestHdfsPersist.class);
 
-    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+    private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
     @Before
     public void setup() {
@@ -60,12 +63,12 @@ public class TestHdfsPersist {
     @Test
     public void TestHdfsPersist() throws Exception {
 
-        List<List<String>> fieldArrays = Lists.newArrayList();
-        fieldArrays.add(new ArrayList<String>());
-        fieldArrays.add(Lists.newArrayList("ID"));
-        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+        List<List<String>> fieldArrays = new ArrayList<>();
+        fieldArrays.add(new ArrayList<>());
+        fieldArrays.add(Collections.singletonList("ID"));
+        fieldArrays.add(Arrays.asList("ID", "DOC"));
+        fieldArrays.add(Arrays.asList("ID", "TS", "DOC"));
+        fieldArrays.add(Arrays.asList("ID", "TS", "META", "DOC"));
 
         for( List<String> fields : fieldArrays )
             TestHdfsPersistCase(fields);
@@ -108,13 +111,13 @@ public class TestHdfsPersist {
         HdfsReaderConfiguration hdfsReaderConfiguration = 
MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
 
         WebHdfsPersistReader reader = new 
WebHdfsPersistReader(hdfsReaderConfiguration);
-        hdfsReaderConfiguration.setReaderPath(new 
Integer(fields.size()).toString());
+        hdfsReaderConfiguration.setReaderPath(Integer.toString(fields.size()));
 
         reader.prepare(null);
 
         StreamsResultSet resultSet = reader.readAll();
 
-        assert( resultSet.size() == count);
+        Assert.assertEquals(count, resultSet.size());
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index 64f7200..edca3de 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -25,7 +25,13 @@ import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import kafka.utils.VerifiableProperties;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,14 +46,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import kafka.utils.VerifiableProperties;
-
 /**
  * KafkaPersistReader reads documents from kafka.
  */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 199be73..a36246c 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -20,15 +20,13 @@ package org.apache.streams.kafka;
 
 import org.apache.streams.core.StreamsDatum;
 
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
-
 /**
  * KafkaPersistReaderTask reads documents from kafka on behalf of
  * @see org.apache.streams.kafka.KafkaPersistReader
@@ -51,9 +49,8 @@ public class KafkaPersistReaderTask implements Runnable {
     MessageAndMetadata<String,String> item;
     while (true) {
 
-      ConsumerIterator<String, String> it = stream.iterator();
-      while (it.hasNext()) {
-        item = it.next();
+      for (MessageAndMetadata<String, String> aStream : stream) {
+        item = aStream;
         reader.persistQueue.add(new StreamsDatum(item.message()));
       }
       try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 40e125f..9f696c7 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -26,7 +26,9 @@ import org.apache.streams.util.GuidUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,10 +37,6 @@ import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
 /**
  * KafkaPersistWriter writes documents to kafka.
  */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index b6a7404..30ecfbf 100644
--- 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -38,7 +38,6 @@ import com.mongodb.DBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
-
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 6072f58..8f33648 100644
--- 
a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ 
b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -35,7 +35,6 @@ import com.mongodb.MongoClient;
 import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.util.JSON;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
 
b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
index 2a2e170..7688b04 100644
--- 
a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
+++ 
b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
@@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
-
 import org.apache.commons.io.Charsets;
 import org.apache.commons.io.IOUtils;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
index ae0709a..718e0f3 100644
--- 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
@@ -26,12 +26,11 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -57,7 +56,7 @@ public class CleanAdditionalPropertiesProcessor implements 
StreamsProcessor {
 
   @Override
   public List<StreamsDatum> process(StreamsDatum datum) {
-    List<StreamsDatum> result = Lists.newLinkedList();
+    List<StreamsDatum> result = new LinkedList<>();
     ObjectNode activity = this.mapper.convertValue(datum.getDocument(), 
ObjectNode.class);
     cleanAdditionalProperties(activity);
     datum.setDocument(activity);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
index 4736ee2..26c7c4f 100644
--- 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
@@ -24,12 +24,12 @@ import org.apache.streams.core.StreamsProcessor;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -42,7 +42,7 @@ public class TypeConverterProcessor implements 
StreamsProcessor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TypeConverterProcessor.class);
 
-  private List<String> formats = Lists.newArrayList();
+  private List<String> formats = new ArrayList<>();
 
   private ObjectMapper mapper;
 
@@ -91,7 +91,7 @@ public class TypeConverterProcessor implements 
StreamsProcessor {
 
   @Override
   public List<StreamsDatum> process(StreamsDatum entry) {
-    List<StreamsDatum> result = Lists.newLinkedList();
+    List<StreamsDatum> result = new LinkedList<>();
     Object inDoc = entry.getDocument();
     ObjectNode node = null;
     if ( inClass == String.class

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
index e0759c3..cea963a 100644
--- 
a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
+++ 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
@@ -25,16 +25,15 @@ import org.apache.streams.jackson.TypeConverterProcessor;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  *
@@ -48,7 +47,7 @@ public class TypeConverterProcessorTest {
   @Test
   public void testTypeConverterStringToString() {
     final String ID = "1";
-    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
+    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
String.class, Collections.singletonList(DATASIFT_FORMAT));
     processor.prepare(null);
     StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
     List<StreamsDatum> result = processor.process(datum);
@@ -64,7 +63,7 @@ public class TypeConverterProcessorTest {
   @Test
   public void testTypeConverterStringToObjectNode() {
     final String ID = "1";
-    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
+    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
ObjectNode.class, Collections.singletonList(DATASIFT_FORMAT));
     processor.prepare(null);
     StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
     List<StreamsDatum> result = processor.process(datum);
@@ -80,9 +79,9 @@ public class TypeConverterProcessorTest {
   @Test
   public void testTypeConverterObjectNodeToString() throws IOException {
     final String ID = "1";
-    StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
+    StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, 
String.class, Collections.singletonList(DATASIFT_FORMAT));
     processor.prepare(null);
-    ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT));
+    ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Collections.singletonList(DATASIFT_FORMAT));
     ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class);
     StreamsDatum datum = new StreamsDatum(node, ID);
     List<StreamsDatum> result = processor.process(datum);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index c2c3705..b1024bd 100644
--- 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -28,15 +28,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.collect.Lists;
 import com.jayway.jsonpath.JsonPath;
-
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
-
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -107,15 +104,13 @@ public class JsonPathExtractor implements 
StreamsProcessor {
         } else if (readResult instanceof JSONArray) {
           LOGGER.info("Matched Array:");
           JSONArray array = (JSONArray) readResult;
-          Iterator iterator = array.iterator();
-          while (iterator.hasNext()) {
-            Object item = iterator.next();
-            if ( item instanceof String ) {
+          for (Object item : array) {
+            if (item instanceof String) {
               LOGGER.info("String Item:" + item);
               String match = (String) item;
               StreamsDatum matchDatum = new StreamsDatum(match);
               result.add(matchDatum);
-            } else if ( item instanceof JSONObject ) {
+            } else if (item instanceof JSONObject) {
               LOGGER.info("Object Item:" + item);
               JSONObject match = (JSONObject) item;
               ObjectNode objectNode = 
mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index ec741c2..dfeb8a2 100644
--- 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -29,20 +29,18 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.jayway.jsonpath.JsonPath;
-
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
-
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Provides a base implementation for filtering datums which
@@ -77,7 +75,7 @@ public class JsonPathFilter implements StreamsProcessor {
   @Override
   public List<StreamsDatum> process(StreamsDatum entry) {
 
-    List<StreamsDatum> result = Lists.newArrayList();
+    List<StreamsDatum> result = new ArrayList<>();
 
     String json = null;
 
@@ -102,7 +100,7 @@ public class JsonPathFilter implements StreamsProcessor {
       }
     }
 
-    Preconditions.checkNotNull(document);
+    Objects.requireNonNull(document);
 
     if ( StringUtils.isNotEmpty(json)) {
 
@@ -114,7 +112,7 @@ public class JsonPathFilter implements StreamsProcessor {
         LOGGER.warn(ex.getMessage());
       }
 
-      Preconditions.checkNotNull(srcResult);
+      Objects.requireNonNull(srcResult);
 
       String[] path = StringUtils.split(pathExpression, '.');
       ObjectNode node = document;
@@ -122,7 +120,7 @@ public class JsonPathFilter implements StreamsProcessor {
         node = (ObjectNode) document.get(path[i]);
       }
 
-      Preconditions.checkNotNull(node);
+      Objects.requireNonNull(node);
 
       if ( srcResult instanceof JSONArray ) {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
 
b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 1ab7c00..50ea9ba 100644
--- 
a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ 
b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.json.JsonPathExtractor;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
index 206931f..305ddfd 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
@@ -27,15 +27,14 @@ import org.apache.streams.pojo.json.Activity;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +76,7 @@ public abstract class AbstractRegexExtensionExtractor<T> 
implements StreamsProce
     } else {
       return new ArrayList<>();
     }
-    if (Strings.isNullOrEmpty(pattern)) {
+    if (StringUtils.isBlank(pattern)) {
       prepare(null);
     }
     Map<String, List<Integer>> matches = RegexUtils.extractMatches(pattern, 
activity.getContent());
@@ -92,7 +91,7 @@ public abstract class AbstractRegexExtensionExtractor<T> 
implements StreamsProce
     entities.addAll(set);
 
     entry.setDocument(activity);
-    return Lists.newArrayList(entry);
+    return Collections.singletonList(entry);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
index d1936d1..d46c2af 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
@@ -21,7 +21,6 @@ package org.apache.streams.regex;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
index 6e17de8..1912ff0 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
@@ -19,10 +19,11 @@
 
 package org.apache.streams.regex;
 
-import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
index 66f7aa5..bb0e95d 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
@@ -19,15 +19,21 @@
 
 package org.apache.streams.regex;
 
-import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
index d5d8d9b..64e8599 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
@@ -19,14 +19,18 @@
 
 package org.apache.streams.regex;
 
-import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
index 42867d7..cefac6f 100644
--- 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
+++ 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java
@@ -18,9 +18,8 @@
 
 package org.apache.streams.urls;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import org.apache.commons.codec.net.URLCodec;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.validator.routines.UrlValidator;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -28,12 +27,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLDecoder;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 public class LinkResolver implements Serializable {
 
@@ -73,7 +76,7 @@ public class LinkResolver implements Serializable {
 
     // To help canonicalize the URL, these parts are 'known' to be 'ok' to 
remove
     private static final Collection<String> URL_TRACKING_TO_REMOVE = new 
ArrayList<String>() {{
-        /******************************************************************
+        /*
          * Google uses parameters in the URL string to track referrers
          * on their Google Analytics and promotions. These are the
          * identified URL patterns.
@@ -102,7 +105,7 @@ public class LinkResolver implements Serializable {
     // This element holds all the information about all the re-directs that 
have taken place
     // and the steps and HTTP codes that occurred inside of each step.
     private final LinkDetails linkDetails;
-    private Collection<String> domainsSensitiveTo = new HashSet<String>();
+    private Collection<String> domainsSensitiveTo = new HashSet<>();
 
     /**
      * Get the link details
@@ -125,7 +128,7 @@ public class LinkResolver implements Serializable {
 
     public void run() {
 
-        Preconditions.checkNotNull(linkDetails.getOriginalURL());
+        Objects.requireNonNull(linkDetails.getOriginalURL());
 
         linkDetails.setStartTime(DateTime.now());
 
@@ -140,22 +143,22 @@ public class LinkResolver implements Serializable {
             this.linkDetails.setRedirected(false);
 
         linkDetails.setFinalURL(cleanURL(linkDetails.getFinalURL()));
-        if( !Strings.isNullOrEmpty(linkDetails.getFinalURL()))
+        if(StringUtils.isNotBlank(linkDetails.getFinalURL()))
             
linkDetails.setNormalizedURL(normalizeURL(linkDetails.getFinalURL()));
-        if( !Strings.isNullOrEmpty(linkDetails.getNormalizedURL()))
+        if(StringUtils.isNotBlank(linkDetails.getNormalizedURL()))
             
linkDetails.setUrlParts(tokenizeURL(linkDetails.getNormalizedURL()));
 
         this.updateTookInMillis();
     }
 
     protected void updateTookInMillis() {
-        Preconditions.checkNotNull(linkDetails.getStartTime());
+        Objects.requireNonNull(linkDetails.getStartTime());
         
linkDetails.setTookInMills(DateTime.now().minus(linkDetails.getStartTime().getMillis()).getMillis());
     }
 
     public void unwindLink(String url) {
-        Preconditions.checkNotNull(linkDetails);
-        Preconditions.checkNotNull(url);
+        Objects.requireNonNull(linkDetails);
+        Objects.requireNonNull(url);
 
         // Check url validity
         UrlValidator urlValidator = new UrlValidator();
@@ -238,7 +241,7 @@ public class LinkResolver implements Serializable {
             linkDetails.setFinalResponseCode((long) 
connection.getResponseCode());
 
             Map<String, List<String>> headers = 
createCaseInsensitiveMap(connection.getHeaderFields());
-            /******************************************************************
+            /*
              * If they want us to set cookies, well, then we will set cookies
              * Example URL:
              * http://nyti.ms/1bCpesx
@@ -247,7 +250,7 @@ public class LinkResolver implements Serializable {
                 
linkDetails.getCookies().add(headers.get(SET_COOKIE_IDENTIFIER).get(0));
 
             switch (linkDetails.getFinalResponseCode().intValue()) {
-                /**
+                /*
                  * W3C HTTP Response Codes:
                  * http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
                  */
@@ -263,7 +266,7 @@ public class LinkResolver implements Serializable {
                 case 304: // Not Modified
                 case 306: // This status code is unused but in the redirect 
block.
                 case 307: // Temporary re-direct
-                    
/*******************************************************************
+                    /*
                      * Author:
                      * Smashew
                      *
@@ -338,7 +341,7 @@ public class LinkResolver implements Serializable {
     }
 
     private Map<String, List<String>> createCaseInsensitiveMap(Map<String, 
List<String>> input) {
-        Map<String, List<String>> toReturn = new HashMap<String, 
List<String>>();
+        Map<String, List<String>> toReturn = new HashMap<>();
         for (String k : input.keySet())
             if (k != null && input.get(k) != null)
                 toReturn.put(k.toLowerCase(), input.get(k));
@@ -418,7 +421,7 @@ public class LinkResolver implements Serializable {
         // If you want to just look in the GET parameters, or you want to 
ignore the domain
         // or you want to use the domain as a token itself, that would have to 
be
         // processed above the next line, and only the remaining parts split
-        List<String> toReturn = new ArrayList<String>();
+        List<String> toReturn = new ArrayList<>();
 
         // Split the URL by forward slashes. Most modern browsers will accept 
a URL
         // this malformed such as http://www.smashew.com/hello//how////are/you

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java
 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java
index d6785e9..5493fef 100644
--- 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java
+++ 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java
@@ -18,7 +18,11 @@
 
 package org.apache.streams.urls;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
@@ -57,7 +61,7 @@ public final class LinkResolverHelperFunctions {
     public static final long DEFAULT_STAGGER = RECENT_DOMAINS_BACKOFF / 10;
 
     // Map to store the information of recent domains, with the last time they 
were accessed.
-    private static final ConcurrentMap<String, Date> RECENT_DOMAINS = new 
ConcurrentHashMap<String, Date>();
+    private static final ConcurrentMap<String, Date> RECENT_DOMAINS = new 
ConcurrentHashMap<>();
 
     private static Timer timer;
 
@@ -142,7 +146,7 @@ public final class LinkResolverHelperFunctions {
             // see if there is any work that 'can' be done
             if(RECENT_DOMAINS.size() != 0) {
                 // create a temporary list of the items that can be removed
-                Collection<String> ableToRemove = new HashSet<String>();
+                Collection<String> ableToRemove = new HashSet<>();
 
 
                 // iterate through all the domains (keys)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
index 203b414..6c88e28 100644
--- 
a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
+++ 
b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java
@@ -18,16 +18,19 @@
 
 package org.apache.streams.urls;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 public class LinkResolverProcessor implements StreamsProcessor {
 
@@ -102,7 +105,7 @@ public class LinkResolverProcessor implements 
StreamsProcessor {
 
 
     protected Set<String> unwind(List<String> inputLinks) {
-        Set<String> outputLinks = new HashSet<String>();
+        Set<String> outputLinks = new HashSet<>();
         for (String link : inputLinks) {
             try {
                 LinkResolver unwinder = new LinkResolver(link);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java
 
b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java
index 12f9848..6c8439f 100644
--- 
a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java
+++ 
b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java
@@ -24,9 +24,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Date;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class LinkHelperFunctionsTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
 
b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
index 5908baa..3af3a89 100644
--- 
a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
+++ 
b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java
@@ -18,24 +18,22 @@
 
 package org.apache.streams.urls;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.jackson.StreamsJacksonModule;
 import org.apache.streams.pojo.json.Activity;
+
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-/**
- * Created by rebanks on 2/27/14.
- */
 public class TestLinkUnwinderProcessor {
 
     private static String activityString;
@@ -73,44 +71,44 @@ public class TestLinkUnwinderProcessor {
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorBitly() throws Exception{
-        
testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), 
Lists.newArrayList("http://www.wcgworld.com/"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), 
Lists.newArrayList("http://www.wcgworld.com/"));
+        
testActivityUnwinderHelper(Collections.singletonList("http://bit.ly/1cX5Rh4"), 
Collections.singletonList("http://www.wcgworld.com/"));
+        
testStringActivityUnwinderHelper(Collections.singletonList("http://bit.ly/1cX5Rh4"),
 Collections.singletonList("http://www.wcgworld.com/"));
     }
 
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorTdotCo() throws Exception{
-        
testActivityUnwinderHelper(Lists.newArrayList("http://t.co/lLFgFynv2G"), 
Lists.newArrayList("http://www.holmesreport.com/latest"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://t.co/lLFgFynv2G"), 
Lists.newArrayList("http://www.holmesreport.com/latest"));
+        
testActivityUnwinderHelper(Collections.singletonList("http://t.co/lLFgFynv2G"), 
Collections.singletonList("http://www.holmesreport.com/latest"));
+        
testStringActivityUnwinderHelper(Collections.singletonList("http://t.co/lLFgFynv2G"),
 Collections.singletonList("http://www.holmesreport.com/latest"));
     }
 
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorGoogle() throws Exception{
-        testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), 
Lists.newArrayList("http://www.wcgworld.com/"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), 
Lists.newArrayList("http://www.wcgworld.com/"));
+        
testActivityUnwinderHelper(Collections.singletonList("http://goo.gl/wSrHDA"), 
Collections.singletonList("http://www.wcgworld.com/"));
+        
testStringActivityUnwinderHelper(Collections.singletonList("http://goo.gl/wSrHDA"),
 Collections.singletonList("http://www.wcgworld.com/"));
     }
 
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorOwly() throws Exception{
-        testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), 
Lists.newArrayList("http://www.wcgworld.com/"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), 
Lists.newArrayList("http://www.wcgworld.com/"));
+        
testActivityUnwinderHelper(Collections.singletonList("http://ow.ly/u4Kte"), 
Collections.singletonList("http://www.wcgworld.com/"));
+        
testStringActivityUnwinderHelper(Collections.singletonList("http://ow.ly/u4Kte"),
 Collections.singletonList("http://www.wcgworld.com/"));
     }
 
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{
-        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), 
Lists.newArrayList("http://www.wcgworld.com/"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), 
Lists.newArrayList("http://www.wcgworld.com/"));
+        
testActivityUnwinderHelper(Collections.singletonList("http://x.co/3yapt"), 
Collections.singletonList("http://www.wcgworld.com/"));
+        
testStringActivityUnwinderHelper(Collections.singletonList("http://x.co/3yapt"),
 Collections.singletonList("http://www.wcgworld.com/"));
     }
 
     @Ignore
     @Test
     public void testActivityLinkUnwinderProcessorMulti() throws Exception{
         // changed these tests because the processor now guarantees each 
result returned only once
-        testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", 
"http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), 
Lists.newArrayList("http://www.wcgworld.com/"));
-        
testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", 
"http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), 
Lists.newArrayList("http://www.wcgworld.com/"));
+        testActivityUnwinderHelper(Arrays.asList("http://x.co/3yapt", 
"http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), 
Collections.singletonList("http://www.wcgworld.com/"));
+        testStringActivityUnwinderHelper(Arrays.asList("http://x.co/3yapt", 
"http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), 
Collections.singletonList("http://www.wcgworld.com/"));
     }
 
     public void testActivityUnwinderHelper(List<String> input, List<String> 
expected) throws Exception{
@@ -137,7 +135,7 @@ public class TestLinkUnwinderProcessor {
         assertNotNull(resultActivity.getLinks());
         List<String> resultLinks = resultActivity.getLinks();
         assertEquals(expected.size(), resultLinks.size());
-        assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+        assertEquals(expected, resultLinks);
     }
 
     public void testStringActivityUnwinderHelper(List<String> input, 
List<String> expected) throws Exception{
@@ -159,7 +157,7 @@ public class TestLinkUnwinderProcessor {
         assertNotNull(resultActivity.getLinks());
         List<String> resultLinks = resultActivity.getLinks();
         assertEquals(expected.size(), resultLinks.size());
-        assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks));
+        assertEquals(expected, resultLinks);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
index 1216c38..7dd6769 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
@@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.lang.NotImplementedException;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index 306fecc..0710a2c 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
index 92cf333..7a25b23 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
@@ -29,13 +29,11 @@ import 
org.apache.streams.facebook.provider.FacebookEventClassifier;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 
-import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +63,7 @@ public class FacebookTypeConverter implements 
StreamsProcessor {
 
   private int count = 0;
 
-  public static final String TERMINATE = new String("TERMINATE");
+  public static final String TERMINATE = "TERMINATE";
 
   public FacebookTypeConverter(Class inClass, Class outClass) {
     this.inClass = inClass;
@@ -138,8 +136,6 @@ public class FacebookTypeConverter implements 
StreamsProcessor {
       while (parser.nextToken() != null) {
       }
       valid = true;
-    } catch (JsonParseException jpe) {
-      LOGGER.warn("validate: {}", jpe);
     } catch (IOException ioe) {
       LOGGER.warn("validate: {}", ioe);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index 617bfab..52ec222 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -28,17 +28,15 @@ import 
org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-
+import facebook4j.Facebook;
+import facebook4j.FacebookFactory;
+import facebook4j.conf.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import facebook4j.Facebook;
-import facebook4j.FacebookFactory;
-import facebook4j.conf.ConfigurationBuilder;
-
 /**
  * Abstract data collector for Facebook.  Iterates over ids and queues data to 
be output
  * by a {@link org.apache.streams.core.StreamsProvider}
@@ -66,7 +64,7 @@ public abstract class FacebookDataCollector implements 
Runnable {
     this.queue = queue;
     this.isComplete = new AtomicBoolean(false);
     this.backOff = new ExponentialBackOffStrategy(5);
-    this.authTokens = new BasicTokenManager<String>();
+    this.authTokens = new BasicTokenManager<>();
     if (config.getUserAccessTokens() != null) {
       for (String token : config.getUserAccessTokens()) {
         this.authTokens.addTokenToPool(token);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
index 47c2afb..2370810 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
@@ -24,7 +24,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
index 3253479..93508b0 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -34,7 +34,15 @@ import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -54,16 +62,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import facebook4j.Facebook;
-import facebook4j.FacebookException;
-import facebook4j.FacebookFactory;
-import facebook4j.Friend;
-import facebook4j.Paging;
-import facebook4j.Post;
-import facebook4j.ResponseList;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-
 public class FacebookFriendFeedProvider implements StreamsProvider, 
Serializable {
 
   public static final String STREAMS_ID = "FacebookFriendFeedProvider";
@@ -78,7 +76,7 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
   private Class klass;
   protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<>();
 
   public FacebookUserstreamConfiguration getConfig() {
     return configuration;
@@ -103,7 +101,7 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
   private static ExecutorService newFixedThreadPoolWithQueueSize(int 
numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   /**
@@ -116,7 +114,6 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
       configuration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
FacebookUserInformationConfiguration.class);
     } catch (IOException ex) {
       ex.printStackTrace();
-      return;
     }
   }
 
@@ -193,8 +190,7 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
     this.start = start;
     this.end = end;
     readCurrent();
-    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-    return result;
+    return (StreamsResultSet)providerQueue.iterator();
   }
 
   @Override
@@ -266,9 +262,8 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
         .setClientVersion("v1.0");
 
     FacebookFactory ff = new FacebookFactory(cb.build());
-    Facebook facebook = ff.getInstance();
 
-    return facebook;
+    return ff.getInstance();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
index 50ac64a..c973863 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -34,7 +34,15 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
-
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
 import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -44,6 +52,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -55,22 +64,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import facebook4j.Facebook;
-import facebook4j.FacebookException;
-import facebook4j.FacebookFactory;
-import facebook4j.Friend;
-import facebook4j.Paging;
-import facebook4j.Post;
-import facebook4j.ResponseList;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-
 /**
  * FacebookFriendUpdatesProvider provides updates from friend feed.
  */
 public class FacebookFriendUpdatesProvider implements StreamsProvider, 
Serializable {
 
-  public static final String STREAMS_ID = "FacebookFriendPostsProvider";
+  private static final String STREAMS_ID = "FacebookFriendPostsProvider";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
 
@@ -84,7 +83,7 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
   private Class klass;
   protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>();
+  protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<>();
 
   public FacebookUserstreamConfiguration getConfig() {
     return configuration;
@@ -110,7 +109,7 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
   private static ExecutorService newFixedThreadPoolWithQueueSize(int 
numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
         5000L, TimeUnit.MILLISECONDS,
-        new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
+        new ArrayBlockingQueue<>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   /**
@@ -123,7 +122,6 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
       configuration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
FacebookUserInformationConfiguration.class);
     } catch (IOException ex) {
       ex.printStackTrace();
-      return;
     }
   }
 
@@ -209,8 +207,7 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
     this.start = start;
     this.end = end;
     readCurrent();
-    StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-    return result;
+    return (StreamsResultSet)providerQueue.iterator();
   }
 
   @Override
@@ -242,11 +239,11 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
 
     executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-    Preconditions.checkNotNull(providerQueue);
-    Preconditions.checkNotNull(this.klass);
-    Preconditions.checkNotNull(configuration.getOauth().getAppId());
-    Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-    Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+    Objects.requireNonNull(providerQueue);
+    Objects.requireNonNull(this.klass);
+    Objects.requireNonNull(configuration.getOauth().getAppId());
+    Objects.requireNonNull(configuration.getOauth().getAppSecret());
+    Objects.requireNonNull(configuration.getOauth().getUserAccessToken());
 
     Facebook client = getFacebookClient();
 
@@ -254,11 +251,6 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
       ResponseList<Friend> friendResponseList = client.friends().getFriends();
       Paging<Friend> friendPaging;
       do {
-
-        for ( Friend friend : friendResponseList ) {
-          // client.rawAPI().callPostAPI();
-          // add a subscription
-        }
         friendPaging = friendResponseList.getPaging();
         friendResponseList = client.fetchNext(friendPaging);
       }
@@ -283,9 +275,8 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
         .setClientVersion("v1.0");
 
     FacebookFactory ff = new FacebookFactory(cb.build());
-    Facebook facebook = ff.getInstance();
 
-    return facebook;
+    return ff.getInstance();
   }
 
   @Override


Reply via email to