http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java index e322990..71cb055 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java @@ -27,17 +27,17 @@ import org.apache.streams.pojo.json.ActivityObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.javatuples.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.stringtemplate.v4.ST; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Supporting class for interacting with neo4j via rest API @@ -48,10 +48,10 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; - public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; + private static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v"; + private static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v"; - public static final String createVertexStatementTemplate = + private static final String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) " + "CREATE UNIQUE (v:<type> { props }) " + "ON CREATE SET v <labels> " @@ -59,13 +59,13 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { - public static final String mergeVertexStatementTemplate = + private static final String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) " + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() " + "RETURN v"; - public static final String createEdgeStatementTemplate = + private static final String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) " + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) " + "RETURN r"; @@ -113,7 +113,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { */ public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) { - Preconditions.checkNotNull(activityObject.getObjectType()); + Objects.requireNonNull(activityObject.getObjectType()); List<String> labels = getLabels(activityObject); @@ -144,9 +144,9 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { */ public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) { - Preconditions.checkNotNull(activityObject.getObjectType()); + Objects.requireNonNull(activityObject.getObjectType()); - Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); + Pair queryPlusParameters = new Pair(null, new HashMap<>()); List<String> labels = getLabels(activityObject); @@ -176,7 +176,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { */ public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) { - Pair queryPlusParameters = new Pair(null, Maps.newHashMap()); + Pair queryPlusParameters = new Pair(null, new HashMap<>()); ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class); Map<String, Object> props = PropertyUtil.flattenToMap(object, '.'); @@ -213,7 +213,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { public static String getPropertyCreater(Map<String, Object> map) { StringBuilder builder = new StringBuilder(); builder.append("{"); - List<String> parts = Lists.newArrayList(); + List<String> parts = new ArrayList<>(); for ( Map.Entry<String, Object> entry : map.entrySet()) { if ( entry.getValue() instanceof String ) { String propVal = (String) (entry.getValue()); @@ -226,7 +226,7 @@ public class CypherQueryGraphHelper implements QueryGraphHelper { } private List<String> getLabels(ActivityObject activityObject) { - List<String> labels = Lists.newArrayList(":streams"); + List<String> labels = Collections.singletonList(":streams"); if ( activityObject.getAdditionalProperties().containsKey("labels") ) { List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels"); for ( String extraLabel : extraLabels ) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java index 72e668f..9f47058 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java @@ -23,13 +23,12 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; - import org.javatuples.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Objects; /** * Supporting class for interacting with neo4j via rest API. @@ -40,9 +39,9 @@ public class Neo4jHttpGraphHelper implements HttpGraphHelper { private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class); - public static final String statementKey = "statement"; - public static final String paramsKey = "parameters"; - public static final String propsKey = "props"; + private static final String statementKey = "statement"; + private static final String paramsKey = "parameters"; + private static final String propsKey = "props"; /** * createHttpRequest neo4j rest json payload. @@ -54,9 +53,9 @@ public class Neo4jHttpGraphHelper implements HttpGraphHelper { LOGGER.debug("createHttpRequest: ", queryPlusParameters); - Preconditions.checkNotNull(queryPlusParameters); - Preconditions.checkNotNull(queryPlusParameters.getValue0()); - Preconditions.checkNotNull(queryPlusParameters.getValue1()); + Objects.requireNonNull(queryPlusParameters); + Objects.requireNonNull(queryPlusParameters.getValue0()); + Objects.requireNonNull(queryPlusParameters.getValue1()); ObjectNode request = MAPPER.createObjectNode(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java index 673b402..24ddd65 100644 --- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java +++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java @@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.commons.io.IOUtils; import org.junit.Before; import org.junit.Test; @@ -38,8 +37,7 @@ import java.io.InputStream; import java.util.List; /** - * Unit test for - * @see {@link org.apache.streams.graph.GraphVertexReader} + * Unit test for {@link org.apache.streams.graph.GraphVertexReader} * * Test that graph db responses can be converted to streams data. */ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java index c5a06fc..6102db0 100644 --- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java +++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java @@ -26,7 +26,6 @@ import org.apache.streams.util.GuidUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HConnection; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index d254abb..64b7284 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -30,15 +30,15 @@ import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; - +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.security.UserGroupInformation; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -49,6 +49,7 @@ import java.math.BigInteger; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -62,7 +63,7 @@ import java.util.concurrent.LinkedBlockingQueue; */ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable { - public static final String STREAMS_ID = "WebHdfsPersistReader"; + private static final String STREAMS_ID = "WebHdfsPersistReader"; private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class); @@ -110,7 +111,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo StringBuilder uriBuilder = new StringBuilder(); uriBuilder.append(hdfsConfiguration.getScheme()); uriBuilder.append("://"); - if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) { + if (StringUtils.isNotBlank(hdfsConfiguration.getHost())) { uriBuilder.append(hdfsConfiguration.getHost()); if (hdfsConfiguration.getPort() != null) { uriBuilder.append(":" + hdfsConfiguration.getPort()); @@ -148,36 +149,34 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser()); ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE); - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws Exception { - Configuration conf = new Configuration(); - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - LOGGER.info("WebURI : {}", getURI().toString()); - client = FileSystem.get(getURI(), conf); - LOGGER.info("Connected to WebHDFS"); - - /* - * ************************************************************************************************ - * This code is an example of how you would work with HDFS and you weren't going over - * the webHDFS protocol. - * - * Smashew: 2013-10-01 - * ************************************************************************************************ - conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); - conf.set("namenode.host","0.0.0.0"); - conf.set("hadoop.job.ugi", userName); - conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); - fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); - FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); - for(int i=0;i<status.length;i++) - { - LOGGER.info("Directory: {}", status[i].getPath()); - } - */ - return null; + ugi.doAs((PrivilegedExceptionAction<Void>) () -> { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName()); + conf.set("fs.file.impl", LocalFileSystem.class.getName()); + LOGGER.info("WebURI : {}", getURI().toString()); + client = FileSystem.get(getURI(), conf); + LOGGER.info("Connected to WebHDFS"); + + /* + * ************************************************************************************************ + * This code is an example of how you would work with HDFS and you weren't going over + * the webHDFS protocol. + * + * Smashew: 2013-10-01 + * ************************************************************************************************ + conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName); + conf.set("namenode.host","0.0.0.0"); + conf.set("hadoop.job.ugi", userName); + conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner"); + fileSystem.createNewFile(new Path("/user/"+ userName + "/test")); + FileStatus[] status = fs.listStatus(new Path("/user/" + userName)); + for(int i=0;i<status.length;i++) + { + LOGGER.info("Directory: {}", status[i].getPath()); } + */ + return null; }); } catch (Exception ex) { LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again"); @@ -206,7 +205,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo status[0] = fileStatus; } else if ( client.isDirectory(path)) { status = client.listStatus(path); - List<FileStatus> statusList = Lists.newArrayList(status); + List<FileStatus> statusList = Arrays.asList(status); Collections.sort(statusList); status = statusList.toArray(new FileStatus[0]); LOGGER.info("Found Directory : {} files", status.length); @@ -287,11 +286,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo @Override public boolean isRunning() { - if ( task != null) { - return !task.isDone() && !task.isCancelled(); - } else { - return true; - } + return task == null || !task.isDone() && !task.isCancelled(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index 5bff080..d18bda9 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum; import com.google.common.base.Strings; import com.google.common.util.concurrent.Uninterruptibles; - import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 4554c0f..29a6b73 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -30,7 +30,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -65,7 +64,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl private Path path; private int linesPerFile; private int totalRecordsWritten = 0; - private final List<Path> writtenFiles = new ArrayList<Path>(); + private final List<Path> writtenFiles = new ArrayList<>(); private int fileLineCounter = 0; private OutputStreamWriter currentWriter = null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java index a35f124..6e5f351 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java @@ -19,14 +19,21 @@ package org.apache.streams.hdfs.test; -import org.apache.streams.hdfs.*; +import org.apache.streams.hdfs.HdfsConfiguration; +import org.apache.streams.hdfs.HdfsReaderConfiguration; +import org.apache.streams.hdfs.HdfsWriterConfiguration; +import org.apache.streams.hdfs.WebHdfsPersistReader; +import org.apache.streams.hdfs.WebHdfsPersistWriter; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URISyntaxException; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test for checking that strings append to FS paths as expected @@ -63,7 +70,7 @@ public class HdfsPersistConfigTest { HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); writerConfiguration.setHost("localhost"); - writerConfiguration.setPort(9000l); + writerConfiguration.setPort(9000L); writerConfiguration.setPath("path"); writerConfiguration.setWriterPath("writerPath"); writerConfiguration.setUser("cloudera"); @@ -86,7 +93,7 @@ public class HdfsPersistConfigTest { HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration(); writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); writerConfiguration.setHost("localhost"); - writerConfiguration.setPort(57000l); + writerConfiguration.setPort(57000L); writerConfiguration.setPath("path"); writerConfiguration.setWriterPath("writerPath"); writerConfiguration.setUser("cloudera"); @@ -128,7 +135,7 @@ public class HdfsPersistConfigTest { HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS); readerConfiguration.setHost("localhost"); - readerConfiguration.setPort(9000l); + readerConfiguration.setPort(9000L); readerConfiguration.setPath("path"); readerConfiguration.setReaderPath("readerPath"); @@ -150,7 +157,7 @@ public class HdfsPersistConfigTest { HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration(); readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS); readerConfiguration.setHost("localhost"); - readerConfiguration.setPort(57000l); + readerConfiguration.setPort(57000L); readerConfiguration.setPath("path"); readerConfiguration.setReaderPath("readerPath"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java index 7191d9a..1ab1693 100644 --- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java +++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java @@ -18,10 +18,6 @@ package org.apache.streams.hdfs.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.hdfs.HdfsConfiguration; @@ -31,6 +27,11 @@ import org.apache.streams.hdfs.WebHdfsPersistReader; import org.apache.streams.hdfs.WebHdfsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -48,7 +51,7 @@ public class TestHdfsPersist { private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class); - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); @Before public void setup() { @@ -60,12 +63,12 @@ public class TestHdfsPersist { @Test public void TestHdfsPersist() throws Exception { - List<List<String>> fieldArrays = Lists.newArrayList(); - fieldArrays.add(new ArrayList<String>()); - fieldArrays.add(Lists.newArrayList("ID")); - fieldArrays.add(Lists.newArrayList("ID", "DOC")); - fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC")); - fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC")); + List<List<String>> fieldArrays = new ArrayList<>(); + fieldArrays.add(new ArrayList<>()); + fieldArrays.add(Collections.singletonList("ID")); + fieldArrays.add(Arrays.asList("ID", "DOC")); + fieldArrays.add(Arrays.asList("ID", "TS", "DOC")); + fieldArrays.add(Arrays.asList("ID", "TS", "META", "DOC")); for( List<String> fields : fieldArrays ) TestHdfsPersistCase(fields); @@ -108,13 +111,13 @@ public class TestHdfsPersist { HdfsReaderConfiguration hdfsReaderConfiguration = MAPPER.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class); WebHdfsPersistReader reader = new WebHdfsPersistReader(hdfsReaderConfiguration); - hdfsReaderConfiguration.setReaderPath(new Integer(fields.size()).toString()); + hdfsReaderConfiguration.setReaderPath(Integer.toString(fields.size())); reader.prepare(null); StreamsResultSet resultSet = reader.readAll(); - assert( resultSet.size() == count); + Assert.assertEquals(resultSet.size(), count); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java index 64f7200..edca3de 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java @@ -25,7 +25,13 @@ import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; import com.fasterxml.jackson.databind.ObjectMapper; - +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.consumer.Whitelist; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.serializer.StringDecoder; +import kafka.utils.VerifiableProperties; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +46,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.consumer.Whitelist; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.serializer.StringDecoder; -import kafka.utils.VerifiableProperties; - /** * KafkaPersistReader reads documents from kafka. */ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java index 199be73..a36246c 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java @@ -20,15 +20,13 @@ package org.apache.streams.kafka; import org.apache.streams.core.StreamsDatum; +import kafka.consumer.KafkaStream; +import kafka.message.MessageAndMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Random; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.message.MessageAndMetadata; - /** * KafkaPersistReaderTask reads documents from kafka on behalf of * @see org.apache.streams.kafka.KafkaPersistReader @@ -51,9 +49,8 @@ public class KafkaPersistReaderTask implements Runnable { MessageAndMetadata<String,String> item; while (true) { - ConsumerIterator<String, String> it = stream.iterator(); - while (it.hasNext()) { - item = it.next(); + for (MessageAndMetadata<String, String> aStream : stream) { + item = aStream; reader.persistQueue.add(new StreamsDatum(item.message())); } try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java index 40e125f..9f696c7 100644 --- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java +++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java @@ -26,7 +26,9 @@ import org.apache.streams.util.GuidUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +37,6 @@ import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - /** * KafkaPersistWriter writes documents to kafka. */ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java index b6a7404..30ecfbf 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java @@ -38,7 +38,6 @@ import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java index 6072f58..8f33648 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java @@ -35,7 +35,6 @@ import com.mongodb.MongoClient; import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.util.JSON; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java index 2a2e170..7688b04 100644 --- a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java +++ b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java @@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java index ae0709a..718e0f3 100644 --- a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java @@ -26,12 +26,11 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; -import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -57,7 +56,7 @@ public class CleanAdditionalPropertiesProcessor implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum datum) { - List<StreamsDatum> result = Lists.newLinkedList(); + List<StreamsDatum> result = new LinkedList<>(); ObjectNode activity = this.mapper.convertValue(datum.getDocument(), ObjectNode.class); cleanAdditionalProperties(activity); datum.setDocument(activity); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java index 4736ee2..26c7c4f 100644 --- a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java +++ b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java @@ -24,12 +24,12 @@ import org.apache.streams.core.StreamsProcessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; /** @@ -42,7 +42,7 @@ public class TypeConverterProcessor implements StreamsProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); - private List<String> formats = Lists.newArrayList(); + private List<String> formats = new ArrayList<>(); private ObjectMapper mapper; @@ -91,7 +91,7 @@ public class TypeConverterProcessor implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newLinkedList(); + List<StreamsDatum> result = new LinkedList<>(); Object inDoc = entry.getDocument(); ObjectNode node = null; if ( inClass == String.class http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java index e0759c3..cea963a 100644 --- a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java +++ b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java @@ -25,16 +25,15 @@ import org.apache.streams.jackson.TypeConverterProcessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; - import org.junit.Test; import java.io.IOException; +import java.util.Collections; import java.util.List; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** * @@ -48,7 +47,7 @@ public class TypeConverterProcessorTest { @Test public void testTypeConverterStringToString() { final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(String.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + StreamsProcessor processor = new TypeConverterProcessor(String.class, String.class, Collections.singletonList(DATASIFT_FORMAT)); processor.prepare(null); StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); List<StreamsDatum> result = processor.process(datum); @@ -64,7 +63,7 @@ public class TypeConverterProcessorTest { @Test public void testTypeConverterStringToObjectNode() { final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(String.class, ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); + StreamsProcessor processor = new TypeConverterProcessor(String.class, ObjectNode.class, Collections.singletonList(DATASIFT_FORMAT)); processor.prepare(null); StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); List<StreamsDatum> result = processor.process(datum); @@ -80,9 +79,9 @@ public class TypeConverterProcessorTest { @Test public void testTypeConverterObjectNodeToString() throws IOException { final String ID = "1"; - StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, String.class, Lists.newArrayList(DATASIFT_FORMAT)); + StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, String.class, Collections.singletonList(DATASIFT_FORMAT)); processor.prepare(null); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT)); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(DATASIFT_FORMAT)); ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class); StreamsDatum datum = new StreamsDatum(node, ID); List<StreamsDatum> result = processor.process(datum); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java index c2c3705..b1024bd 100644 --- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java +++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java @@ -28,15 +28,12 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.collect.Lists; import com.jayway.jsonpath.JsonPath; - import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Iterator; import java.util.List; /** @@ -107,15 +104,13 @@ public class JsonPathExtractor implements StreamsProcessor { } else if (readResult instanceof JSONArray) { LOGGER.info("Matched Array:"); JSONArray array = (JSONArray) readResult; - Iterator iterator = array.iterator(); - while (iterator.hasNext()) { - Object item = iterator.next(); - if ( item instanceof String ) { + for (Object item : array) { + if (item instanceof String) { LOGGER.info("String Item:" + item); String match = (String) item; StreamsDatum matchDatum = new StreamsDatum(match); result.add(matchDatum); - } else if ( item instanceof JSONObject ) { + } else if (item instanceof JSONObject) { LOGGER.info("Object Item:" + item); JSONObject match = (JSONObject) item; ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java index ec741c2..dfeb8a2 100644 --- a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java +++ b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java @@ -29,20 +29,18 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.jayway.jsonpath.JsonPath; - import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Provides a base implementation for filtering datums which @@ -77,7 +75,7 @@ public class JsonPathFilter implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); String json = null; @@ -102,7 +100,7 @@ public class JsonPathFilter implements StreamsProcessor { } } - Preconditions.checkNotNull(document); + Objects.requireNonNull(document); if ( StringUtils.isNotEmpty(json)) { @@ -114,7 +112,7 @@ public class JsonPathFilter implements StreamsProcessor { LOGGER.warn(ex.getMessage()); } - Preconditions.checkNotNull(srcResult); + Objects.requireNonNull(srcResult); String[] path = StringUtils.split(pathExpression, '.'); ObjectNode node = document; @@ -122,7 +120,7 @@ public class JsonPathFilter implements StreamsProcessor { node = (ObjectNode) document.get(path[i]); } - Preconditions.checkNotNull(node); + Objects.requireNonNull(node); if ( srcResult instanceof JSONArray ) { try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java index 1ab7c00..50ea9ba 100644 --- a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java +++ b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java @@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.json.JsonPathExtractor; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java index 206931f..305ddfd 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java @@ -27,15 +27,14 @@ import org.apache.streams.pojo.json.Activity; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; - +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -77,7 +76,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce } else { return new ArrayList<>(); } - if (Strings.isNullOrEmpty(pattern)) { + if (StringUtils.isBlank(pattern)) { prepare(null); } Map<String, List<Integer>> matches = RegexUtils.extractMatches(pattern, activity.getContent()); @@ -92,7 +91,7 @@ public abstract class AbstractRegexExtensionExtractor<T> implements StreamsProce entities.addAll(set); entry.setDocument(activity); - return Lists.newArrayList(entry); + return Collections.singletonList(entry); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java index d1936d1..d46c2af 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java @@ -21,7 +21,6 @@ package org.apache.streams.regex; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java index 6e17de8..1912ff0 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java @@ -19,10 +19,11 @@ package org.apache.streams.regex; -import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; + +import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java index 66f7aa5..bb0e95d 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java @@ -19,15 +19,21 @@ package org.apache.streams.regex; -import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; + +import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java index d5d8d9b..64e8599 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java @@ -19,14 +19,18 @@ package org.apache.streams.regex; -import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.json.Activity; + +import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java index 42867d7..cefac6f 100644 --- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolver.java @@ -18,9 +18,8 @@ package org.apache.streams.urls; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.commons.codec.net.URLCodec; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.validator.routines.UrlValidator; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -28,12 +27,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.URLDecoder; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; public class LinkResolver implements Serializable { @@ -73,7 +76,7 @@ public class LinkResolver implements Serializable { // To help canonicalize the URL, these parts are 'known' to be 'ok' to remove private static final Collection<String> URL_TRACKING_TO_REMOVE = new ArrayList<String>() {{ - /****************************************************************** + /* * Google uses parameters in the URL string to track referrers * on their Google Analytics and promotions. These are the * identified URL patterns. @@ -102,7 +105,7 @@ public class LinkResolver implements Serializable { // This element holds all the information about all the re-directs that have taken place // and the steps and HTTP codes that occurred inside of each step. private final LinkDetails linkDetails; - private Collection<String> domainsSensitiveTo = new HashSet<String>(); + private Collection<String> domainsSensitiveTo = new HashSet<>(); /** * Get the link details @@ -125,7 +128,7 @@ public class LinkResolver implements Serializable { public void run() { - Preconditions.checkNotNull(linkDetails.getOriginalURL()); + Objects.requireNonNull(linkDetails.getOriginalURL()); linkDetails.setStartTime(DateTime.now()); @@ -140,22 +143,22 @@ public class LinkResolver implements Serializable { this.linkDetails.setRedirected(false); linkDetails.setFinalURL(cleanURL(linkDetails.getFinalURL())); - if( !Strings.isNullOrEmpty(linkDetails.getFinalURL())) + if(StringUtils.isNotBlank(linkDetails.getFinalURL())) linkDetails.setNormalizedURL(normalizeURL(linkDetails.getFinalURL())); - if( !Strings.isNullOrEmpty(linkDetails.getNormalizedURL())) + if(StringUtils.isNotBlank(linkDetails.getNormalizedURL())) linkDetails.setUrlParts(tokenizeURL(linkDetails.getNormalizedURL())); this.updateTookInMillis(); } protected void updateTookInMillis() { - Preconditions.checkNotNull(linkDetails.getStartTime()); + Objects.requireNonNull(linkDetails.getStartTime()); linkDetails.setTookInMills(DateTime.now().minus(linkDetails.getStartTime().getMillis()).getMillis()); } public void unwindLink(String url) { - Preconditions.checkNotNull(linkDetails); - Preconditions.checkNotNull(url); + Objects.requireNonNull(linkDetails); + Objects.requireNonNull(url); // Check url validity UrlValidator urlValidator = new UrlValidator(); @@ -238,7 +241,7 @@ public class LinkResolver implements Serializable { linkDetails.setFinalResponseCode((long) connection.getResponseCode()); Map<String, List<String>> headers = createCaseInsensitiveMap(connection.getHeaderFields()); - /****************************************************************** + /* * If they want us to set cookies, well, then we will set cookies * Example URL: * http://nyti.ms/1bCpesx @@ -247,7 +250,7 @@ public class LinkResolver implements Serializable { linkDetails.getCookies().add(headers.get(SET_COOKIE_IDENTIFIER).get(0)); switch (linkDetails.getFinalResponseCode().intValue()) { - /** + /* * W3C HTTP Response Codes: * http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html */ @@ -263,7 +266,7 @@ public class LinkResolver implements Serializable { case 304: // Not Modified case 306: // This status code is unused but in the redirect block. case 307: // Temporary re-direct - /******************************************************************* + /* * Author: * Smashew * @@ -338,7 +341,7 @@ public class LinkResolver implements Serializable { } private Map<String, List<String>> createCaseInsensitiveMap(Map<String, List<String>> input) { - Map<String, List<String>> toReturn = new HashMap<String, List<String>>(); + Map<String, List<String>> toReturn = new HashMap<>(); for (String k : input.keySet()) if (k != null && input.get(k) != null) toReturn.put(k.toLowerCase(), input.get(k)); @@ -418,7 +421,7 @@ public class LinkResolver implements Serializable { // If you want to just look in the GET parameters, or you want to ignore the domain // or you want to use the domain as a token itself, that would have to be // processed above the next line, and only the remaining parts split - List<String> toReturn = new ArrayList<String>(); + List<String> toReturn = new ArrayList<>(); // Split the URL by forward slashes. Most modern browsers will accept a URL // this malformed such as http://www.smashew.com/hello//how////are/you http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java index d6785e9..5493fef 100644 --- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverHelperFunctions.java @@ -18,7 +18,11 @@ package org.apache.streams.urls; -import java.util.*; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Pattern; @@ -57,7 +61,7 @@ public final class LinkResolverHelperFunctions { public static final long DEFAULT_STAGGER = RECENT_DOMAINS_BACKOFF / 10; // Map to store the information of recent domains, with the last time they were accessed. - private static final ConcurrentMap<String, Date> RECENT_DOMAINS = new ConcurrentHashMap<String, Date>(); + private static final ConcurrentMap<String, Date> RECENT_DOMAINS = new ConcurrentHashMap<>(); private static Timer timer; @@ -142,7 +146,7 @@ public final class LinkResolverHelperFunctions { // see if there is any work that 'can' be done if(RECENT_DOMAINS.size() != 0) { // create a temporary list of the items that can be removed - Collection<String> ableToRemove = new HashSet<String>(); + Collection<String> ableToRemove = new HashSet<>(); // iterate through all the domains (keys) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java index 203b414..6c88e28 100644 --- a/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java +++ b/streams-contrib/streams-processor-urls/src/main/java/org/apache/streams/urls/LinkResolverProcessor.java @@ -18,16 +18,19 @@ package org.apache.streams.urls; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class LinkResolverProcessor implements StreamsProcessor { @@ -102,7 +105,7 @@ public class LinkResolverProcessor implements StreamsProcessor { protected Set<String> unwind(List<String> inputLinks) { - Set<String> outputLinks = new HashSet<String>(); + Set<String> outputLinks = new HashSet<>(); for (String link : inputLinks) { try { LinkResolver unwinder = new LinkResolver(link); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java index 12f9848..6c8439f 100644 --- a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java +++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/LinkHelperFunctionsTest.java @@ -24,9 +24,9 @@ import org.slf4j.LoggerFactory; import java.util.Date; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class LinkHelperFunctionsTest { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java index 5908baa..3af3a89 100644 --- a/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java +++ b/streams-contrib/streams-processor-urls/src/test/java/org/apache/streams/urls/TestLinkUnwinderProcessor.java @@ -18,24 +18,22 @@ package org.apache.streams.urls; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.commons.lang3.SerializationUtils; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.jackson.StreamsJacksonModule; import org.apache.streams.pojo.json.Activity; + +import org.apache.commons.lang3.SerializationUtils; import org.junit.Ignore; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.List; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; -/** - * Created by rebanks on 2/27/14. - */ public class TestLinkUnwinderProcessor { private static String activityString; @@ -73,44 +71,44 @@ public class TestLinkUnwinderProcessor { @Ignore @Test public void testActivityLinkUnwinderProcessorBitly() throws Exception{ - testActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://bit.ly/1cX5Rh4"), Lists.newArrayList("http://www.wcgworld.com/")); + testActivityUnwinderHelper(Collections.singletonList("http://bit.ly/1cX5Rh4"), Collections.singletonList("http://www.wcgworld.com/")); + testStringActivityUnwinderHelper(Collections.singletonList("http://bit.ly/1cX5Rh4"), Collections.singletonList("http://www.wcgworld.com/")); } @Ignore @Test public void testActivityLinkUnwinderProcessorTdotCo() throws Exception{ - testActivityUnwinderHelper(Lists.newArrayList("http://t.co/lLFgFynv2G"), Lists.newArrayList("http://www.holmesreport.com/latest")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://t.co/lLFgFynv2G"), Lists.newArrayList("http://www.holmesreport.com/latest")); + testActivityUnwinderHelper(Collections.singletonList("http://t.co/lLFgFynv2G"), Collections.singletonList("http://www.holmesreport.com/latest")); + testStringActivityUnwinderHelper(Collections.singletonList("http://t.co/lLFgFynv2G"), Collections.singletonList("http://www.holmesreport.com/latest")); } @Ignore @Test public void testActivityLinkUnwinderProcessorGoogle() throws Exception{ - testActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/")); + testActivityUnwinderHelper(Collections.singletonList("http://goo.gl/wSrHDA"), Collections.singletonList("http://www.wcgworld.com/")); + testStringActivityUnwinderHelper(Collections.singletonList("http://goo.gl/wSrHDA"), Collections.singletonList("http://www.wcgworld.com/")); } @Ignore @Test public void testActivityLinkUnwinderProcessorOwly() throws Exception{ - testActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://ow.ly/u4Kte"), Lists.newArrayList("http://www.wcgworld.com/")); + testActivityUnwinderHelper(Collections.singletonList("http://ow.ly/u4Kte"), Collections.singletonList("http://www.wcgworld.com/")); + testStringActivityUnwinderHelper(Collections.singletonList("http://ow.ly/u4Kte"), Collections.singletonList("http://www.wcgworld.com/")); } @Ignore @Test public void testActivityLinkUnwinderProcessorGoDaddy() throws Exception{ - testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt"), Lists.newArrayList("http://www.wcgworld.com/")); + testActivityUnwinderHelper(Collections.singletonList("http://x.co/3yapt"), Collections.singletonList("http://www.wcgworld.com/")); + testStringActivityUnwinderHelper(Collections.singletonList("http://x.co/3yapt"), Collections.singletonList("http://www.wcgworld.com/")); } @Ignore @Test public void testActivityLinkUnwinderProcessorMulti() throws Exception{ // changed these tests because the processor now guarantees each result returned only once - testActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/")); - testStringActivityUnwinderHelper(Lists.newArrayList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Lists.newArrayList("http://www.wcgworld.com/")); + testActivityUnwinderHelper(Arrays.asList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Collections.singletonList("http://www.wcgworld.com/")); + testStringActivityUnwinderHelper(Arrays.asList("http://x.co/3yapt", "http://ow.ly/u4Kte", "http://goo.gl/wSrHDA"), Collections.singletonList("http://www.wcgworld.com/")); } public void testActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{ @@ -137,7 +135,7 @@ public class TestLinkUnwinderProcessor { assertNotNull(resultActivity.getLinks()); List<String> resultLinks = resultActivity.getLinks(); assertEquals(expected.size(), resultLinks.size()); - assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks)); + assertEquals(expected, resultLinks); } public void testStringActivityUnwinderHelper(List<String> input, List<String> expected) throws Exception{ @@ -159,7 +157,7 @@ public class TestLinkUnwinderProcessor { assertNotNull(resultActivity.getLinks()); List<String> resultLinks = resultActivity.getLinks(); assertEquals(expected.size(), resultLinks.size()); - assertEquals(Sets.newHashSet(expected), Sets.newHashSet(resultLinks)); + assertEquals(expected, resultLinks); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java index 1216c38..7dd6769 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java @@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.commons.lang.NotImplementedException; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java index 306fecc..0710a2c 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java @@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.commons.lang.NotImplementedException; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java index 92cf333..7a25b23 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java @@ -29,13 +29,11 @@ import org.apache.streams.facebook.provider.FacebookEventClassifier; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +63,7 @@ public class FacebookTypeConverter implements StreamsProcessor { private int count = 0; - public static final String TERMINATE = new String("TERMINATE"); + public static final String TERMINATE = "TERMINATE"; public FacebookTypeConverter(Class inClass, Class outClass) { this.inClass = inClass; @@ -138,8 +136,6 @@ public class FacebookTypeConverter implements StreamsProcessor { while (parser.nextToken() != null) { } valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); } catch (IOException ioe) { LOGGER.warn("validate: {}", ioe); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java index 617bfab..52ec222 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java @@ -28,17 +28,15 @@ import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; - +import facebook4j.Facebook; +import facebook4j.FacebookFactory; +import facebook4j.conf.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import facebook4j.Facebook; -import facebook4j.FacebookFactory; -import facebook4j.conf.ConfigurationBuilder; - /** * Abstract data collector for Facebook. Iterates over ids and queues data to be output * by a {@link org.apache.streams.core.StreamsProvider} @@ -66,7 +64,7 @@ public abstract class FacebookDataCollector implements Runnable { this.queue = queue; this.isComplete = new AtomicBoolean(false); this.backOff = new ExponentialBackOffStrategy(5); - this.authTokens = new BasicTokenManager<String>(); + this.authTokens = new BasicTokenManager<>(); if (config.getUserAccessTokens() != null) { for (String token : config.getUserAccessTokens()) { this.authTokens.addTokenToPool(token); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java index 47c2afb..2370810 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java @@ -24,7 +24,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java index 3253479..93508b0 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java @@ -34,7 +34,15 @@ import com.google.common.collect.Queues; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; - +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Friend; +import facebook4j.Paging; +import facebook4j.Post; +import facebook4j.ResponseList; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -54,16 +62,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import facebook4j.Facebook; -import facebook4j.FacebookException; -import facebook4j.FacebookFactory; -import facebook4j.Friend; -import facebook4j.Paging; -import facebook4j.Post; -import facebook4j.ResponseList; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; - public class FacebookFriendFeedProvider implements StreamsProvider, Serializable { public static final String STREAMS_ID = "FacebookFriendFeedProvider"; @@ -78,7 +76,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable private Class klass; protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<>(); public FacebookUserstreamConfiguration getConfig() { return configuration; @@ -103,7 +101,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { return new ThreadPoolExecutor(numThreads, numThreads, 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } /** @@ -116,7 +114,6 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); } catch (IOException ex) { ex.printStackTrace(); - return; } } @@ -193,8 +190,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable this.start = start; this.end = end; readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; + return (StreamsResultSet)providerQueue.iterator(); } @Override @@ -266,9 +262,8 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable .setClientVersion("v1.0"); FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - return facebook; + return ff.getInstance(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java index 50ac64a..c973863 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java @@ -34,7 +34,15 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; - +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Friend; +import facebook4j.Paging; +import facebook4j.Post; +import facebook4j.ResponseList; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -44,6 +52,7 @@ import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; import java.util.Iterator; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -55,22 +64,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import facebook4j.Facebook; -import facebook4j.FacebookException; -import facebook4j.FacebookFactory; -import facebook4j.Friend; -import facebook4j.Paging; -import facebook4j.Post; -import facebook4j.ResponseList; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; - /** * FacebookFriendUpdatesProvider provides updates from friend feed. */ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable { - public static final String STREAMS_ID = "FacebookFriendPostsProvider"; + private static final String STREAMS_ID = "FacebookFriendPostsProvider"; private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class); @@ -84,7 +83,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa private Class klass; protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<>(); public FacebookUserstreamConfiguration getConfig() { return configuration; @@ -110,7 +109,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { return new ThreadPoolExecutor(numThreads, numThreads, 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } /** @@ -123,7 +122,6 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); } catch (IOException ex) { ex.printStackTrace(); - return; } } @@ -209,8 +207,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa this.start = start; this.end = end; readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; + return (StreamsResultSet)providerQueue.iterator(); } @Override @@ -242,11 +239,11 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + Objects.requireNonNull(providerQueue); + Objects.requireNonNull(this.klass); + Objects.requireNonNull(configuration.getOauth().getAppId()); + Objects.requireNonNull(configuration.getOauth().getAppSecret()); + Objects.requireNonNull(configuration.getOauth().getUserAccessToken()); Facebook client = getFacebookClient(); @@ -254,11 +251,6 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa ResponseList<Friend> friendResponseList = client.friends().getFriends(); Paging<Friend> friendPaging; do { - - for ( Friend friend : friendResponseList ) { - // client.rawAPI().callPostAPI(); - // add a subscription - } friendPaging = friendResponseList.getPaging(); friendResponseList = client.fetchNext(friendPaging); } @@ -283,9 +275,8 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa .setClientVersion("v1.0"); FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - return facebook; + return ff.getInstance(); } @Override
