http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java index 9de1863..81b4f79 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java @@ -25,7 +25,6 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.RssStreamConfiguration; import org.apache.streams.rss.provider.perpetual.RssFeedScheduler; import org.apache.streams.util.ComponentUtils; @@ -47,11 +46,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.math.BigInteger; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.Queue; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -65,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class RssStreamProvider implements StreamsProvider { - public static final String STREAMS_ID = "RssStreamProvider"; + private static final String STREAMS_ID = "RssStreamProvider"; private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProvider.class); @@ -142,7 +137,7 @@ public class RssStreamProvider implements StreamsProvider { @Override public void prepare(Object configurationObject) { - this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + this.executor = new ThreadPoolExecutor(1, 4, 15L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.dataQueue = new LinkedBlockingQueue<>(); this.scheduler = getScheduler(this.dataQueue); this.isComplete = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java index e4bfd35..975749f 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/perpetual/RssFeedScheduler.java @@ -22,10 +22,10 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.provider.RssStreamProviderTask; -import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -64,7 +64,7 @@ public class RssFeedScheduler implements Runnable { this.feedDetailsList = feedDetailsList; this.peroid = peroid; this.keepRunning = new AtomicBoolean(true); - this.lastScheduled = Maps.newHashMap(); + this.lastScheduled = new HashMap<>(); this.dataQueue = dataQueue; this.complete = new AtomicBoolean(false); } @@ -116,7 +116,7 @@ public class RssFeedScheduler implements Runnable { } if (currentTime - lastTime > pollInterval) { this.service.execute(new RssStreamProviderTask(this.dataQueue, detail.getUrl())); - this.LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl()); + LOGGER.trace("Scheduled data collection on rss feed, {}", detail.getUrl()); this.lastScheduled.put(detail.getUrl(), currentTime); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java index 1e3aedd..45d5d96 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntryActivitySerializer.java @@ -30,7 +30,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.slf4j.Logger; @@ -38,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.List; +import java.util.Objects; public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNode> { @@ -84,7 +84,7 @@ public class SyndEntryActivitySerializer implements ActivitySerializer<ObjectNod * @return Activity */ public Activity deserializeWithRomeExtension(ObjectNode entry, boolean withExtension) { - Preconditions.checkNotNull(entry); + Objects.requireNonNull(entry); Activity activity = new Activity(); Provider provider = buildProvider(entry); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java index 6868bfc..63920e7 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/serializer/SyndEntrySerializer.java @@ -21,7 +21,6 @@ package org.apache.streams.rss.serializer; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import com.sun.syndication.feed.module.Module; import com.sun.syndication.feed.rss.Category; import com.sun.syndication.feed.rss.Content; @@ -37,7 +36,6 @@ import org.joda.time.format.ISODateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; import java.util.Date; import java.util.List; @@ -250,9 +248,7 @@ public class SyndEntrySerializer { } private void serializeLinks(ObjectNode root, JsonNodeFactory factory, List links) { - if (links == null || links.size() == 0) { - return; - } else if (links.get(0) instanceof String) { + if (links.get(0) instanceof String) { serializeListOfStrings(links, "links", root, factory); } else if (links.get(0) instanceof SyndLinkImpl) { ArrayNode linksArray = factory.arrayNode(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java index 08a58d3..102c3db 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTest.java @@ -24,13 +24,13 @@ import org.apache.streams.rss.RssStreamConfiguration; import org.apache.streams.rss.provider.perpetual.RssFeedScheduler; import com.carrotsearch.randomizedtesting.RandomizedTest; -import com.google.common.collect.Queues; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; /** * Unit tests for {@link org.apache.streams.rss.provider.RssStreamProvider} @@ -44,7 +44,7 @@ public class RssStreamProviderTest extends RandomizedTest { RssStreamProvider provider = null; try { final CountDownLatch latch = new CountDownLatch(1); - BlockingQueue<StreamsDatum> datums = Queues.newLinkedBlockingQueue(); + BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>(); provider = new RssStreamProvider(new RssStreamConfiguration()) { @Override protected RssFeedScheduler getScheduler(BlockingQueue<StreamsDatum> queue) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java index 830f0e7..779c2ab 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/perpetual/RssFeedSchedulerTest.java @@ -18,16 +18,13 @@ package org.apache.streams.rss.provider.perpetual; -import org.apache.streams.core.StreamsDatum; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.provider.RssStreamProviderTask; -import com.google.common.collect.Lists; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -51,15 +48,12 @@ public class RssFeedSchedulerTest { public void testScheduleFeeds() { ExecutorService mockService = mock(ExecutorService.class); final List<String> queuedTasks = new ArrayList<>(5); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed()); - return null; - } + doAnswer(invocationOnMock -> { + queuedTasks.add(((RssStreamProviderTask) invocationOnMock.getArguments()[0]).getRssFeed()); + return null; }).when(mockService).execute(any(Runnable.class)); - RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<StreamsDatum>(), 1); + RssFeedScheduler scheduler = new RssFeedScheduler(mockService, createFeedList(), new LinkedBlockingQueue<>(), 1); scheduler.scheduleFeeds(); assertEquals("Expected 2 Feeds to be scheduled", 2, queuedTasks.size()); assertEquals("Expected Feed 1 to be queued first", "1", queuedTasks.get(0)); @@ -78,7 +72,7 @@ public class RssFeedSchedulerTest { } private List<FeedDetails> createFeedList() { - List<FeedDetails> list = Lists.newLinkedList(); + List<FeedDetails> list = new LinkedList<>(); FeedDetails fd = new FeedDetails(); fd.setPollIntervalMillis(1L); fd.setUrl("1"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java index ccd8b74..c3f10f9 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java @@ -26,7 +26,6 @@ import org.apache.streams.rss.provider.RssStreamProvider; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.junit.Assert; import org.junit.Test; @@ -43,13 +42,11 @@ import java.io.InputStreamReader; import java.io.LineNumberReader; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.util.ArrayList; import java.util.List; import static org.hamcrest.number.OrderingComparison.greaterThan; -/** - * Created by sblackmon on 2/5/14. - */ public class RssStreamProviderIT { private static final Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class); @@ -67,7 +64,7 @@ public class RssStreamProviderIT { BufferedReader br = new BufferedReader(isr); RssStreamConfiguration configuration = new RssStreamConfiguration(); - List<FeedDetails> feedArray = Lists.newArrayList(); + List<FeedDetails> feedArray = new ArrayList<>(); try { while (br.ready()) { String line = br.readLine(); @@ -77,7 +74,6 @@ public class RssStreamProviderIT { } configuration.setFeeds(feedArray); } catch ( Exception ex ) { - System.out.println(ex); ex.printStackTrace(); Assert.fail(); } @@ -101,7 +97,7 @@ public class RssStreamProviderIT { assert (config.canRead()); assert (config.isFile()); - RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + RssStreamProvider.main(new String[]{configfile, outfile}); File out = new File(outfile); assert (out.exists()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java index 01f1999..37ff7cf 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/SyndEntryActivitySerializerIT.java @@ -27,7 +27,6 @@ import org.apache.streams.rss.serializer.SyndEntryActivitySerializer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -35,7 +34,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URL; +import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Scanner; import static org.junit.Assert.assertEquals; @@ -54,8 +55,8 @@ public class SyndEntryActivitySerializerIT { @Test public void testJsonData() throws Exception { Scanner scanner = new Scanner(this.getClass().getResourceAsStream("/TestSyndEntryJson.txt")); - List<Activity> activities = Lists.newLinkedList(); - List<ObjectNode> objects = Lists.newLinkedList(); + List<Activity> activities = new LinkedList<>(); + List<ObjectNode> objects = new LinkedList<>(); SyndEntryActivitySerializer serializer = new SyndEntryActivitySerializer(); @@ -125,7 +126,8 @@ public class SyndEntryActivitySerializerIT { } public void testUrl(String expectedUri, String expectedLink, Activity activity) { - assertTrue((expectedUri == activity.getUrl() || expectedLink == activity.getUrl())); - assertTrue((expectedUri == activity.getObject().getUrl() || expectedLink == activity.getObject().getUrl())); + assertTrue((Objects.equals(expectedUri, activity.getUrl()) || Objects.equals(expectedLink, activity.getUrl()))); + assertTrue((Objects.equals(expectedUri, activity.getObject().getUrl()) || + Objects.equals(expectedLink, activity.getObject().getUrl()))); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java index 6b59d1e..95d8ad9 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosClient.java @@ -19,11 +19,7 @@ package org.apache.streams.sysomos.provider; -import org.apache.streams.sysomos.data.HeartbeatInfo; -import org.apache.streams.sysomos.util.SysomosUtils; - import java.net.HttpURLConnection; -import java.net.URL; /** * Wrapper for the Sysomos API. http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java index f555e8d..915a8cf 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java @@ -30,12 +30,12 @@ import org.apache.streams.twitter.pojo.UserstreamEvent; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT; @@ -47,9 +47,9 @@ public class TwitterDocumentClassifier implements DocumentClassifier { @Override public List<Class> detectClasses(Object document) { - Preconditions.checkNotNull(document); + Objects.requireNonNull(document); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT)); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TWITTER_FORMAT)); ObjectNode objectNode; try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java index b8ce79b..9ca354f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java @@ -23,16 +23,17 @@ import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.User; -import com.google.common.collect.Lists; import org.apache.commons.lang.NotImplementedException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity; public class TwitterJsonUserActivityConverter implements ActivityConverter<User> { - public static Class requiredClass = User.class; + private static Class requiredClass = User.class; @Override public Class requiredClass() { @@ -67,12 +68,12 @@ public class TwitterJsonUserActivityConverter implements ActivityConverter<User> Activity activity = new Activity(); updateActivity(user, activity); - return Lists.newArrayList(activity); + return Collections.singletonList(activity); } @Override public List<Activity> toActivityList(List<User> serializedList) { - List<Activity> result = Lists.newArrayList(); + List<Activity> result = new ArrayList<>(); for ( User item : serializedList ) { try { List<Activity> activities = toActivityList(item); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java index e0e2e80..3344d05 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java @@ -32,15 +32,14 @@ import org.apache.streams.twitter.pojo.Entities; import org.apache.streams.twitter.pojo.Hashtag; import org.apache.streams.twitter.pojo.Place; import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.TargetObject; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import org.apache.streams.twitter.pojo.UserMentions; -import org.apache.streams.twitter.provider.TwitterErrorHandler; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -50,6 +49,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.math.DoubleMath.mean; @@ -71,13 +71,11 @@ public class TwitterActivityUtil { public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException { activity.setActor(buildActor(tweet)); activity.setId(formatId(activity.getVerb(), - Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull())); + Optional.ofNullable(Optional.ofNullable(tweet.getIdStr()) + .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null))); if (tweet instanceof Retweet) { - updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share"); + updateActivityContent(activity, (tweet).getRetweetedStatus(), "share"); } else { updateActivityContent(activity, tweet, "post"); } @@ -127,7 +125,7 @@ public class TwitterActivityUtil { } /** - * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet + * Builds the activity {@link ActivityObject} actor from the tweet * @param tweet the object to use as the source * @return a valid Actor populated from the Tweet */ @@ -139,17 +137,15 @@ public class TwitterActivityUtil { } /** - * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User + * Builds the activity {@link ActivityObject} actor from the User * @param user the object to use as the source * @return a valid Actor populated from the Tweet */ public static ActivityObject buildActor(User user) { ActivityObject actor = new ActivityObject(); actor.setId(formatId( - Optional.fromNullable( - user.getIdStr()) - .or(Optional.of(user.getId().toString())) - .orNull() + Optional.ofNullable(Optional.ofNullable(user.getIdStr()) + .orElseGet(Optional.of(user.getId().toString())::get)).orElse(null) )); actor.setObjectType("page"); actor.setDisplayName(user.getName()); @@ -189,16 +185,14 @@ public class TwitterActivityUtil { } /** - * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet + * Creates an {@link ActivityObject} for the tweet * @param tweet the object to use as the source * @return a valid ActivityObject */ public static ActivityObject buildActivityObject(Tweet tweet) { ActivityObject actObj = new ActivityObject(); - String id = Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull(); + String id = Optional.ofNullable(Optional.ofNullable(tweet.getIdStr()) + .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null); if ( id != null ) { actObj.setId(id); } @@ -261,7 +255,7 @@ public class TwitterActivityUtil { } /** - * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet. + * Builds the {@link TargetObject} from the tweet. * @param tweet the object to use as the source * @return currently returns null for all activities */ @@ -278,17 +272,15 @@ public class TwitterActivityUtil { Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); Map<String, Object> location = new HashMap<>(); location.put("id", formatId( - Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull() + Optional.ofNullable(Optional.ofNullable(tweet.getIdStr()) + .orElseGet(Optional.of(tweet.getId().toString())::get)).orElse(null) )); location.put("coordinates", boundingBoxCenter(tweet.getPlace())); extensions.put("location", location); } /** - * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object + * Gets the common twitter {@link Provider} object * @return a provider object representing Twitter */ public static Provider getProvider() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java index ee800fa..764b3ee 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java @@ -33,6 +33,8 @@ import twitter4j.Twitter; import twitter4j.TwitterException; import twitter4j.TwitterObjectFactory; +import java.util.Objects; + /** * Retrieve friend or follower connections for a single user id. */ @@ -43,11 +45,11 @@ public class TwitterFollowingProviderTask implements Runnable { private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); protected TwitterFollowingProvider provider; - protected Twitter client; + private Twitter client; protected Long id; - protected String screenName; + private String screenName; - int count = 0; + private int count = 0; /** * TwitterFollowingProviderTask constructor. @@ -81,7 +83,7 @@ public class TwitterFollowingProviderTask implements Runnable { if ( id != null ) { getFollowing(id); - } else if ( screenName != null) { + } else { getFollowing(screenName); } @@ -89,7 +91,7 @@ public class TwitterFollowingProviderTask implements Runnable { } - protected void getFollowing(Long id) { + private void getFollowing(Long id) { Preconditions.checkArgument( provider.getConfig().getEndpoint().equals("friends") @@ -103,7 +105,7 @@ public class TwitterFollowingProviderTask implements Runnable { } } - protected void getFollowing(String screenName) { + private void getFollowing(String screenName) { twitter4j.User user = null; try { @@ -111,7 +113,7 @@ public class TwitterFollowingProviderTask implements Runnable { } catch (TwitterException ex) { LOGGER.error("Failure looking up " + id); } - Preconditions.checkNotNull(user); + Objects.requireNonNull(user); getFollowing(user.getId()); } @@ -134,12 +136,12 @@ public class TwitterFollowingProviderTask implements Runnable { PagableResponseList<twitter4j.User> list = null; if ( provider.getConfig().getEndpoint().equals("followers") ) { - list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + list = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getMaxItems().intValue()); } else if ( provider.getConfig().getEndpoint().equals("friends") ) { - list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + list = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getMaxItems().intValue()); } - Preconditions.checkNotNull(list); + Objects.requireNonNull(list); Preconditions.checkArgument(list.size() > 0); for (twitter4j.User other : list) { @@ -176,8 +178,6 @@ public class TwitterFollowingProviderTask implements Runnable { break; } curser = list.getNextCursor(); - } catch (TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } catch (Exception ex) { keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); } @@ -194,12 +194,12 @@ public class TwitterFollowingProviderTask implements Runnable { try { twitter4j.IDs ids = null; if ( provider.getConfig().getEndpoint().equals("followers") ) { - ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue()); } else if ( provider.getConfig().getEndpoint().equals("friends") ) { - ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue()); } - Preconditions.checkNotNull(ids); + Objects.requireNonNull(ids); Preconditions.checkArgument(ids.getIDs().length > 0); for (long otherId : ids.getIDs()) { @@ -216,7 +216,7 @@ public class TwitterFollowingProviderTask implements Runnable { .withFollower(new User().withId(id)); } - Preconditions.checkNotNull(follow); + Objects.requireNonNull(follow); if ( count < provider.getConfig().getMaxItems()) { ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java index a4562ef..2be77ac 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java @@ -24,7 +24,6 @@ import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import com.twitter.hbc.core.processor.StringDelimitedProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,9 @@ public class TwitterStreamHelper extends StringDelimitedProcessor { Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0); Object document = mapper.readValue(item, itemClass); StreamsDatum rawDatum = new StreamsDatum(document); - return Lists.newArrayList(rawDatum); + List<StreamsDatum> streamsDatumList = new ArrayList<>(); + streamsDatumList.add(rawDatum); + return streamsDatumList; } return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 1895ee2..a37ec4d 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -35,8 +35,6 @@ import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import com.google.common.util.concurrent.Uninterruptibles; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Constants; @@ -67,25 +65,24 @@ import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Iterator; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** - * TwitterStreamProvider wraps a hosebird client and passes recieved documents + * TwitterStreamProvider wraps a hosebird client and passes received documents * to subscribing components. */ public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable { - public static final String STREAMS_ID = "TwitterStreamProvider"; + private static final String STREAMS_ID = "TwitterStreamProvider"; private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class); @@ -127,9 +124,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter"); TwitterStreamProvider provider = new TwitterStreamProvider(config); - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT)); - PrintStream outStream = null; + PrintStream outStream; try { outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); } catch (FileNotFoundException ex) { @@ -140,9 +137,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat provider.startStream(); do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while (iterator.hasNext()) { - StreamsDatum datum = iterator.next(); + for (StreamsDatum datum : provider.readCurrent()) { String json; try { json = mapper.writeValueAsString(datum.getDocument()); @@ -157,7 +152,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat outStream.flush(); } - public static final int MAX_BATCH = 1000; + private static final int MAX_BATCH = 1000; private TwitterStreamConfiguration config; @@ -169,13 +164,12 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat this.config = config; } - protected volatile Queue<Future<List<StreamsDatum>>> providerQueue; + private volatile Queue<Future<List<StreamsDatum>>> providerQueue; - protected Hosts hosebirdHosts; - protected Authentication auth; + private Authentication auth; protected StreamingEndpoint endpoint; - protected BasicClient client; - protected AtomicBoolean running = new AtomicBoolean(false); + private BasicClient client; + private AtomicBoolean running = new AtomicBoolean(false); protected TwitterStreamHelper processor = new TwitterStreamHelper(this); private DatumStatusCounter countersCurrent = new DatumStatusCounter(); private DatumStatusCounter countersTotal = new DatumStatusCounter(); @@ -204,7 +198,7 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat StreamsResultSet current; synchronized (this) { - Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque(); + Queue<StreamsDatum> drain = new LinkedBlockingDeque<>(); drainTo(drain); current = new StreamsResultSet(drain); current.setCounter(new DatumStatusCounter()); @@ -234,8 +228,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat @Override public void prepare(Object configurationObject) { - Preconditions.checkNotNull(config.getEndpoint()); + Objects.requireNonNull(config.getEndpoint()); + Hosts hosebirdHosts; if (config.getEndpoint().equals("userstream") ) { hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST); @@ -276,8 +271,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat if ( config.getBasicauth() != null ) { - Preconditions.checkNotNull(config.getBasicauth().getUsername()); - Preconditions.checkNotNull(config.getBasicauth().getPassword()); + Objects.requireNonNull(config.getBasicauth().getUsername()); + Objects.requireNonNull(config.getBasicauth().getPassword()); auth = new BasicAuth( config.getBasicauth().getUsername(), @@ -286,10 +281,10 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat } else if ( config.getOauth() != null ) { - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); + Objects.requireNonNull(config.getOauth().getConsumerKey()); + Objects.requireNonNull(config.getOauth().getConsumerSecret()); + Objects.requireNonNull(config.getOauth().getAccessToken()); + Objects.requireNonNull(config.getOauth().getAccessTokenSecret()); auth = new OAuth1(config.getOauth().getConsumerKey(), config.getOauth().getConsumerSecret(), http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 3210f80..214d204 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -35,7 +35,6 @@ import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; @@ -47,7 +46,6 @@ import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.Twitter; -import twitter4j.TwitterException; import twitter4j.TwitterFactory; import twitter4j.conf.ConfigurationBuilder; import twitter4j.json.DataObjectFactory; @@ -59,8 +57,10 @@ import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -78,9 +78,9 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; */ public class TwitterUserInformationProvider implements StreamsProvider, Serializable { - public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final String STREAMS_ID = "TwitterUserInformationProvider"; - private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + private static ObjectMapper MAPPER = new StreamsJacksonMapper(Collections.singletonList(TwitterDateTimeFormat.TWITTER_FORMAT)); private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); @@ -133,9 +133,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ provider.startStream(); do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while (iterator.hasNext()) { - StreamsDatum datum = iterator.next(); + for (StreamsDatum datum : provider.readCurrent()) { String json; try { json = MAPPER.writeValueAsString(datum.getDocument()); @@ -176,7 +174,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { return new ThreadPoolExecutor(numThreads, numThreads, 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + new ArrayBlockingQueue<>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } /** @@ -207,13 +205,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ config = (TwitterUserInformationConfiguration) configurationObject; } - Preconditions.checkNotNull(config); - Preconditions.checkNotNull(config.getOauth()); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getInfo()); + Objects.requireNonNull(config); + Objects.requireNonNull(config.getOauth()); + Objects.requireNonNull(config.getOauth().getConsumerKey()); + Objects.requireNonNull(config.getOauth().getConsumerSecret()); + Objects.requireNonNull(config.getOauth().getAccessToken()); + Objects.requireNonNull(config.getOauth().getAccessTokenSecret()); + Objects.requireNonNull(config.getInfo()); try { lock.writeLock().lock(); @@ -222,13 +220,13 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ lock.writeLock().unlock(); } - Preconditions.checkNotNull(providerQueue); + Objects.requireNonNull(providerQueue); - List<String> screenNames = new ArrayList<String>(); - List<String[]> screenNameBatches = new ArrayList<String[]>(); + List<String> screenNames = new ArrayList<>(); + List<String[]> screenNameBatches = new ArrayList<>(); - List<Long> ids = new ArrayList<Long>(); - List<Long[]> idsBatches = new ArrayList<Long[]>(); + List<Long> ids = new ArrayList<>(); + List<Long[]> idsBatches = new ArrayList<>(); for (String s : config.getInfo()) { if (s != null) { @@ -248,14 +246,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ // add the batch idsBatches.add(ids.toArray(new Long[ids.size()])); // reset the Ids - ids = new ArrayList<Long>(); + ids = new ArrayList<>(); } if (screenNames.size() >= 100) { // add the batch screenNameBatches.add(screenNames.toArray(new String[ids.size()])); // reset the Ids - screenNames = new ArrayList<String>(); + screenNames = new ArrayList<>(); } } } @@ -275,7 +273,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); } - Preconditions.checkNotNull(executor); + Objects.requireNonNull(executor); this.idsBatches = idsBatches.iterator(); this.screenNameBatches = screenNameBatches.iterator(); @@ -284,7 +282,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ @Override public void startStream() { - Preconditions.checkNotNull(executor); + Objects.requireNonNull(executor); Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); @@ -327,8 +325,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } } keepTrying = 10; - } catch (TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } catch (Exception ex) { keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); } @@ -353,8 +349,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } } keepTrying = 10; - } catch (TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } catch (Exception ex) { keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); } @@ -383,7 +377,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); + return new LinkedBlockingQueue<>(); } public StreamsResultSet readNew(BigInteger sequence) { @@ -397,8 +391,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ this.start = start; this.end = end; readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; + return (StreamsResultSet)providerQueue.iterator(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java index 6e269e5..d8967d1 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/processor/YoutubeTypeConverter.java @@ -26,7 +26,6 @@ import org.apache.streams.pojo.json.Activity; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.Video; -import com.google.common.collect.Lists; import com.youtube.serializer.YoutubeActivityUtil; import com.youtube.serializer.YoutubeChannelDeserializer; import com.youtube.serializer.YoutubeEventClassifier; @@ -35,6 +34,7 @@ import org.apache.commons.lang.NotImplementedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -47,7 +47,6 @@ public class YoutubeTypeConverter implements StreamsProcessor { private StreamsJacksonMapper mapper; private Queue<Video> inQueue; private Queue<StreamsDatum> outQueue; - private YoutubeActivityUtil youtubeActivityUtil; private int count = 0; public YoutubeTypeConverter() {} @@ -65,7 +64,7 @@ public class YoutubeTypeConverter implements StreamsProcessor { Object item = streamsDatum.getDocument(); LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - Activity activity = null; + Activity activity; if (item instanceof String) { item = deserializeItem(item); @@ -73,10 +72,10 @@ public class YoutubeTypeConverter implements StreamsProcessor { if (item instanceof Video) { activity = new Activity(); - youtubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId()); + YoutubeActivityUtil.updateActivity((Video)item, activity, streamsDatum.getId()); } else if (item instanceof Channel) { activity = new Activity(); - this.youtubeActivityUtil.updateActivity((Channel)item, activity, null); + YoutubeActivityUtil.updateActivity((Channel)item, activity, null); } else { throw new NotImplementedException("Type conversion not implement for type : " + item.getClass().getName()); } @@ -90,9 +89,11 @@ public class YoutubeTypeConverter implements StreamsProcessor { } if ( result != null ) { - return Lists.newArrayList(result); + List<StreamsDatum> streamsDatumList = new ArrayList<>(); + streamsDatumList.add(result); + return streamsDatumList; } else { - return Lists.newArrayList(); + return new ArrayList<>(); } } @@ -113,7 +114,6 @@ public class YoutubeTypeConverter implements StreamsProcessor { @Override public void prepare(Object configurationObject) { - youtubeActivityUtil = new YoutubeActivityUtil(); mapper = StreamsJacksonMapper.getInstance(); SimpleModule simpleModule = new SimpleModule(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java index 1442f8b..401b836 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/provider/YoutubeProvider.java @@ -38,8 +38,6 @@ import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.client.repackaged.com.google.common.base.Strings; import com.google.api.services.youtube.YouTube; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -54,9 +52,11 @@ import java.io.IOException; import java.math.BigInteger; import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -65,24 +65,24 @@ import java.util.concurrent.atomic.AtomicBoolean; public abstract class YoutubeProvider implements StreamsProvider { - public static final String STREAMS_ID = "YoutubeProvider"; + private static final String STREAMS_ID = "YoutubeProvider"; private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeProvider.class); private static final int MAX_BATCH_SIZE = 1000; // This OAuth 2.0 access scope allows for full read/write access to the // authenticated user's account. - private List<String> scopes = Lists.newArrayList("https://www.googleapis.com/auth/youtube"); + private List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/youtube"); /** * Define a global instance of the HTTP transport. */ - public static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); + private static final HttpTransport HTTP_TRANSPORT = new NetHttpTransport(); /** * Define a global instance of the JSON factory. */ - public static final JsonFactory JSON_FACTORY = new JacksonFactory(); + private static final JsonFactory JSON_FACTORY = new JacksonFactory(); private static final int DEFAULT_THREAD_POOL_SIZE = 5; @@ -104,7 +104,7 @@ public abstract class YoutubeProvider implements StreamsProvider { this.config = new ComponentConfigurator<>(YoutubeConfiguration.class) .detectConfiguration(StreamsConfigurator.getConfig().getConfig("youtube")); - Preconditions.checkNotNull(this.config.getApiKey()); + Objects.requireNonNull(this.config.getApiKey()); } /** @@ -114,7 +114,7 @@ public abstract class YoutubeProvider implements StreamsProvider { public YoutubeProvider(YoutubeConfiguration config) { this.config = config; - Preconditions.checkNotNull(this.config.getApiKey()); + Objects.requireNonNull(this.config.getApiKey()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java index 4754353..0ca161f 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeActivityUtil.java @@ -26,13 +26,12 @@ import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Image; import org.apache.streams.pojo.json.Provider; -import com.google.api.client.util.Maps; +import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.Thumbnail; import com.google.api.services.youtube.model.ThumbnailDetails; import com.google.api.services.youtube.model.Video; import com.google.common.base.Joiner; -import com.google.common.base.Optional; import com.google.common.collect.Lists; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -40,14 +39,15 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public class YoutubeActivityUtil { private static final Logger LOGGER = LoggerFactory.getLogger(YoutubeActivityUtil.class); /** - * Given a {@link com.google.api.services.youtube.YouTube.Videos} object and an - * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details + * Given a {@link YouTube.Videos} object and an + * {@link Activity} object, fill out the appropriate details * * @param video Video * @param activity Activity @@ -57,10 +57,7 @@ public class YoutubeActivityUtil { activity.setActor(buildActor(video, video.getSnippet().getChannelId())); activity.setVerb("post"); - activity.setId(formatId(activity.getVerb(), - Optional.fromNullable( - video.getId()) - .orNull())); + activity.setId(formatId(activity.getVerb(), Optional.ofNullable(video.getId()).orElse(null))); activity.setPublished(new DateTime(video.getSnippet().getPublishedAt().getValue())); activity.setTitle(video.getSnippet().getTitle()); @@ -76,8 +73,8 @@ public class YoutubeActivityUtil { /** - * Given a {@link com.google.api.services.youtube.model.Channel} object and an - * {@link org.apache.streams.pojo.json.Activity} object, fill out the appropriate details + * Given a {@link Channel} object and an + * {@link Activity} object, fill out the appropriate details * * @param channel Channel * @param activity Activity @@ -88,7 +85,7 @@ public class YoutubeActivityUtil { activity.setProvider(getProvider()); activity.setVerb("post"); activity.setActor(createActorForChannel(channel)); - Map<String, Object> extensions = Maps.newHashMap(); + Map<String, Object> extensions = new HashMap<>(); extensions.put("youtube", channel); activity.setAdditionalProperty("extensions", extensions); } catch (Throwable throwable) { @@ -111,7 +108,7 @@ public class YoutubeActivityUtil { image.setUrl(channel.getSnippet().getThumbnails().getHigh().getUrl()); actor.setImage(image); actor.setUrl("https://youtube.com/user/" + channel.getId()); - Map<String, Object> actorExtensions = Maps.newHashMap(); + Map<String, Object> actorExtensions = new HashMap<>(); actorExtensions.put("followers", channel.getStatistics().getSubscriberCount()); actorExtensions.put("posts", channel.getStatistics().getVideoCount()); actor.setAdditionalProperty("extensions", actorExtensions); @@ -163,7 +160,7 @@ public class YoutubeActivityUtil { } /** - * Build an {@link org.apache.streams.pojo.json.ActivityObject} actor given the video object + * Build an {@link ActivityObject} actor given the video object * @param video Video * @param id id * @return Actor object @@ -180,7 +177,7 @@ public class YoutubeActivityUtil { } /** - * Gets the common youtube {@link org.apache.streams.pojo.json.Provider} object + * Gets the common youtube {@link Provider} object * @return a provider object representing YouTube */ public static Provider getProvider() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java index ea9a49d..e28b4a1 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeChannelDeserializer.java @@ -20,7 +20,6 @@ package com.youtube.serializer; import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonNode; @@ -33,10 +32,9 @@ import com.google.api.services.youtube.model.ChannelStatistics; import com.google.api.services.youtube.model.ChannelTopicDetails; import com.google.api.services.youtube.model.Thumbnail; import com.google.api.services.youtube.model.ThumbnailDetails; -import com.google.common.collect.Lists; import java.io.IOException; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; /** @@ -45,7 +43,7 @@ import java.util.List; public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> { @Override - public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { + public Channel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { JsonNode node = jp.getCodec().readTree(jp); try { Channel channel = new Channel(); @@ -144,10 +142,9 @@ public class YoutubeChannelDeserializer extends JsonDeserializer<Channel> { if (node == null) { return details; } - List<String> topicIds = Lists.newLinkedList(); - Iterator<JsonNode> it = node.get("topicIds").iterator(); - while (it.hasNext()) { - topicIds.add(it.next().asText()); + List<String> topicIds = new LinkedList<>(); + for (JsonNode jsonNode : node.get("topicIds")) { + topicIds.add(jsonNode.asText()); } details.setTopicIds(topicIds); return details; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java index 65a454c..e7645bd 100644 --- a/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java +++ b/streams-contrib/streams-provider-youtube/src/main/java/com/youtube/serializer/YoutubeEventClassifier.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import java.io.IOException; +import java.util.Objects; public class YoutubeEventClassifier { private static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); @@ -39,7 +40,7 @@ public class YoutubeEventClassifier { * @return Class */ public static Class detectClass(String json) { - Preconditions.checkNotNull(json); + Objects.requireNonNull(json); Preconditions.checkArgument(StringUtils.isNotEmpty(json)); ObjectNode objectNode; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java index 4751f00..0ae4822 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeChannelDataCollectorTest.java @@ -27,13 +27,13 @@ import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrate import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.Channel; import com.google.api.services.youtube.model.ChannelListResponse; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import org.apache.youtube.pojo.YoutubeConfiguration; import org.junit.Test; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -51,7 +51,7 @@ public class YoutubeChannelDataCollectorTest { @Test public void testDataCollector() throws Exception { YouTube youTube = createMockYoutube(); - BlockingQueue<StreamsDatum> queue = Queues.newLinkedBlockingQueue(); + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); BackOffStrategy strategy = new LinearTimeBackOffStrategy(1); UserInfo userInfo = new UserInfo(); userInfo.setUserId(ID); @@ -91,7 +91,7 @@ public class YoutubeChannelDataCollectorTest { private ChannelListResponse createMockResponse() { ChannelListResponse response = new ChannelListResponse(); - List<Channel> channelList = Lists.newLinkedList(); + List<Channel> channelList = new LinkedList<>(); response.setItems(channelList); Channel channel = new Channel(); channel.setId(ID); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java index 5a2af8a..54c0375 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java @@ -23,15 +23,15 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; -import com.google.api.client.util.Maps; -import com.google.api.client.util.Sets; import com.google.api.services.youtube.YouTube; -import com.google.common.collect.Lists; import org.apache.youtube.pojo.YoutubeConfiguration; import org.joda.time.DateTime; import org.junit.Test; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; @@ -54,7 +54,7 @@ public class YoutubeProviderTest { public void testDataCollectorRunsPerUser() { Random random = new Random(System.currentTimeMillis()); int numUsers = random.nextInt(1000); - List<UserInfo> userList = Lists.newLinkedList(); + List<UserInfo> userList = new LinkedList<>(); for ( int i = 0; i < numUsers; ++i ) { userList.add(new UserInfo()); @@ -107,7 +107,7 @@ public class YoutubeProviderTest { provider.setDefaultAfterDate(afterDate); provider.setDefaultBeforeDate(beforeDate); - Set<String> users = Sets.newHashSet(); + Set<String> users = new HashSet<>(); users.add("test_user_1"); users.add("test_user_2"); users.add("test_user_3"); @@ -128,7 +128,7 @@ public class YoutubeProviderTest { config.setApiKey("API_KEY"); YoutubeProvider provider = buildProvider(config); - Map<String, DateTime> users = Maps.newHashMap(); + Map<String, DateTime> users = new HashMap<>(); users.put("user1", new DateTime(System.currentTimeMillis())); users.put("user3", new DateTime(System.currentTimeMillis())); users.put("user4", new DateTime(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java index 1870c14..d6a540e 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeUserActivityCollectorTest.java @@ -24,7 +24,6 @@ import org.apache.streams.google.gplus.configuration.UserInfo; import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; -import com.google.api.client.util.Lists; import com.google.api.services.youtube.YouTube; import com.google.api.services.youtube.model.Activity; import com.google.api.services.youtube.model.ActivityContentDetails; @@ -41,6 +40,8 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -170,7 +171,7 @@ public class YoutubeUserActivityCollectorTest { private ActivityListResponse buildActivityListResponse(int num) { ActivityListResponse activityListResponse = new ActivityListResponse(); - List<Activity> items = Lists.newArrayList(); + List<Activity> items = new ArrayList<>(); for ( int x = 0; x < num; x++ ) { Activity activity = new Activity(); @@ -193,21 +194,15 @@ public class YoutubeUserActivityCollectorTest { private YouTube buildYouTube(int numBeforeRange, int numAfterRange, int numInRange, DateTime afterDate, DateTime beforeDate) { - YouTube youtube = createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate); + return createYoutubeMock(numBeforeRange, numAfterRange, numInRange, afterDate, beforeDate); - return youtube; } private YouTube createYoutubeMock(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before) { YouTube youtube = mock(YouTube.class); final YouTube.Videos videos = createMockVideos(numBefore, numAfter, numInRange, after, before); - doAnswer(new Answer() { - @Override - public YouTube.Videos answer(InvocationOnMock invocationOnMock) throws Throwable { - return videos; - } - }).when(youtube).videos(); + doAnswer(invocationOnMock -> videos).when(youtube).videos(); return youtube; } @@ -245,7 +240,7 @@ public class YoutubeUserActivityCollectorTest { private static VideoListResponse createMockVideoListResponse(int numBefore, int numAfter, int numInRange, DateTime after, DateTime before, boolean page) { VideoListResponse feed = new VideoListResponse(); - List<Video> list = com.google.common.collect.Lists.newLinkedList(); + List<Video> list = new LinkedList<>(); for (int i = 0; i < numAfter; ++i) { com.google.api.client.util.DateTime published = new com.google.api.client.util.DateTime(after.getMillis() + 1000000); @@ -255,7 +250,7 @@ public class YoutubeUserActivityCollectorTest { list.add(video); } for (int i = 0; i < numInRange; ++i) { - DateTime published = null; + DateTime published; if ((before == null && after == null) || before == null) { published = DateTime.now(); // no date range or end time date range so just make the time now. } else if (after == null) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java index 1052647..0920829 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java @@ -40,9 +40,9 @@ import java.math.BigInteger; */ public interface StreamBuilder extends Serializable { - public StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration); + StreamBuilder setStreamsConfiguration(StreamsConfiguration configuration); - public StreamsConfiguration getStreamsConfiguration(); + StreamsConfiguration getStreamsConfiguration(); /** * Add a {@link org.apache.streams.core.StreamsProcessor} to the data processing stream. @@ -53,7 +53,7 @@ public interface StreamBuilder extends Serializable { * receive data from. * @return this */ - public StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds); + StreamBuilder addStreamsProcessor(String processorId, StreamsProcessor processor, int numTasks, String... connectToIds); /** * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data processing stream. @@ -64,7 +64,7 @@ public interface StreamBuilder extends Serializable { * receive data from. * @return this */ - public StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds); + StreamBuilder addStreamsPersistWriter(String persistWriterId, StreamsPersistWriter writer, int numTasks, String... connectToIds); /** * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute @@ -73,7 +73,7 @@ public interface StreamBuilder extends Serializable { * @param provider provider to execute * @return this */ - public StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider); + StreamBuilder newPerpetualStream(String streamId, StreamsProvider provider); /** * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute @@ -82,7 +82,7 @@ public interface StreamBuilder extends Serializable { * @param provider provider to execute * @return this */ - public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider); + StreamBuilder newReadCurrentStream(String streamId, StreamsProvider provider); /** * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute @@ -92,7 +92,7 @@ public interface StreamBuilder extends Serializable { * @param sequence sequence to pass to {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} method * @return this */ - public StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence); + StreamBuilder newReadNewStream(String streamId, StreamsProvider provider, BigInteger sequence); /** * Add a {@link org.apache.streams.core.StreamsProvider} to the data processing stream. The provider will execute @@ -104,25 +104,17 @@ public interface StreamBuilder extends Serializable { * @param end end date * @return this */ - public StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end); + StreamBuilder newReadRangeStream(String streamId, StreamsProvider provider, DateTime start, DateTime end); /** * Builds the stream, and starts it or submits it based on implementation. */ - public void start(); + void start(); /** * Stops the streams processing. No guarantee on a smooth shutdown. Optional method, may not be implemented in * all cases. */ - public void stop(); - - - - - - - - + void stop(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java index 490f454..a86499b 100644 --- a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java +++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java @@ -28,19 +28,19 @@ public interface StreamsOperation extends Serializable { /** * Each operation must publish an identifier. */ - public String getId(); + String getId(); /** * This method will be called after initialization/serialization. Initialize any non-serializable objects here. * @param configurationObject Any object to help intialize the operation. ie. Map, JobContext, Properties, etc. The type * will be based on where the operation is being run (ie. hadoop, storm, locally, etc.) */ - public void prepare(Object configurationObject); + void prepare(Object configurationObject); /** * No guarantee that this method will ever be called. But upon shutdown of the stream, an attempt to call this method * will be made. * Use this method to terminate connections, etc. */ - public void cleanUp(); + void cleanUp(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java index a797ce5..5b85e68 100644 --- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java +++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java @@ -40,11 +40,11 @@ import org.apache.streams.pojo.json.ThroughputQueueBroadcast; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.Lists; import org.slf4j.Logger; import java.lang.management.ManagementFactory; import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -128,7 +128,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple LOGGER.info("BroadcastMonitorThread running"); while (keepRunning) { try { - List<String> messages = Lists.newArrayList(); + List<String> messages = new ArrayList<>(); Set<ObjectName> beans = server.queryNames(null, null); for (ObjectName name : beans) { @@ -185,13 +185,16 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple } if (broadcastUri != null) { - if (broadcastUri.getScheme().equals("http")) { - messagePersister = new BroadcastMessagePersister(broadcastUri.toString()); - } else if (broadcastUri.getScheme().equals("udp")) { - messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString()); - } else { - LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined."); - throw new RuntimeException(); + switch (broadcastUri.getScheme()) { + case "http": + messagePersister = new BroadcastMessagePersister(broadcastUri.toString()); + break; + case "udp": + messagePersister = new LogstashUdpMessagePersister(broadcastUri.toString()); + break; + default: + LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined."); + throw new RuntimeException(); } } else { messagePersister = new Slf4jMessagePersister(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c64f8435/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java ---------------------------------------------------------------------- diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java index 3f9a4c1..c85ff90 100644 --- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java +++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersisterTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; +import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; @@ -37,7 +38,7 @@ public class LogstashUdpMessagePersisterTest { private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LogstashUdpMessagePersisterTest.class); - DatagramSocket socket = null; + private DatagramSocket socket = null; /** * setup. @@ -56,7 +57,7 @@ public class LogstashUdpMessagePersisterTest { public void testFailedPersist() { LogstashUdpMessagePersister persister = new LogstashUdpMessagePersister("udp://127.0.0.1:56789"); - List<String> messageArray = Lists.newArrayList(); + List<String> messageArray = new ArrayList<>(); for (int x = 0; x < 10; x ++) { messageArray.add("Fake_message #" + x); }
