Repository: incubator-streams Updated Branches: refs/heads/master c38eef6cf -> e68300936
userstream now writing to ES in real-time Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/522096e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/522096e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/522096e8 Branch: refs/heads/master Commit: 522096e8a914ea738a97a536ef19a1416453e5f0 Parents: f877c5f Author: Steve Blackmon <[email protected]> Authored: Mon Apr 14 10:25:27 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Mon Apr 14 10:25:27 2014 -0500 ---------------------------------------------------------------------- .../streams-persist-elasticsearch/pom.xml | 1 - .../ElasticsearchConfigurator.java | 7 ++ .../ElasticsearchPersistWriter.java | 25 ++++--- .../ElasticsearchWriterConfiguration.json | 10 +++ .../twitter/processor/TwitterTypeConverter.java | 2 +- .../twitter/provider/TwitterStreamProvider.java | 69 +++++++++----------- .../provider/TwitterTimelineProvider.java | 32 ++++----- .../local/tasks/StreamsProviderTask.java | 2 +- 8 files changed, 78 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml index 07433ba..95f3357 100644 --- a/streams-contrib/streams-persist-elasticsearch/pom.xml +++ b/streams-contrib/streams-persist-elasticsearch/pom.xml @@ -62,7 +62,6 @@ <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> - <version>1.8</version> <executions> <execution> <id>add-source</id> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index 20b5c08..e6bf3ca 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -52,9 +52,16 @@ public class ElasticsearchConfigurator { String index = elasticsearch.getString("index"); String type = elasticsearch.getString("type"); + if( elasticsearch.hasPath("bulk")) + elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk")); + + if( elasticsearch.hasPath("batchSize")) + elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize")); + elasticsearchWriterConfiguration.setIndex(index); elasticsearchWriterConfiguration.setType(type); + return elasticsearchWriterConfiguration; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index ad89013..405d1b8 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -51,34 +51,27 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private BulkRequestBuilder bulkRequest; private OutputStreamWriter currentWriter = null; - private int batchSize = 50; - private int totalRecordsWritten = 0; - private boolean veryLargeBulk = false; // by default this setting is set to false + private long batchSize; + private boolean veryLargeBulk; // by default this setting is set to false private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l; private static final long WAITING_DOCS_LIMIT = 10000; public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD; + private volatile int currentItems = 0; private volatile int totalSent = 0; private volatile int totalSeconds = 0; private volatile int totalAttempted = 0; private volatile int totalOk = 0; private volatile int totalFailed = 0; private volatile int totalBatchCount = 0; + private volatile int totalRecordsWritten = 0; private volatile long totalSizeInBytes = 0; private volatile long batchSizeInBytes = 0; private volatile int batchItemsSent = 0; - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public void setVeryLargeBulk(boolean veryLargeBulk) { - this.veryLargeBulk = veryLargeBulk; - } - private final List<String> affectedIndexes = new ArrayList<String>(); public int getTotalOutstanding() { return this.totalSent - (this.totalFailed + this.totalOk); } @@ -99,7 +92,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab protected volatile Queue<StreamsDatum> persistQueue; - private ObjectMapper mapper = new StreamsJacksonMapper(); + private ObjectMapper mapper; private ElasticsearchWriterConfiguration config; @@ -245,6 +238,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab // reset the current batch statistics this.batchSizeInBytes = 0; this.batchItemsSent = 0; + this.currentItems = 0; try { @@ -323,6 +317,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double)(1024*1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis), MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double)(1024*1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); + } @Override @@ -409,11 +404,13 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private void trackItemAndBytesWritten(long sizeInBytes) { + currentItems++; batchItemsSent++; batchSizeInBytes += sizeInBytes; // If our queue is larger than our flush threashold, then we should flush the queue. - if(batchSizeInBytes > flushThresholdSizeInBytes) + if( (batchSizeInBytes > flushThresholdSizeInBytes) || + (currentItems >= batchSize) ) flushInternal(); } @@ -530,6 +527,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab @Override public void prepare(Object configurationObject) { mapper = StreamsJacksonMapper.getInstance(); + veryLargeBulk = this.config.getBulk(); + batchSize = this.config.getBatchSize(); start(); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json index 21aad5c..5e169b7 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json @@ -13,6 +13,16 @@ "type": { "type": "string", "description": "Type to write as" + }, + "bulk": { + "type": "boolean", + "description": "Index in large or small batches", + "default": "false" + }, + "batchSize": { + "type": "integer", + "description": "Item Count before flush", + "default": 100 } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java index 68820c9..5e88bdd 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -27,7 +27,7 @@ import java.util.Queue; */ public class TwitterTypeConverter implements StreamsProcessor { - private final static String STREAMS_ID = "TwitterTypeConverter"; + public final static String STREAMS_ID = "TwitterTypeConverter"; private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 3df7d02..ba88803 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -1,10 +1,7 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -19,10 +16,9 @@ import com.twitter.hbc.httpclient.auth.Authentication; import com.twitter.hbc.httpclient.auth.BasicAuth; import com.twitter.hbc.httpclient.auth.OAuth1; import com.typesafe.config.Config; +import org.apache.commons.lang.NotImplementedException; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.core.*; import org.apache.streams.twitter.TwitterStreamConfiguration; import org.apache.streams.twitter.processor.TwitterEventProcessor; import org.joda.time.DateTime; @@ -31,7 +27,6 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.math.BigInteger; -import java.util.Collection; import java.util.List; import java.util.Queue; import java.util.concurrent.*; @@ -39,7 +34,7 @@ import java.util.concurrent.*; /** * Created by sblackmon on 12/10/13. */ -public class TwitterStreamProvider implements StreamsProvider, Serializable { +public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable { public final static String STREAMS_ID = "TwitterStreamProvider"; @@ -47,8 +42,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { private TwitterStreamConfiguration config; - private Class klass; - public TwitterStreamConfiguration getConfig() { return config; } @@ -57,9 +50,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { this.config = config; } - protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000); + protected BlockingQueue hosebirdQueue = new LinkedBlockingQueue<String>(1000); - protected volatile Queue<StreamsDatum> providerQueue; + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(1000); protected Hosts hosebirdHosts; protected Authentication auth; @@ -68,6 +61,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { protected ListeningExecutorService executor; + private DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, @@ -83,50 +79,43 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { this.config = config; } - public TwitterStreamProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("twitter"); - this.config = TwitterStreamConfigurator.detectConfiguration(config); - this.klass = klass; - providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - } - - public TwitterStreamProvider(TwitterStreamConfiguration config, Class klass) { - this.config = config; - this.klass = klass; - providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - - } - @Override public void startStream() { for (int i = 0; i < 5; i++) { - executor.submit(new TwitterEventProcessor(inQueue, providerQueue, klass)); + executor.submit(new TwitterEventProcessor(hosebirdQueue, providerQueue, String.class)); } new Thread(new TwitterStreamProviderTask(this)).start(); + } @Override public synchronized StreamsResultSet readCurrent() { - Collection<StreamsDatum> currentIterator = Lists.newArrayList(); - Iterators.addAll(currentIterator, providerQueue.iterator()); - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); + StreamsResultSet current; - providerQueue.clear(); + synchronized( TwitterStreamProvider.class ) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + providerQueue.clear(); + } return current; } @Override public StreamsResultSet readNew(BigInteger sequence) { - return null; + throw new NotImplementedException(); } @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return null; + public StreamsResultSet readRange(DateTime start, DateTime end) + { + throw new NotImplementedException(); } @Override @@ -134,8 +123,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(config.getEndpoint()); if(config.getEndpoint().equals("userstream") ) { @@ -212,7 +199,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { .hosts(hosebirdHosts) .endpoint(endpoint) .authentication(auth) - .processor(new StringDelimitedProcessor(inQueue)) + .connectionTimeout(1200000) + .processor(new StringDelimitedProcessor(hosebirdQueue)) .build(); } @@ -220,7 +208,12 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable { @Override public void cleanUp() { for (int i = 0; i < 5; i++) { - inQueue.add(TwitterEventProcessor.TERMINATE); + hosebirdQueue.add(TwitterEventProcessor.TERMINATE); } } + + @Override + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b9551ad..20ee951 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -2,10 +2,6 @@ package org.apache.streams.twitter.provider; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -25,7 +21,9 @@ import twitter4j.json.DataObjectFactory; import java.io.Serializable; import java.math.BigInteger; -import java.util.*; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; import java.util.concurrent.*; /** @@ -120,7 +118,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { @Override public void startStream() { - // no op + LOGGER.debug("{} startStream", STREAMS_ID); + throw new org.apache.commons.lang.NotImplementedException(); } private void captureTimeline(long currentId) { @@ -182,15 +181,20 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { } public StreamsResultSet readCurrent() { + LOGGER.debug("{} readCurrent", STREAMS_ID); Preconditions.checkArgument(ids.hasNext()); - LOGGER.info("readCurrent"); + StreamsResultSet current; + + synchronized( TwitterTimelineProvider.class ) { + + while( ids.hasNext() ) { + Long currentId = ids.next(); + LOGGER.info("Provider Task Starting: {}", currentId); + captureTimeline(currentId); + } - while( ids.hasNext() ) { - Long currentId = ids.next(); - LOGGER.info("Provider Task Starting: {}", currentId); - captureTimeline(currentId); } LOGGER.info("Finished. Cleaning up..."); @@ -212,11 +216,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { public StreamsResultSet readRange(DateTime start, DateTime end) { LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; + throw new NotImplementedException(); } void shutdownAndAwaitTermination(ExecutorService pool) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index 5cf515c..101b47c 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -32,7 +32,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC private static final int START = 0; private static final int END = 1; - private static final int DEFAULT_TIMEOUT_MS = 30000; + private static final int DEFAULT_TIMEOUT_MS = 1000000; private StreamsProvider provider; private AtomicBoolean keepRunning;
