Merge branch 'streamstutorial' of https://git-wip-us.apache.org/repos/asf/incubator-streams into streamstutorial
Conflicts:
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c59ab10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c59ab10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c59ab10e
Branch: refs/heads/master
Commit: c59ab10ed92c13d3b116c75f63a2fb921f0a43dc
Parents: 7e4e109 1cc9d0b
Author: sblackmon <[email protected]>
Authored: Thu May 8 15:11:44 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Thu May 8 15:11:44 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +-
.../ElasticsearchPersistWriter.java | 261 +++++++++++--------
.../ElasticsearchWriterConfiguration.json | 5 +-
.../provider/SysomosHeartbeatStream.java | 45 ++--
.../provider/TwitterTimelineProvider.java | 10 +-
.../local/builders/LocalStreamBuilder.java | 161 ++++++++----
.../local/tasks/StreamsPersistWriterTask.java | 6 +-
.../local/tasks/StreamsProcessorTask.java | 2 +-
8 files changed, 304 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c59ab10e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index e99c4ff,ce23197..97796e2
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -373,26 -396,30 +397,29 @@@ public class ElasticsearchPersistWrite
}
public void add(IndexRequest indexRequest) {
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(indexRequest.index());
- bulkRequest.add(indexRequest);
- try {
- trackItemAndBytesWritten(indexRequest.source().length());
- } catch (NullPointerException x) {
- LOGGER.warn("NPE adding/sizing indexrequest");
- }
+ lock.writeLock().lock();
+ checkAndCreateBulkRequest();
+ checkIndexImplications(indexRequest.index());
+ bulkRequest.add(indexRequest);
+ try {
+ trackItemAndBytesWritten(indexRequest.source().length());
+ } catch (NullPointerException x) {
+ LOGGER.warn("NPE adding/sizing indexrequest");
+ } finally {
+ lock.writeLock().unlock();
}
-
}
- private void checkAndCreateBulkRequest()
+ private void trackItemAndBytesWritten(long sizeInBytes)
{
- // Synchronize to ensure that we don't lose any records
- synchronized (this)
- {
- if(bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- }
+ currentItems++;
+ batchItemsSent++;
+ batchSizeInBytes += sizeInBytes;
+
+ // If our queue is larger than our flush threashold, then we should flush the queue.
+ if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+ (currentItems >= batchSize) )
+ flushInternal();
}
private void checkIndexImplications(String indexName)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c59ab10e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 06ba7be,799abe7..3799737
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@@ -22,8 -22,12 +22,12 @@@ import twitter4j.conf.ConfigurationBuil
import twitter4j.json.DataObjectFactory;
import java.io.Serializable;
-import java.math.BigInteger;
+ import java.lang.Math;
+import java.math.BigInteger;
- import java.util.*;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;