http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java index 1d52b5c..6901b4f 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java @@ -18,11 +18,20 @@ package org.apache.streams.components.http.processor; +import org.apache.streams.components.http.HttpProcessorConfiguration; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.extensions.ExtensionUtil; +import org.apache.streams.pojo.json.ActivityObject; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -32,14 +41,6 @@ import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpProcessorConfiguration; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.extensions.ExtensionUtil; -import org.apache.streams.pojo.json.ActivityObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -244,7 +245,7 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { public HttpPost prepareHttpPost(URI uri, HttpEntity entity) { HttpPost httpPost = new HttpPost(uri); httpPost.addHeader("content-type", this.configuration.getContentType()); - if ( !Strings.isNullOrEmpty(authHeader)) { + if (StringUtils.isNotBlank(authHeader)) { httpPost.addHeader("Authorization", String.format("Basic %s", authHeader)); } httpPost.setEntity(entity); @@ -261,11 +262,11 @@ public class SimpleHTTPPostProcessor implements StreamsProcessor { .setHost(this.configuration.getHostname()) .setPath(this.configuration.getResourcePath()); - if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) { + if (StringUtils.isNotBlank(configuration.getAccessToken()) ) { uriBuilder = uriBuilder.addParameter("access_token", configuration.getAccessToken()); } - if ( !Strings.isNullOrEmpty(configuration.getUsername()) - && !Strings.isNullOrEmpty(configuration.getPassword())) { + if (StringUtils.isNotBlank(configuration.getUsername()) + && StringUtils.isNotBlank(configuration.getPassword())) { String string = configuration.getUsername() + ":" + configuration.getPassword(); authHeader = Base64.encodeBase64String(string.getBytes()); }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java index ab11a68..e46342e 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java @@ -18,12 +18,19 @@ package org.apache.streams.components.http.provider; +import org.apache.streams.components.http.HttpProviderConfiguration; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; @@ -37,13 +44,6 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; -import org.apache.streams.components.http.HttpProviderConfiguration; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +60,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -236,7 +237,7 @@ public class SimpleHttpProvider implements StreamsProvider { protected List<ObjectNode> execute(URI uri) { - Preconditions.checkNotNull(uri); + Objects.requireNonNull(uri); List<ObjectNode> results = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java index 2333c4b..a74e6bc 100644 --- a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java +++ b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java @@ -23,7 +23,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java index 5eea60e..ce88a23 100644 --- a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java +++ b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java @@ -21,7 +21,6 @@ package org.apache.streams.config; import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java index 319b32a..83e98b5 100644 --- a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java +++ b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java ---------------------------------------------------------------------- diff --git a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java index 82cc6bc..80fa7de 100644 --- a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java +++ b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java @@ -18,13 +18,14 @@ package org.apache.streams.config.test; +import org.apache.streams.config.ComponentConfiguration; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; + import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigRenderOptions; -import org.apache.streams.config.ComponentConfiguration; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfigurator; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java ---------------------------------------------------------------------- diff --git a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java index a29d8c7..64010c1 100644 --- a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java +++ b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java @@ -18,34 +18,19 @@ package org.apache.streams.config.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.streams.config.ComponentConfiguration; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintStream; -import java.io.Serializable; -import java.util.Map; -import java.util.Scanner; - /** - * Test for - * @see {@link org.apache.streams.config.StreamsConfigurator} + * Test for {@link org.apache.streams.config.StreamsConfigurator} */ public class StreamsConfiguratorTest { - private static final ObjectMapper mapper = new ObjectMapper(); - @Test public void testDetectConfiguration() throws Exception { @@ -53,7 +38,7 @@ public class StreamsConfiguratorTest { Config detected = StreamsConfigurator.getConfig(); - junit.framework.Assert.assertEquals(config, detected); + Assert.assertEquals(config, detected); StreamsConfiguration defaultPojo = StreamsConfigurator.detectConfiguration(); @@ -63,7 +48,7 @@ public class StreamsConfiguratorTest { assert(configuredPojo != null); - junit.framework.Assert.assertEquals(configuredPojo, defaultPojo); + Assert.assertEquals(configuredPojo, defaultPojo); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java index e3bfe70..42cce8a 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java @@ -37,7 +37,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Queues; import com.typesafe.config.Config; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +79,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable public KinesisPersistReader() { Config config = StreamsConfigurator.config.getConfig("kinesis"); this.config = new ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } /** @@ -88,7 +87,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable */ public KinesisPersistReader(KinesisReaderConfiguration config) { this.config = config; - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public void setConfig(KinesisReaderConfiguration config) { @@ -176,7 +175,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable while( !executor.isTerminated()) { try { executor.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) {} + } catch (InterruptedException ignored) {} } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java index a93fda8..37891be 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java @@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.Record; import com.google.common.collect.Maps; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java index b61a364..6e2db0f 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java @@ -37,7 +37,6 @@ import com.amazonaws.services.kinesis.model.PutRecordResult; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.typesafe.config.Config; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +76,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter { public KinesisPersistWriter() { Config config = StreamsConfigurator.config.getConfig("kinesis"); this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config); - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } /** @@ -85,7 +84,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter { */ public KinesisPersistWriter(KinesisWriterConfiguration config) { this.config = config; - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public void setConfig(KinesisWriterConfiguration config) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java index f34782a..e70c361 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java @@ -20,7 +20,6 @@ package org.apache.streams.s3; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java index e8ca0c7..87bd8c8 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java @@ -22,7 +22,6 @@ import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.Upload; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index 753b439..da1a00e 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -39,7 +39,6 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Queues; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,15 +146,15 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab ObjectListing listing = this.amazonS3Client.listObjects(request); - this.files = new ArrayList<String>(); + this.files = new ArrayList<>(); - /** + /* * If you can list files that are in this path, then you must be dealing with a directory * if you cannot list files that are in this path, then you are most likely dealing with * a simple file. */ - boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false; - boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false; + boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0; + boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0; if (hasCommonPrefixes || hasObjectSummaries) { // Handle the 'directory' use case http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index f0e9626..82bcba7 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -23,7 +23,6 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.util.ComponentUtils; import com.google.common.base.Strings; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +54,7 @@ public class S3PersistReaderTask implements Runnable { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); LOGGER.info("Reading: {} ", file); - String line = ""; + String line; try { while ((line = bufferedReader.readLine()) != null) { if ( !Strings.isNullOrEmpty(line) ) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index ef6e831..34e4b67 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -39,7 +39,6 @@ import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +69,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab private ObjectMapper objectMapper; private AmazonS3Client amazonS3Client; private S3WriterConfiguration s3WriterConfiguration; - private final List<String> writtenFiles = new ArrayList<String>(); + private final List<String> writtenFiles = new ArrayList<>(); protected LineReadWriteUtil lineWriterUtil; private final AtomicLong totalBytesWritten = new AtomicLong(); @@ -79,7 +78,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab private final AtomicInteger totalRecordsWritten = new AtomicInteger(); private AtomicInteger fileLineCounter = new AtomicInteger(); - private static Map<String, String> objectMetaData = new HashMap<String, String>(); + private static Map<String, String> objectMetaData = new HashMap<>(); static { objectMetaData.put("line[0]", "id"); @@ -103,7 +102,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab } public Map<String, String> getObjectMetaData() { - return this.objectMetaData; + return objectMetaData; } public ObjectMapper getObjectMapper() { @@ -115,7 +114,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab } public void setObjectMetaData(Map<String, String> val) { - this.objectMetaData = val; + objectMetaData = val; } public S3PersistWriter() { @@ -203,7 +202,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath(), fileName, - this.objectMetaData); + objectMetaData); // reset the counter this.fileLineCounter = new AtomicInteger(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java index 6358071..dfbd91e 100644 --- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java +++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java @@ -24,12 +24,11 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.PrintStream; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -42,14 +41,14 @@ public class ConsolePersistWriter implements StreamsPersistWriter { private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class); - protected PrintStream printStream = System.out; + private PrintStream printStream = System.out; protected volatile Queue<StreamsDatum> persistQueue; private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); public ConsolePersistWriter() { - this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + this.persistQueue = new ConcurrentLinkedQueue<>(); } public ConsolePersistWriter(PrintStream printStream) { @@ -63,7 +62,7 @@ public class ConsolePersistWriter implements StreamsPersistWriter { } public void prepare(Object configuration) { - Preconditions.checkNotNull(persistQueue); + Objects.requireNonNull(persistQueue); } public void cleanUp() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java index bdff9aa..f488263 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java @@ -19,7 +19,6 @@ package org.apache.streams.elasticsearch; import com.google.common.net.InetAddresses; - import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang.builder.ToStringBuilder; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java index 789b62f..3c4ebc9 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java @@ -21,20 +21,19 @@ package org.apache.streams.elasticsearch; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import com.google.common.base.Preconditions; - import org.elasticsearch.action.delete.DeleteRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Objects; /** * ElasticsearchPersistDeleter deletes documents from elasticsearch. */ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter { - public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName(); + private static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName(); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class); @@ -84,9 +83,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl public void delete(String index, String type, String id) { DeleteRequest deleteRequest; - Preconditions.checkNotNull(index); - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(type); + Objects.requireNonNull(index); + Objects.requireNonNull(id); + Objects.requireNonNull(type); // They didn't specify an ID, so we will create one for them. deleteRequest = new DeleteRequest() @@ -104,8 +103,8 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl */ public void add(DeleteRequest request) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + Objects.requireNonNull(request); + Objects.requireNonNull(request.index()); // If our queue is larger than our flush threshold, then we should flush the queue. synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 388497e..27285f9 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -27,7 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Queues; - import org.elasticsearch.search.SearchHit; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -200,7 +199,7 @@ public class ElasticsearchPersistReader implements StreamsPersistReader, Seriali StreamsDatum item; while (query.hasNext()) { SearchHit hit = query.next(); - ObjectNode jsonObject = null; + ObjectNode jsonObject; try { jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class); item = new StreamsDatum(jsonObject, hit.getId()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index f4da436..c8b95d6 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -21,21 +21,20 @@ package org.apache.streams.elasticsearch; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - +import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Objects; /** * ElasticsearchPersistUpdater updates documents to elasticsearch. */ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter { - public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName(); + private static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName(); private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class); @@ -97,8 +96,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl public void update(String indexName, String type, String id, String parent, String routing, String json) { UpdateRequest updateRequest; - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(json); + Objects.requireNonNull(id); + Objects.requireNonNull(json); // They didn't specify an ID, so we will create one for them. updateRequest = new UpdateRequest() @@ -107,11 +106,11 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl .id(id) .doc(json); - if (!Strings.isNullOrEmpty(parent)) { + if (StringUtils.isNotBlank(parent)) { updateRequest = updateRequest.parent(parent); } - if (!Strings.isNullOrEmpty(routing)) { + if (StringUtils.isNotBlank(routing)) { updateRequest = updateRequest.routing(routing); } @@ -128,8 +127,8 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl */ public void add(UpdateRequest request) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + Objects.requireNonNull(request); + Objects.requireNonNull(request.index()); // If our queue is larger than our flush threshold, then we should flush the queue. synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 07ab734..bc60197 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -27,8 +27,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -52,6 +50,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; @@ -425,9 +424,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Seriali public void add(String indexName, String type, String id, String parent, String routing, String ts, String json) { // make sure that these are not null - Preconditions.checkNotNull(indexName); - Preconditions.checkNotNull(type); - Preconditions.checkNotNull(json); + Objects.requireNonNull(indexName); + Objects.requireNonNull(type); + Objects.requireNonNull(json); IndexRequestBuilder indexRequestBuilder = manager.getClient() .prepareIndex(indexName, type) @@ -451,8 +450,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Seriali protected void add(IndexRequest request) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + Objects.requireNonNull(request); + Objects.requireNonNull(request.index()); // If our queue is larger than our flush threshold, then we should flush the queue. synchronized (this) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 06a6dc8..a77101b 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -23,7 +23,6 @@ import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.core.JsonProcessingException; - import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java index 6ce15d4..28f8b38 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.typesafe.config.Config; - import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java index bef190e..cc35b05 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java @@ -27,7 +27,6 @@ import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import com.typesafe.config.Config; - import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java index 2a64fbc..105c453 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -26,7 +26,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java index 721ad42..e5efbb9 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java @@ -28,7 +28,6 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java index 69394ee..ab6341c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java @@ -32,9 +32,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -53,9 +50,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Queue; import java.util.Set; @@ -124,7 +122,7 @@ public class PercolateTagProcessor implements StreamsProcessor { @Override public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = Lists.newArrayList(); + List<StreamsDatum> result = new ArrayList<>(); String json; ObjectNode node; @@ -172,9 +170,8 @@ public class PercolateTagProcessor implements StreamsProcessor { ArrayNode tagArray = JsonNodeFactory.instance.arrayNode(); - Iterator<PercolateResponse.Match> matchIterator = response.iterator(); - while (matchIterator.hasNext()) { - tagArray.add(matchIterator.next().getId().string()); + for (PercolateResponse.Match aResponse : response) { + tagArray.add(aResponse.getId().string()); } LOGGER.trace("Percolate matches: {}", tagArray); @@ -202,14 +199,14 @@ public class PercolateTagProcessor implements StreamsProcessor { mapper = StreamsJacksonMapper.getInstance(); - Preconditions.checkNotNull(config); + Objects.requireNonNull(config); manager = new ElasticsearchClientManager(config); if ( config.getTags() != null && config.getTags().getAdditionalProperties().size() > 0) { // initial write tags to index createIndexIfMissing(config.getIndex()); - if ( config.getReplaceTags() == true ) { + if (config.getReplaceTags()) { deleteOldQueries(config.getIndex()); } for (String tag : config.getTags().getAdditionalProperties().keySet()) { @@ -219,7 +216,7 @@ public class PercolateTagProcessor implements StreamsProcessor { } bulkBuilder = manager.getClient().prepareBulk(); - if (writePercolateRules() == true) { + if (writePercolateRules()) { LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); } else { LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); @@ -230,7 +227,7 @@ public class PercolateTagProcessor implements StreamsProcessor { @Override public void cleanUp() { - if ( config.getCleanupTags() == true ) { + if (config.getCleanupTags()) { deleteOldQueries(config.getIndex()); } manager.getClient().close(); @@ -311,7 +308,7 @@ public class PercolateTagProcessor implements StreamsProcessor { * @return result */ public Set<String> getActivePercolateTags(String index) { - Set<String> tags = new HashSet<String>(); + Set<String> tags = new HashSet<>(); SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000); SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); SearchHits hits = response.getHits(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java index caa0b8d..91afef6 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java @@ -24,11 +24,9 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor; -import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.apache.commons.lang.SerializationUtils; import org.elasticsearch.client.Client; import org.junit.Assert; @@ -36,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.util.HashMap; import java.util.Map; /** @@ -70,7 +69,7 @@ public class DatumFromMetadataProcessorIT { @Test public void testDatumFromMetadataProcessor() { - Map<String, Object> metadata = Maps.newHashMap(); + Map<String, Object> metadata = new HashMap<>(); metadata.put("index", testConfiguration.getIndexes().get(0)); metadata.put("type", testConfiguration.getTypes().get(0)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java index 553a711..d3b1fd3 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java @@ -18,34 +18,26 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater; -import org.apache.streams.elasticsearch.ElasticsearchPersistWriter; import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; -import org.elasticsearch.action.count.CountRequest; -import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; @@ -62,14 +54,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileInputStream; import java.io.InputStream; -import java.net.URL; import java.util.List; -import java.util.Properties; import java.util.Set; -import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java index 6b52ce5..d35b8a2 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java @@ -18,14 +18,6 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Strings; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.elasticsearch.ElasticsearchClientManager; @@ -34,6 +26,15 @@ import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -61,7 +62,7 @@ import java.net.URL; import java.util.List; import java.util.Set; -import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -104,7 +105,7 @@ public class ElasticsearchParentChildWriterIT { DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); assertTrue(deleteIndexResponse.isAcknowledged()); - }; + } PutIndexTemplateRequestBuilder putTemplateRequestBuilder = testClient.admin().indices().preparePutTemplate("mappings"); URL templateURL = ElasticsearchParentChildWriterIT.class.getResource("/ActivityChildObjectParent.json"); @@ -133,7 +134,7 @@ public class ElasticsearchParentChildWriterIT { if(indicesExistsResponse.isExists()) { DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); - }; + } ElasticsearchPersistWriter testPersistWriter = new ElasticsearchPersistWriter(testConfiguration); testPersistWriter.prepare(null); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java index e356aff..603ec5f 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java @@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -53,7 +52,7 @@ import java.io.File; import java.io.InputStream; import java.util.List; -import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -120,7 +119,7 @@ public class ElasticsearchPersistUpdaterIT { Activity update = new Activity(); update.setAdditionalProperty("updated", Boolean.TRUE); update.setAdditionalProperty("str", "str"); - update.setAdditionalProperty("long", 10l); + update.setAdditionalProperty("long", 10L); update.setActor( new ActivityObject() .withAdditionalProperty("updated", Boolean.TRUE) http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java index f290971..3fec9f3 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java @@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; - import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -53,7 +52,7 @@ import java.io.File; import java.io.InputStream; import java.util.List; -import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -91,7 +90,7 @@ public class ElasticsearchPersistWriterIT { DeleteIndexRequest deleteIndexRequest = Requests.deleteIndexRequest(testConfiguration.getIndex()); DeleteIndexResponse deleteIndexResponse = testClient.admin().indices().delete(deleteIndexRequest).actionGet(); assertTrue(deleteIndexResponse.isAcknowledged()); - }; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java index 8900ef3..3e9f3e1 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestDocumentToMetadataProcessor.java @@ -18,24 +18,17 @@ package org.apache.streams.elasticsearch.test; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.commons.lang.SerializationUtils; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang.SerializationUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -/** - * Created by sblackmon on 10/20/14. - */ public class TestDocumentToMetadataProcessor { private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java index 76f10b1..7bd4341 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java @@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.ActivityObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; - import org.apache.commons.io.Charsets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.SerializationUtils; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java index b921ba5..32d6032 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java @@ -26,9 +26,7 @@ import org.apache.streams.core.StreamsResultSet; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; import com.squareup.tape.QueueFile; - import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +38,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Serializable; import java.math.BigInteger; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -50,11 +49,11 @@ import java.util.concurrent.Executors; */ public class FileBufferPersistReader implements StreamsPersistReader, Serializable { - public static final String STREAMS_ID = "FileBufferPersistReader"; + private static final String STREAMS_ID = "FileBufferPersistReader"; private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistReader.class); - protected volatile Queue<StreamsDatum> persistQueue; + private volatile Queue<StreamsDatum> persistQueue; private ObjectMapper mapper; @@ -109,7 +108,7 @@ public class FileBufferPersistReader implements StreamsPersistReader, Serializab } StreamsResultSet current; - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue)); persistQueue.clear(); return current; @@ -164,7 +163,7 @@ public class FileBufferPersistReader implements StreamsPersistReader, Serializab LOGGER.error(ex.getMessage()); } - Preconditions.checkNotNull(queueFile); + Objects.requireNonNull(queueFile); this.persistQueue = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java index 76dfafc..a4348f6 100644 --- a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java +++ b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java @@ -26,15 +26,15 @@ import org.apache.streams.util.GuidUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.squareup.tape.QueueFile; - +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -43,11 +43,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializable { - public static final String STREAMS_ID = "FileBufferPersistWriter"; + private static final String STREAMS_ID = "FileBufferPersistWriter"; private static final Logger LOGGER = LoggerFactory.getLogger(FileBufferPersistWriter.class); - protected volatile Queue<StreamsDatum> persistQueue; + private volatile Queue<StreamsDatum> persistQueue; private ObjectMapper mapper; @@ -74,9 +74,9 @@ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializab String key = entry.getId() != null ? entry.getId() : GuidUtils.generateGuid("filewriter"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(key)); + Preconditions.checkArgument(StringUtils.isNotBlank(key)); Preconditions.checkArgument(entry.getDocument() instanceof String); - Preconditions.checkArgument(!Strings.isNullOrEmpty((String) entry.getDocument())); + Preconditions.checkArgument(StringUtils.isNotBlank((String) entry.getDocument())); byte[] item = ((String)entry.getDocument()).getBytes(); try { @@ -102,7 +102,7 @@ public class FileBufferPersistWriter implements StreamsPersistWriter, Serializab Preconditions.checkArgument(file.exists()); Preconditions.checkArgument(file.canWrite()); - Preconditions.checkNotNull(queueFile); + Objects.requireNonNull(queueFile); this.persistQueue = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java index 7c3e019..de8419d 100644 --- a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java +++ b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/FileBufferPersistIT.java @@ -18,9 +18,6 @@ package org.apache.streams.filebuffer.test; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; import org.apache.streams.console.ConsolePersistReader; import org.apache.streams.console.ConsolePersistWriter; import org.apache.streams.core.StreamBuilder; @@ -30,19 +27,18 @@ import org.apache.streams.filebuffer.FileBufferConfiguration; import org.apache.streams.filebuffer.FileBufferPersistReader; import org.apache.streams.filebuffer.FileBufferPersistWriter; import org.apache.streams.local.builders.LocalStreamBuilder; + import org.junit.After; import org.junit.Before; import org.junit.Test; - import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; -import static org.mockito.Mockito.mock; - import java.io.File; +import java.util.Arrays; +import java.util.HashMap; import java.util.Map; - -import static org.mockito.Mockito.mock; +import java.util.concurrent.ConcurrentLinkedQueue; /** * Tests {@link org.apache.streams.filebuffer.FileBufferPersistWriter } @@ -52,12 +48,12 @@ public class FileBufferPersistIT { private FileBufferConfiguration testConfiguration; - ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class); - ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class); + private ConsolePersistReader reader = Mockito.mock(ConsolePersistReader.class); + private ConsolePersistWriter writer = Mockito.mock(ConsolePersistWriter.class); - StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}"); - StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}"); - StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}"); + private StreamsDatum testDatum1 = new StreamsDatum("{\"datum\":1}"); + private StreamsDatum testDatum2 = new StreamsDatum("{\"datum\":2}"); + private StreamsDatum testDatum3 = new StreamsDatum("{\"datum\":3}"); @Before public void prepareTest() { @@ -71,8 +67,8 @@ public class FileBufferPersistIT { PowerMockito.when(reader.readCurrent()) .thenReturn( - new StreamsResultSet(Queues.newConcurrentLinkedQueue( - Lists.newArrayList(testDatum1, testDatum2, testDatum3))) + new StreamsResultSet(new ConcurrentLinkedQueue<>( + Arrays.asList(testDatum1, testDatum2, testDatum3))) ).thenReturn(null); } @@ -81,7 +77,7 @@ public class FileBufferPersistIT { assert(testConfiguration != null); - Map<String, Object> streamConfig = Maps.newHashMap(); + Map<String, Object> streamConfig = new HashMap<>(); streamConfig.put(LocalStreamBuilder.TIMEOUT_KEY, 1000); StreamBuilder builder = new LocalStreamBuilder(1, streamConfig); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java index 728e368..391e445 100644 --- a/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java +++ b/streams-contrib/streams-persist-filebuffer/src/test/java/org/apache/streams/filebuffer/test/TestFileBufferPersist.java @@ -23,6 +23,7 @@ import org.apache.streams.core.StreamsResultSet; import org.apache.streams.filebuffer.FileBufferConfiguration; import org.apache.streams.filebuffer.FileBufferPersistReader; import org.apache.streams.filebuffer.FileBufferPersistWriter; + import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java index fee2f6f..614dc95 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHelper.java @@ -18,24 +18,25 @@ package org.apache.streams.graph; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; +import com.fasterxml.jackson.databind.node.ObjectNode; + /** * Interface for methods allowing persistance to a graph database wrapped with * a rest API. CypherGraphHelper is a good example, for neo4j. */ public interface GraphHelper { - public ObjectNode getVertexRequest(String streamsId); + ObjectNode getVertexRequest(String streamsId); - public ObjectNode getVertexRequest(Long vertexId); + ObjectNode getVertexRequest(Long vertexId); - public ObjectNode createVertexRequest(ActivityObject activityObject); + ObjectNode createVertexRequest(ActivityObject activityObject); - public ObjectNode mergeVertexRequest(ActivityObject activityObject); + ObjectNode mergeVertexRequest(ActivityObject activityObject); - public ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination); + ObjectNode createEdgeRequest(Activity activity, ActivityObject source, ActivityObject destination); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java index 847328a..5b2dec6 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java @@ -34,9 +34,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -45,7 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -60,10 +60,10 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { private static final Logger LOGGER = LoggerFactory.getLogger(GraphHttpPersistWriter.class); private static final long MAX_WRITE_LATENCY = 1000; - protected GraphHttpConfiguration configuration; + private GraphHttpConfiguration configuration; - protected QueryGraphHelper queryGraphHelper; - protected HttpGraphHelper httpGraphHelper; + private QueryGraphHelper queryGraphHelper; + private HttpGraphHelper httpGraphHelper; private static ObjectMapper mapper; @@ -73,7 +73,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from JVM 'graph'. */ public GraphHttpPersistWriter() { - this(new ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); + this(new ComponentConfigurator<>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); } /** @@ -94,7 +94,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { protected ObjectNode preparePayload(StreamsDatum entry) throws Exception { Activity activity = null; - ActivityObject activityObject = null; + ActivityObject activityObject; Object document = entry.getDocument(); if (document instanceof Activity) { @@ -137,7 +137,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { // always add vertices first - List<String> labels = Lists.newArrayList("streams"); + List<String> labels = Collections.singletonList("streams"); if ( activityObject != null ) { if ( activityObject.getObjectType() != null ) { @@ -151,20 +151,17 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { ActivityObject actor = activity.getActor(); Provider provider = activity.getProvider(); - if ( provider != null - && !Strings.isNullOrEmpty(provider.getId()) ) { + if (provider != null && StringUtils.isNotBlank(provider.getId())) { labels.add(provider.getId()); } - if (actor != null - && !Strings.isNullOrEmpty(actor.getId())) { + if (actor != null && StringUtils.isNotBlank(actor.getId())) { if (actor.getObjectType() != null) { labels.add(actor.getObjectType()); } statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor))); } - if (activityObject != null - && !Strings.isNullOrEmpty(activityObject.getId())) { + if (activityObject != null && StringUtils.isNotBlank(activityObject.getId())) { if (activityObject.getObjectType() != null) { labels.add(activityObject.getObjectType()); } @@ -173,7 +170,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { // then add edge - if (!Strings.isNullOrEmpty(activity.getVerb())) { + if (StringUtils.isNotBlank(activity.getVerb())) { statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity))); } } @@ -186,7 +183,7 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { @Override protected ObjectNode executePost(HttpPost httpPost) { - Preconditions.checkNotNull(httpPost); + Objects.requireNonNull(httpPost); ObjectNode result = null; @@ -239,8 +236,8 @@ public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter { httpGraphHelper = new Neo4jHttpGraphHelper(); } - Preconditions.checkNotNull(queryGraphHelper); - Preconditions.checkNotNull(httpGraphHelper); + Objects.requireNonNull(queryGraphHelper); + Objects.requireNonNull(httpGraphHelper); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java index 7c6e341..9560083 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java @@ -33,11 +33,10 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; /** @@ -58,7 +57,7 @@ public class GraphVertexReader extends SimpleHttpProvider implements StreamsPers * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 'graph'. */ public GraphVertexReader() { - this(new ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); + this(new ComponentConfigurator<>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph"))); } /** @@ -83,7 +82,7 @@ public class GraphVertexReader extends SimpleHttpProvider implements StreamsPers * @return result */ public List<ObjectNode> parse(JsonNode jsonNode) { - List<ObjectNode> results = Lists.newArrayList(); + List<ObjectNode> results = new ArrayList<>(); ObjectNode root = (ObjectNode) jsonNode; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java index 17b8840..ca1f4e4 100644 --- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java +++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java @@ -19,7 +19,6 @@ package org.apache.streams.graph; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.javatuples.Pair; import java.util.Map; @@ -30,6 +29,6 @@ import java.util.Map; */ public interface HttpGraphHelper { - public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters); + ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters); }
