Merge remote-tracking branch 'origin/pr/7' into streamstutorial
Conflicts:
streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/145ec847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/145ec847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/145ec847
Branch: refs/heads/master
Commit: 145ec8472c9cf12e8e376ce09596f354644e587b
Parents: 7293594 f62afa5
Author: sblackmon <[email protected]>
Authored: Thu May 8 14:15:27 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Thu May 8 14:15:27 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchConfigurator.java | 4 +-
.../ElasticsearchPersistWriter.java | 273 +++++++++----------
.../ElasticsearchWriterConfiguration.json | 5 +-
.../provider/SysomosHeartbeatStream.java | 45 +--
.../local/builders/LocalStreamBuilder.java | 161 +++++++----
.../local/tasks/StreamsPersistWriterTask.java | 6 +-
.../local/tasks/StreamsProcessorTask.java | 2 +-
7 files changed, 271 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 9ac234c,8a343a6..e07e50f
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@@ -51,17 -51,12 +51,19 @@@ public class ElasticsearchConfigurator
String index = elasticsearch.getString("index");
String type = elasticsearch.getString("type");
+ Long maxMsBeforeFlush =
elasticsearch.hasPath("MaxTimeBetweenFlushMs") ?
elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
+ if( elasticsearch.hasPath("bulk"))
+
elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
+
+ if( elasticsearch.hasPath("batchSize"))
+
elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
+
elasticsearchWriterConfiguration.setIndex(index);
elasticsearchWriterConfiguration.setType(type);
+
elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
+
return elasticsearchWriterConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 80d2775,5756e1c..ce23197
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -49,8 -54,11 +54,12 @@@ public class ElasticsearchPersistWrite
private static final long WAITING_DOCS_LIMIT = 10000;
private static final int BYTES_IN_MB = 1024 * 1024;
private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+ private static final long DEFAULT_MAX_WAIT = 10000;
++ private static final int DEFAULT_BATCH_SIZE = 100;
private final List<String> affectedIndexes = new ArrayList<String>();
+ private final ScheduledExecutorService backgroundFlushTask =
Executors.newSingleThreadScheduledExecutor();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private ObjectMapper mapper = new StreamsJacksonMapper();
private ElasticsearchClientManager manager;
@@@ -59,11 -67,11 +68,10 @@@
private String parentID = null;
private BulkRequestBuilder bulkRequest;
private OutputStreamWriter currentWriter = null;
- private int batchSize = 50;
- private int totalRecordsWritten = 0;
- private long maxMsBeforeFlush;
++ private int batchSize;
++ private long maxTimeBetweenFlushMs;
+ private boolean veryLargeBulk = false; // by default this setting is set
to false
- private long batchSize;
- private boolean veryLargeBulk; // by default this setting is set to false
- private int totalRecordsWritten = 0;
-
protected Thread task;
protected volatile Queue<StreamsDatum> persistQueue;
@@@ -99,8 -106,7 +108,6 @@@
this.veryLargeBulk = veryLargeBulk;
}
- private final List<String> affectedIndexes = new ArrayList<String>();
--
public int getTotalOutstanding() {
return this.totalSent - (this.totalFailed + this.totalOk);
}
@@@ -151,6 -155,6 +156,14 @@@
this.flushThresholdSizeInBytes = sizeInBytes;
}
++ public long getMaxTimeBetweenFlushMs() {
++ return maxTimeBetweenFlushMs;
++ }
++
++ public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
++ this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
++ }
++
public boolean isConnected() {
return (client != null);
}
@@@ -252,12 -250,13 +259,6 @@@
}
@Override
-- public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
- start();
- }
-
- @Override
- maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ?
DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
- mapper = StreamsJacksonMapper.getInstance();
- start();
- }
-
- @Override
public DatumStatusCounter getDatumStatusCounter() {
DatumStatusCounter counters = new DatumStatusCounter();
counters.incrementAttempt(this.batchItemsSent);
@@@ -267,7 -266,17 +268,17 @@@
}
public void start() {
-
+ backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.debug("Checking to see if data needs to be flushed");
+ long time = System.currentTimeMillis() - lastWrite.get();
- if (time > maxMsBeforeFlush && batchItemsSent > 0) {
++ if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
+ LOGGER.debug("Background Flush task determined {} are
waiting to be flushed. It has been {} since the last write to ES",
batchItemsSent, time);
+ flushInternal();
+ }
+ }
- }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
++ }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
manager = new ElasticsearchClientManager(config);
client = manager.getClient();
@@@ -383,70 -394,20 +396,62 @@@
}
public void add(IndexRequest indexRequest) {
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(indexRequest.index());
- bulkRequest.add(indexRequest);
- try {
- trackItemAndBytesWritten(indexRequest.source().length());
- } catch (NullPointerException x) {
- LOGGER.warn("NPE adding/sizing indexrequest");
- }
+ lock.writeLock().lock();
+ checkAndCreateBulkRequest();
+ checkIndexImplications(indexRequest.index());
+ bulkRequest.add(indexRequest);
+ try {
+ trackItemAndBytesWritten(indexRequest.source().length());
+ } catch (NullPointerException x) {
+ LOGGER.warn("NPE adding/sizing indexrequest");
+ } finally {
+ lock.writeLock().unlock();
}
+
}
+ private void trackItemAndBytesWritten(long sizeInBytes)
+ {
+ currentItems++;
+ batchItemsSent++;
+ batchSizeInBytes += sizeInBytes;
+
+ // If our queue is larger than our flush threashold, then we should
flush the queue.
+ if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
+ (currentItems >= batchSize) )
+ flushInternal();
+ }
+
- private void checkAndCreateBulkRequest()
- {
- // Synchronize to ensure that we don't lose any records
- synchronized (this)
- {
- if(bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- }
- }
-
+ private void checkIndexImplications(String indexName)
+ {
+
+ // check to see if we have seen this index before.
+ if(this.affectedIndexes.contains(indexName))
+ return;
+
+ // we haven't log this index.
+ this.affectedIndexes.add(indexName);
+
+ // Check to see if we are in 'veryLargeBulk' mode
+ // if we aren't, exit early
+ if(!this.veryLargeBulk)
+ return;
+
+
+ // They are in 'very large bulk' mode we want to turn off refreshing
the index.
+
+ // Create a request then add the setting to tell it to stop
refreshing the interval
+ UpdateSettingsRequest updateSettingsRequest = new
UpdateSettingsRequest(indexName);
+
updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval",
-1));
+
+ // submit to ElasticSearch
+ this.manager.getClient()
+ .admin()
+ .indices()
+ .updateSettings(updateSettingsRequest)
+ .actionGet();
+ }
+
public void createIndexIfMissing(String indexName) {
if (!this.manager.getClient()
.admin()
@@@ -512,15 -473,15 +517,24 @@@
return toReturn;
}
+ @Override
+ public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
- veryLargeBulk = this.config.getBulk();
- batchSize = this.config.getBatchSize();
++ veryLargeBulk = config.getBulk() == null ? Boolean.FALSE :
config.getBulk();
++ batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE :
(int)(config.getBatchSize().longValue());
++ maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ?
DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
+ start();
+ }
+
+ /**
+ * This method is to ONLY be called by flushInternal otherwise the counts
will be off.
+ * @param bulkRequest
+ * @param thisSent
+ * @param thisSizeInBytes
+ */
private void flush(final BulkRequestBuilder bulkRequest, final Integer
thisSent, final Long thisSizeInBytes) {
+ final Object messenger = new Object();
+ LOGGER.debug("Attempting to write {} items to ES", thisSent);
bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
@@@ -558,18 -521,16 +574,9 @@@
e.printStackTrace();
}
});
-
- this.notify();
}
-- private void trackItemAndBytesWritten(long sizeInBytes) {
-- batchItemsSent++;
-- batchSizeInBytes += sizeInBytes;
-- // If our queue is larger than our flush threashold, then we should
flush the queue.
-- if (batchSizeInBytes > flushThresholdSizeInBytes)
-- flushInternal();
-- }
private void checkAndCreateBulkRequest() {
// Synchronize to ensure that we don't lose any records
@@@ -579,32 -543,32 +589,5 @@@
}
}
-- private void checkIndexImplications(String indexName) {
--
-- // check to see if we have seen this index before.
-- if (this.affectedIndexes.contains(indexName))
-- return;
--
-- // we haven't log this index.
-- this.affectedIndexes.add(indexName);
--
-- // Check to see if we are in 'veryLargeBulk' mode
-- // if we aren't, exit early
-- if (!this.veryLargeBulk)
-- return;
--
--
-- // They are in 'very large bulk' mode we want to turn off refreshing
the index.
--
-- // Create a request then add the setting to tell it to stop
refreshing the interval
-- UpdateSettingsRequest updateSettingsRequest = new
UpdateSettingsRequest(indexName);
--
updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval",
-1));
--
-- // submit to ElasticSearch
-- this.manager.getClient()
-- .admin()
-- .indices()
-- .updateSettings(updateSettingsRequest)
-- .actionGet();
-- }
}
++
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 5e169b7,c56aa82..b107be6
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@@ -14,15 -14,9 +14,18 @@@
"type": "string",
"description": "Type to write as"
},
- "MaxTimeBetweenFlushMs": {
- "type": "String",
- "format": "utc-millisec"
+ "bulk": {
+ "type": "boolean",
+ "description": "Index in large or small batches",
+ "default": "false"
+ },
+ "batchSize": {
+ "type": "integer",
+ "description": "Item Count before flush",
+ "default": 100
- }
++ },
++ "maxTimeBetweenFlushMs": {
++ "type": "integer"
+ }
}
}