Merge branch 'streamstutorial' of 
https://git-wip-us.apache.org/repos/asf/incubator-streams into streamstutorial

Conflicts:
        
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
        
streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c59ab10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c59ab10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c59ab10e

Branch: refs/heads/master
Commit: c59ab10ed92c13d3b116c75f63a2fb921f0a43dc
Parents: 7e4e109 1cc9d0b
Author: sblackmon <[email protected]>
Authored: Thu May 8 15:11:44 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Thu May 8 15:11:44 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +-
 .../ElasticsearchPersistWriter.java             | 261 +++++++++++--------
 .../ElasticsearchWriterConfiguration.json       |   5 +-
 .../provider/SysomosHeartbeatStream.java        |  45 ++--
 .../provider/TwitterTimelineProvider.java       |  10 +-
 .../local/builders/LocalStreamBuilder.java      | 161 ++++++++----
 .../local/tasks/StreamsPersistWriterTask.java   |   6 +-
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 8 files changed, 304 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c59ab10e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc 
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index e99c4ff,ce23197..97796e2
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -373,26 -396,30 +397,29 @@@ public class ElasticsearchPersistWrite
      }
  
      public void add(IndexRequest indexRequest) {
-         synchronized (this) {
-             checkAndCreateBulkRequest();
-             checkIndexImplications(indexRequest.index());
-             bulkRequest.add(indexRequest);
-             try {
-                 trackItemAndBytesWritten(indexRequest.source().length());
-             } catch (NullPointerException x) {
-                 LOGGER.warn("NPE adding/sizing indexrequest");
-             }
+         lock.writeLock().lock();
+         checkAndCreateBulkRequest();
+         checkIndexImplications(indexRequest.index());
+         bulkRequest.add(indexRequest);
+         try {
+             trackItemAndBytesWritten(indexRequest.source().length());
+         } catch (NullPointerException x) {
+             LOGGER.warn("NPE adding/sizing indexrequest");
+         } finally {
+             lock.writeLock().unlock();
          }
 -
      }
  
-     private void checkAndCreateBulkRequest()
+     private void trackItemAndBytesWritten(long sizeInBytes)
      {
-         // Synchronize to ensure that we don't lose any records
-         synchronized (this)
-         {
-             if(bulkRequest == null)
-                 bulkRequest = this.manager.getClient().prepareBulk();
-         }
+         currentItems++;
+         batchItemsSent++;
+         batchSizeInBytes += sizeInBytes;
+ 
+         // If our queue is larger than our flush threashold, then we should 
flush the queue.
+         if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+                 (currentItems >= batchSize) )
+             flushInternal();
      }
  
      private void checkIndexImplications(String indexName)

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c59ab10e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --cc 
streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 06ba7be,799abe7..3799737
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@@ -22,8 -22,12 +22,12 @@@ import twitter4j.conf.ConfigurationBuil
  import twitter4j.json.DataObjectFactory;
  
  import java.io.Serializable;
 -import java.math.BigInteger;
+ import java.lang.Math;
 +import java.math.BigInteger;
- import java.util.*;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Queue;
  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.ThreadPoolExecutor;

Reply via email to