Updated the Elasticsearch persist writer to use a configurable background flush thread
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c837a2cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c837a2cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c837a2cf Branch: refs/heads/master Commit: c837a2cf8800efd324003ba5a9299c16b9f8701a Parents: e6ffe29 Author: mfranklin <[email protected]> Authored: Thu May 1 13:59:32 2014 -0400 Committer: mfranklin <[email protected]> Committed: Thu May 1 13:59:32 2014 -0400 ---------------------------------------------------------------------- .../ElasticsearchConfigurator.java | 4 +- .../ElasticsearchPersistWriter.java | 184 +++++++++++-------- .../ElasticsearchWriterConfiguration.json | 6 +- 3 files changed, 116 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index 483a177..8a343a6 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory; import java.util.List; /** - * Created by sblackmon on 12/10/13. 
+ * Converts a {@link com.typesafe.config.Config} element into an instance of ElasticSearchConfiguration */ public class ElasticsearchConfigurator { @@ -51,9 +51,11 @@ public class ElasticsearchConfigurator { String index = elasticsearch.getString("index"); String type = elasticsearch.getString("type"); + Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null; elasticsearchWriterConfiguration.setIndex(index); elasticsearchWriterConfiguration.setType(type); + elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush); return elasticsearchWriterConfiguration; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index ff7e0f5..c574eeb 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -36,10 +36,15 @@ import java.io.OutputStreamWriter; import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, 
Closeable, DatumStatusCountable { public static final String STREAMS_ID = "ElasticsearchPersistWriter"; - public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD; private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class); @@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private static final long WAITING_DOCS_LIMIT = 10000; private static final int BYTES_IN_MB = 1024 * 1024; private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB; + private static final long MAX_MS_BEFORE_FLUSH = 10000; private final List<String> affectedIndexes = new ArrayList<String>(); + private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private ObjectMapper mapper = new StreamsJacksonMapper(); private ElasticsearchClientManager manager; @@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private OutputStreamWriter currentWriter = null; private int batchSize = 50; private int totalRecordsWritten = 0; + private long maxMsBeforeFlush; private boolean veryLargeBulk = false; // by default this setting is set to false protected Thread task; @@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private volatile int batchItemsSent = 0; private volatile int totalByteCount = 0; private volatile int byteCount = 0; + private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); public ElasticsearchPersistWriter() { Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = (ElasticsearchWriterConfiguration) ElasticsearchConfigurator.detectConfiguration(config); + this.config = ElasticsearchConfigurator.detectWriterConfiguration(config); } public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { @@ -173,6 +183,7 
@@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab try { flush(); + backgroundFlushTask.shutdownNow(); } catch (IOException e) { e.printStackTrace(); } @@ -240,6 +251,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab @Override public void prepare(Object configurationObject) { + maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs(); mapper = StreamsJacksonMapper.getInstance(); start(); } @@ -254,7 +266,17 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab } public void start() { - + backgroundFlushTask.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + LOGGER.debug("Checking to see if data needs to be flushed"); + long time = System.currentTimeMillis() - lastWrite.get(); + if (time > maxMsBeforeFlush && batchItemsSent > 0) { + LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time); + flushInternal(); + } + } + }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS); manager = new ElasticsearchClientManager(config); client = manager.getClient(); @@ -262,62 +284,63 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab } public void flushInternal() { - synchronized (this) { - // we do not have a working bulk request, we can just exit here. - if (this.bulkRequest == null || batchItemsSent == 0) - return; + //This is unlocked in the callback + lock.writeLock().lock(); + // we do not have a working bulk request, we can just exit here. + if (this.bulkRequest == null || batchItemsSent == 0) + return; - // call the flush command. - flush(this.bulkRequest, batchItemsSent, batchSizeInBytes); + // call the flush command. 
+ flush(this.bulkRequest, batchItemsSent, batchSizeInBytes); - // null the flush request, this will be created in the 'add' function below - this.bulkRequest = null; + // null the flush request, this will be created in the 'add' function below + this.bulkRequest = null; - // record the proper statistics, and add it to our totals. - this.totalSizeInBytes += this.batchSizeInBytes; - this.totalSent += batchItemsSent; + // record the proper statistics, and add it to our totals. + this.totalSizeInBytes += this.batchSizeInBytes; + this.totalSent += batchItemsSent; - // reset the current batch statistics - this.batchSizeInBytes = 0; - this.batchItemsSent = 0; + // reset the current batch statistics + this.batchSizeInBytes = 0; + this.batchItemsSent = 0; - try { - int count = 0; - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { - /**************************************************************************** - * Author: - * Smashew - * - * Date: - * 2013-10-20 - * - * Note: - * With the information that we have on hand. We need to develop a heuristic - * that will determine when the cluster is having a problem indexing records - * by telling it to pause and wait for it to catch back up. A - * - * There is an impact to us, the caller, whenever this happens as well. Items - * that are not yet fully indexed by the server sit in a queue, on the client - * that can cause the heap to overflow. This has been seen when re-indexing - * large amounts of data to a small cluster. The "deletes" + "indexes" can - * cause the server to have many 'outstandingItems" in queue. Running this - * software with large amounts of data, on a small cluster, can re-create - * this problem. - * - * DO NOT DELETE THESE LINES - ****************************************************************************/ - - // wait for the flush to catch up. 
We are going to cap this at - while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) - Thread.sleep(10); - - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) - LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); - } - } catch (Exception e) { - LOGGER.info("We were broken from our loop: {}", e.getMessage()); + try { + int count = 0; + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { + /**************************************************************************** + * Author: + * Smashew + * + * Date: + * 2013-10-20 + * + * Note: + * With the information that we have on hand. We need to develop a heuristic + * that will determine when the cluster is having a problem indexing records + * by telling it to pause and wait for it to catch back up. A + * + * There is an impact to us, the caller, whenever this happens as well. Items + * that are not yet fully indexed by the server sit in a queue, on the client + * that can cause the heap to overflow. This has been seen when re-indexing + * large amounts of data to a small cluster. The "deletes" + "indexes" can + * cause the server to have many 'outstandingItems" in queue. Running this + * software with large amounts of data, on a small cluster, can re-create + * this problem. + * + * DO NOT DELETE THESE LINES + ****************************************************************************/ + + // wait for the flush to catch up. 
We are going to cap this at + while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) + Thread.sleep(10); + + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) + LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); } + } catch (Exception e) { + LOGGER.info("We were broken from our loop: {}", e.getMessage()); } + } public void add(String indexName, String type, String json) { @@ -353,32 +376,35 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab public void add(UpdateRequest updateRequest) { Preconditions.checkNotNull(updateRequest); - synchronized (this) { - checkAndCreateBulkRequest(); - checkIndexImplications(updateRequest.index()); - bulkRequest.add(updateRequest); - try { - Optional<Integer> size = Objects.firstNonNull( - Optional.fromNullable(updateRequest.doc().source().length()), - Optional.fromNullable(updateRequest.script().length())); - trackItemAndBytesWritten(size.get().longValue()); - } catch (NullPointerException x) { - trackItemAndBytesWritten(1000); - } + lock.writeLock().lock(); + checkAndCreateBulkRequest(); + checkIndexImplications(updateRequest.index()); + bulkRequest.add(updateRequest); + try { + Optional<Integer> size = Objects.firstNonNull( + Optional.fromNullable(updateRequest.doc().source().length()), + Optional.fromNullable(updateRequest.script().length())); + trackItemAndBytesWritten(size.get().longValue()); + } catch (NullPointerException x) { + trackItemAndBytesWritten(1000); + } finally { + lock.writeLock().unlock(); } } public void add(IndexRequest indexRequest) { - synchronized (this) { - checkAndCreateBulkRequest(); - checkIndexImplications(indexRequest.index()); - bulkRequest.add(indexRequest); - try { - trackItemAndBytesWritten(indexRequest.source().length()); - } catch (NullPointerException x) { - LOGGER.warn("NPE adding/sizing indexrequest"); - } + lock.writeLock().lock(); + checkAndCreateBulkRequest(); + 
checkIndexImplications(indexRequest.index()); + bulkRequest.add(indexRequest); + try { + trackItemAndBytesWritten(indexRequest.source().length()); + } catch (NullPointerException x) { + LOGGER.warn("NPE adding/sizing indexrequest"); + } finally { + lock.writeLock().unlock(); } + } public void createIndexIfMissing(String indexName) { @@ -447,6 +473,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab } private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) { + LOGGER.debug("Attempting to write {} items to ES", thisSent); bulkRequest.execute().addListener(new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkItemResponses) { @@ -473,6 +500,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab if (thisSent != (thisOk + thisFailed)) LOGGER.error("We sent more items than this"); + lastWrite.set(System.currentTimeMillis()); + lock.writeLock().unlock(); + LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis), MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); @@ -480,12 +510,11 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab @Override public void onFailure(Throwable e) { + lock.writeLock().unlock(); LOGGER.error("Error bulk loading: {}", e.getMessage()); e.printStackTrace(); } }); - - this.notify(); } private void trackItemAndBytesWritten(long sizeInBytes) { @@ -499,9 +528,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab private void 
checkAndCreateBulkRequest() { // Synchronize to ensure that we don't lose any records - synchronized (this) { + lock.writeLock().lock(); + try { if (bulkRequest == null) bulkRequest = this.manager.getClient().prepareBulk(); + } finally { + lock.writeLock().unlock(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json index 21aad5c..c56aa82 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json @@ -13,6 +13,10 @@ "type": { "type": "string", "description": "Type to write as" - } + }, + "MaxTimeBetweenFlushMs": { + "type": "String", + "format": "utc-millisec" + } } } \ No newline at end of file
