Repository: incubator-streams
Updated Branches:
  refs/heads/master c38eef6cf -> e68300936


Twitter userstream provider now writes to Elasticsearch in real time


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/522096e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/522096e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/522096e8

Branch: refs/heads/master
Commit: 522096e8a914ea738a97a536ef19a1416453e5f0
Parents: f877c5f
Author: Steve Blackmon <[email protected]>
Authored: Mon Apr 14 10:25:27 2014 -0500
Committer: Steve Blackmon <[email protected]>
Committed: Mon Apr 14 10:25:27 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-elasticsearch/pom.xml       |  1 -
 .../ElasticsearchConfigurator.java              |  7 ++
 .../ElasticsearchPersistWriter.java             | 25 ++++---
 .../ElasticsearchWriterConfiguration.json       | 10 +++
 .../twitter/processor/TwitterTypeConverter.java |  2 +-
 .../twitter/provider/TwitterStreamProvider.java | 69 +++++++++-----------
 .../provider/TwitterTimelineProvider.java       | 32 ++++-----
 .../local/tasks/StreamsProviderTask.java        |  2 +-
 8 files changed, 78 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/pom.xml b/streams-contrib/streams-persist-elasticsearch/pom.xml
index 07433ba..95f3357 100644
--- a/streams-contrib/streams-persist-elasticsearch/pom.xml
+++ b/streams-contrib/streams-persist-elasticsearch/pom.xml
@@ -62,7 +62,6 @@
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>build-helper-maven-plugin</artifactId>
-                <version>1.8</version>
                 <executions>
                     <execution>
                         <id>add-source</id>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 20b5c08..e6bf3ca 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -52,9 +52,16 @@ public class ElasticsearchConfigurator {
         String index = elasticsearch.getString("index");
         String type = elasticsearch.getString("type");
 
+        if( elasticsearch.hasPath("bulk"))
+            
elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
+
+        if( elasticsearch.hasPath("batchSize"))
+            
elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
+
         elasticsearchWriterConfiguration.setIndex(index);
         elasticsearchWriterConfiguration.setType(type);
 
+
         return elasticsearchWriterConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ad89013..405d1b8 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -51,34 +51,27 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     private BulkRequestBuilder bulkRequest;
     private OutputStreamWriter currentWriter = null;
 
-    private int batchSize = 50;
-    private int totalRecordsWritten = 0;
-    private boolean veryLargeBulk = false;  // by default this setting is set 
to false
+    private long batchSize;
+    private boolean veryLargeBulk;  // by default this setting is set to false
 
     private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 
1024l;
     private static final long WAITING_DOCS_LIMIT = 10000;
 
     public volatile long flushThresholdSizeInBytes = 
DEFAULT_BULK_FLUSH_THRESHOLD;
 
+    private volatile int currentItems = 0;
     private volatile int totalSent = 0;
     private volatile int totalSeconds = 0;
     private volatile int totalAttempted = 0;
     private volatile int totalOk = 0;
     private volatile int totalFailed = 0;
     private volatile int totalBatchCount = 0;
+    private volatile int totalRecordsWritten = 0;
     private volatile long totalSizeInBytes = 0;
 
     private volatile long batchSizeInBytes = 0;
     private volatile int batchItemsSent = 0;
 
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
-
     private final List<String> affectedIndexes = new ArrayList<String>();
 
     public int getTotalOutstanding()                           { return 
this.totalSent - (this.totalFailed + this.totalOk); }
@@ -99,7 +92,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new StreamsJacksonMapper();
+    private ObjectMapper mapper;
 
     private ElasticsearchWriterConfiguration config;
 
@@ -245,6 +238,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
             // reset the current batch statistics
             this.batchSizeInBytes = 0;
             this.batchItemsSent = 0;
+            this.currentItems = 0;
 
             try
             {
@@ -323,6 +317,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                 LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - 
Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                         MEGABYTE_FORMAT.format((double) thisSizeInBytes / 
(double)(1024*1024)), NUMBER_FORMAT.format(thisOk), 
NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                         MEGABYTE_FORMAT.format((double) totalSizeInBytes / 
(double)(1024*1024)), NUMBER_FORMAT.format(totalOk), 
NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), 
NUMBER_FORMAT.format(getTotalOutstanding()));
+
             }
 
             @Override
@@ -409,11 +404,13 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
     private void trackItemAndBytesWritten(long sizeInBytes)
     {
+        currentItems++;
         batchItemsSent++;
         batchSizeInBytes += sizeInBytes;
 
         // If our queue is larger than our flush threashold, then we should 
flush the queue.
-        if(batchSizeInBytes > flushThresholdSizeInBytes)
+        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+                (currentItems >= batchSize) )
             flushInternal();
     }
 
@@ -530,6 +527,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
     @Override
     public void prepare(Object configurationObject) {
         mapper = StreamsJacksonMapper.getInstance();
+        veryLargeBulk = this.config.getBulk();
+        batchSize = this.config.getBatchSize();
         start();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 21aad5c..5e169b7 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -13,6 +13,16 @@
         "type": {
             "type": "string",
             "description": "Type to write as"
+        },
+        "bulk": {
+            "type": "boolean",
+            "description": "Index in large or small batches",
+            "default": "false"
+        },
+        "batchSize": {
+            "type": "integer",
+            "description": "Item Count before flush",
+            "default": 100
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
index 68820c9..5e88bdd 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java
@@ -27,7 +27,7 @@ import java.util.Queue;
  */
 public class TwitterTypeConverter implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "TwitterTypeConverter";
+    public final static String STREAMS_ID = "TwitterTypeConverter";
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterTypeConverter.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 3df7d02..ba88803 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -1,10 +1,7 @@
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -19,10 +16,9 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.BasicAuth;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.core.*;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
 import org.apache.streams.twitter.processor.TwitterEventProcessor;
 import org.joda.time.DateTime;
@@ -31,7 +27,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.Collection;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.*;
@@ -39,7 +34,7 @@ import java.util.concurrent.*;
 /**
  * Created by sblackmon on 12/10/13.
  */
-public class TwitterStreamProvider implements StreamsProvider, Serializable {
+public class TwitterStreamProvider implements StreamsProvider, Serializable, 
DatumStatusCountable {
 
     public final static String STREAMS_ID = "TwitterStreamProvider";
 
@@ -47,8 +42,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     private TwitterStreamConfiguration config;
 
-    private Class klass;
-
     public TwitterStreamConfiguration getConfig() {
         return config;
     }
@@ -57,9 +50,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
         this.config = config;
     }
 
-    protected BlockingQueue inQueue = new LinkedBlockingQueue<String>(10000);
+    protected BlockingQueue hosebirdQueue = new 
LinkedBlockingQueue<String>(1000);
 
-    protected volatile Queue<StreamsDatum> providerQueue;
+    protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>(1000);
 
     protected Hosts hosebirdHosts;
     protected Authentication auth;
@@ -68,6 +61,9 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
     protected ListeningExecutorService executor;
 
+    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+    private DatumStatusCounter countersTotal = new DatumStatusCounter();
+
     private static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
@@ -83,50 +79,43 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
         this.config = config;
     }
 
-    public TwitterStreamProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("twitter");
-        this.config = TwitterStreamConfigurator.detectConfiguration(config);
-        this.klass = klass;
-        providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-    }
-
-    public TwitterStreamProvider(TwitterStreamConfiguration config, Class 
klass) {
-        this.config = config;
-        this.klass = klass;
-        providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    }
-
     @Override
     public void startStream() {
 
         for (int i = 0; i < 5; i++) {
-            executor.submit(new TwitterEventProcessor(inQueue, providerQueue, 
klass));
+            executor.submit(new TwitterEventProcessor(hosebirdQueue, 
providerQueue, String.class));
         }
 
         new Thread(new TwitterStreamProviderTask(this)).start();
+
     }
 
     @Override
     public synchronized StreamsResultSet readCurrent() {
-        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
-        Iterators.addAll(currentIterator, providerQueue.iterator());
 
-        StreamsResultSet current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+        StreamsResultSet current;
 
-        providerQueue.clear();
+        synchronized( TwitterStreamProvider.class ) {
+            current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+            current.setCounter(new DatumStatusCounter());
+            current.getCounter().add(countersCurrent);
+            countersTotal.add(countersCurrent);
+            countersCurrent = new DatumStatusCounter();
+            providerQueue.clear();
+        }
 
         return current;
     }
 
     @Override
     public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
+        throw new NotImplementedException();
     }
 
     @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
+    public StreamsResultSet readRange(DateTime start, DateTime end)
+    {
+        throw new NotImplementedException();
     }
 
     @Override
@@ -134,8 +123,6 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
 
         executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
-        Preconditions.checkNotNull(this.klass);
-
         Preconditions.checkNotNull(config.getEndpoint());
 
         if(config.getEndpoint().equals("userstream") ) {
@@ -212,7 +199,8 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
             .hosts(hosebirdHosts)
             .endpoint(endpoint)
             .authentication(auth)
-            .processor(new StringDelimitedProcessor(inQueue))
+            .connectionTimeout(1200000)
+            .processor(new StringDelimitedProcessor(hosebirdQueue))
             .build();
 
     }
@@ -220,7 +208,12 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable {
     @Override
     public void cleanUp() {
         for (int i = 0; i < 5; i++) {
-            inQueue.add(TwitterEventProcessor.TERMINATE);
+            hosebirdQueue.add(TwitterEventProcessor.TERMINATE);
         }
     }
+
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return countersTotal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b9551ad..20ee951 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,10 +2,6 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -25,7 +21,9 @@ import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.*;
 
 /**
@@ -120,7 +118,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     @Override
     public void startStream() {
-        // no op
+        LOGGER.debug("{} startStream", STREAMS_ID);
+        throw new org.apache.commons.lang.NotImplementedException();
     }
 
     private void captureTimeline(long currentId) {
@@ -182,15 +181,20 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     }
 
     public StreamsResultSet readCurrent() {
+        LOGGER.debug("{} readCurrent", STREAMS_ID);
 
         Preconditions.checkArgument(ids.hasNext());
 
-        LOGGER.info("readCurrent");
+        StreamsResultSet current;
+
+        synchronized( TwitterTimelineProvider.class ) {
+
+            while( ids.hasNext() ) {
+                Long currentId = ids.next();
+                LOGGER.info("Provider Task Starting: {}", currentId);
+                captureTimeline(currentId);
+            }
 
-        while( ids.hasNext() ) {
-            Long currentId = ids.next();
-            LOGGER.info("Provider Task Starting: {}", currentId);
-            captureTimeline(currentId);
         }
 
         LOGGER.info("Finished.  Cleaning up...");
@@ -212,11 +216,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     public StreamsResultSet readRange(DateTime start, DateTime end) {
         LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
+        throw new NotImplementedException();
     }
 
     void shutdownAndAwaitTermination(ExecutorService pool) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/522096e8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 5cf515c..101b47c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -32,7 +32,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     private static final int START = 0;
     private static final int END = 1;
 
-    private static final int DEFAULT_TIMEOUT_MS = 30000;
+    private static final int DEFAULT_TIMEOUT_MS = 1000000;
 
     private StreamsProvider provider;
     private AtomicBoolean keepRunning;

Reply via email to