Updated Persist writer to have a configurable background flush thread

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c837a2cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c837a2cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c837a2cf

Branch: refs/heads/master
Commit: c837a2cf8800efd324003ba5a9299c16b9f8701a
Parents: e6ffe29
Author: mfranklin <[email protected]>
Authored: Thu May 1 13:59:32 2014 -0400
Committer: mfranklin <[email protected]>
Committed: Thu May 1 13:59:32 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +-
 .../ElasticsearchPersistWriter.java             | 184 +++++++++++--------
 .../ElasticsearchWriterConfiguration.json       |   6 +-
 3 files changed, 116 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 483a177..8a343a6 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -8,7 +8,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * Converts a {@link com.typesafe.config.Config} element into an instance of 
ElasticsearchConfiguration
  */
 public class ElasticsearchConfigurator {
 
@@ -51,9 +51,11 @@ public class ElasticsearchConfigurator {
 
         String index = elasticsearch.getString("index");
         String type = elasticsearch.getString("type");
+        Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") 
? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
 
         elasticsearchWriterConfiguration.setIndex(index);
         elasticsearchWriterConfiguration.setType(type);
+        
elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
 
         return elasticsearchWriterConfiguration;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index ff7e0f5..c574eeb 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -36,10 +36,15 @@ import java.io.OutputStreamWriter;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ElasticsearchPersistWriter implements StreamsPersistWriter, 
Flushable, Closeable, DatumStatusCountable {
     public static final String STREAMS_ID = "ElasticsearchPersistWriter";
-
     public volatile long flushThresholdSizeInBytes = 
DEFAULT_BULK_FLUSH_THRESHOLD;
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
@@ -49,8 +54,11 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     private static final long WAITING_DOCS_LIMIT = 10000;
     private static final int BYTES_IN_MB = 1024 * 1024;
     private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+    private static final long MAX_MS_BEFORE_FLUSH = 10000;
 
     private final List<String> affectedIndexes = new ArrayList<String>();
+    private final ScheduledExecutorService backgroundFlushTask = 
Executors.newSingleThreadScheduledExecutor();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private ObjectMapper mapper = new StreamsJacksonMapper();
     private ElasticsearchClientManager manager;
@@ -61,6 +69,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     private OutputStreamWriter currentWriter = null;
     private int batchSize = 50;
     private int totalRecordsWritten = 0;
+    private long maxMsBeforeFlush;
     private boolean veryLargeBulk = false;  // by default this setting is set 
to false
 
     protected Thread task;
@@ -78,10 +87,11 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     private volatile int batchItemsSent = 0;
     private volatile int totalByteCount = 0;
     private volatile int byteCount = 0;
+    private volatile AtomicLong lastWrite = new 
AtomicLong(System.currentTimeMillis());
 
     public ElasticsearchPersistWriter() {
         Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = (ElasticsearchWriterConfiguration) 
ElasticsearchConfigurator.detectConfiguration(config);
+        this.config = 
ElasticsearchConfigurator.detectWriterConfiguration(config);
     }
 
     public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) 
{
@@ -173,6 +183,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
 
         try {
             flush();
+            backgroundFlushTask.shutdownNow();
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -240,6 +251,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
 
     @Override
     public void prepare(Object configurationObject) {
+        maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? 
MAX_MS_BEFORE_FLUSH : config.getMaxTimeBetweenFlushMs();
         mapper = StreamsJacksonMapper.getInstance();
         start();
     }
@@ -254,7 +266,17 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     }
 
     public void start() {
-
+        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                LOGGER.debug("Checking to see if data needs to be flushed");
+                long time = System.currentTimeMillis() - lastWrite.get();
+                if (time > maxMsBeforeFlush && batchItemsSent > 0) {
+                    LOGGER.debug("Background Flush task determined {} are 
waiting to be flushed.  It has been {} since the last write to ES", 
batchItemsSent, time);
+                    flushInternal();
+                }
+            }
+        }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
         manager = new ElasticsearchClientManager(config);
         client = manager.getClient();
 
@@ -262,62 +284,63 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     }
 
     public void flushInternal() {
-        synchronized (this) {
-            // we do not have a working bulk request, we can just exit here.
-            if (this.bulkRequest == null || batchItemsSent == 0)
-                return;
+        //This is unlocked in the callback
+        lock.writeLock().lock();
+        // we do not have a working bulk request, we can just exit here.
+        if (this.bulkRequest == null || batchItemsSent == 0)
+            return;
 
-            // call the flush command.
-            flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+        // call the flush command.
+        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
 
-            // null the flush request, this will be created in the 'add' 
function below
-            this.bulkRequest = null;
+        // null the flush request, this will be created in the 'add' function 
below
+        this.bulkRequest = null;
 
-            // record the proper statistics, and add it to our totals.
-            this.totalSizeInBytes += this.batchSizeInBytes;
-            this.totalSent += batchItemsSent;
+        // record the proper statistics, and add it to our totals.
+        this.totalSizeInBytes += this.batchSizeInBytes;
+        this.totalSent += batchItemsSent;
 
-            // reset the current batch statistics
-            this.batchSizeInBytes = 0;
-            this.batchItemsSent = 0;
+        // reset the current batch statistics
+        this.batchSizeInBytes = 0;
+        this.batchItemsSent = 0;
 
-            try {
-                int count = 0;
-                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
-                    
/****************************************************************************
-                     * Author:
-                     * Smashew
-                     *
-                     * Date:
-                     * 2013-10-20
-                     *
-                     * Note:
-                     * With the information that we have on hand. We need to 
develop a heuristic
-                     * that will determine when the cluster is having a 
problem indexing records
-                     * by telling it to pause and wait for it to catch back 
up. A
-                     *
-                     * There is an impact to us, the caller, whenever this 
happens as well. Items
-                     * that are not yet fully indexed by the server sit in a 
queue, on the client
-                     * that can cause the heap to overflow. This has been seen 
when re-indexing
-                     * large amounts of data to a small cluster. The "deletes" 
+ "indexes" can
-                     * cause the server to have many 'outstandingItems" in 
queue. Running this
-                     * software with large amounts of data, on a small 
cluster, can re-create
-                     * this problem.
-                     *
-                     * DO NOT DELETE THESE LINES
-                     
****************************************************************************/
-
-                    // wait for the flush to catch up. We are going to cap 
this at
-                    while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && 
count++ < 500)
-                        Thread.sleep(10);
-
-                    if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
-                        LOGGER.warn("Even after back-off there are {} items 
still in queue.", this.getTotalOutstanding());
-                }
-            } catch (Exception e) {
-                LOGGER.info("We were broken from our loop: {}", 
e.getMessage());
+        try {
+            int count = 0;
+            if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
+                
/****************************************************************************
+                 * Author:
+                 * Smashew
+                 *
+                 * Date:
+                 * 2013-10-20
+                 *
+                 * Note:
+                 * With the information that we have on hand. We need to 
develop a heuristic
+                 * that will determine when the cluster is having a problem 
indexing records
+                 * by telling it to pause and wait for it to catch back up. A
+                 *
+                 * There is an impact to us, the caller, whenever this happens 
as well. Items
+                 * that are not yet fully indexed by the server sit in a 
queue, on the client
+                 * that can cause the heap to overflow. This has been seen 
when re-indexing
+                 * large amounts of data to a small cluster. The "deletes" + 
"indexes" can
+                 * cause the server to have many 'outstandingItems" in queue. 
Running this
+                 * software with large amounts of data, on a small cluster, 
can re-create
+                 * this problem.
+                 *
+                 * DO NOT DELETE THESE LINES
+                 
****************************************************************************/
+
+                // wait for the flush to catch up. We are going to cap this at
+                while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && 
count++ < 500)
+                    Thread.sleep(10);
+
+                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
+                    LOGGER.warn("Even after back-off there are {} items still 
in queue.", this.getTotalOutstanding());
             }
+        } catch (Exception e) {
+            LOGGER.info("We were broken from our loop: {}", e.getMessage());
         }
+
     }
 
     public void add(String indexName, String type, String json) {
@@ -353,32 +376,35 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
 
     public void add(UpdateRequest updateRequest) {
         Preconditions.checkNotNull(updateRequest);
-        synchronized (this) {
-            checkAndCreateBulkRequest();
-            checkIndexImplications(updateRequest.index());
-            bulkRequest.add(updateRequest);
-            try {
-                Optional<Integer> size = Objects.firstNonNull(
-                        
Optional.fromNullable(updateRequest.doc().source().length()),
-                        
Optional.fromNullable(updateRequest.script().length()));
-                trackItemAndBytesWritten(size.get().longValue());
-            } catch (NullPointerException x) {
-                trackItemAndBytesWritten(1000);
-            }
+        lock.writeLock().lock();
+        checkAndCreateBulkRequest();
+        checkIndexImplications(updateRequest.index());
+        bulkRequest.add(updateRequest);
+        try {
+            Optional<Integer> size = Objects.firstNonNull(
+                    
Optional.fromNullable(updateRequest.doc().source().length()),
+                    Optional.fromNullable(updateRequest.script().length()));
+            trackItemAndBytesWritten(size.get().longValue());
+        } catch (NullPointerException x) {
+            trackItemAndBytesWritten(1000);
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     public void add(IndexRequest indexRequest) {
-        synchronized (this) {
-            checkAndCreateBulkRequest();
-            checkIndexImplications(indexRequest.index());
-            bulkRequest.add(indexRequest);
-            try {
-                trackItemAndBytesWritten(indexRequest.source().length());
-            } catch (NullPointerException x) {
-                LOGGER.warn("NPE adding/sizing indexrequest");
-            }
+        lock.writeLock().lock();
+        checkAndCreateBulkRequest();
+        checkIndexImplications(indexRequest.index());
+        bulkRequest.add(indexRequest);
+        try {
+            trackItemAndBytesWritten(indexRequest.source().length());
+        } catch (NullPointerException x) {
+            LOGGER.warn("NPE adding/sizing indexrequest");
+        } finally {
+            lock.writeLock().unlock();
         }
+
     }
 
     public void createIndexIfMissing(String indexName) {
@@ -447,6 +473,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
     }
 
     private void flush(final BulkRequestBuilder bulkRequest, final Integer 
thisSent, final Long thisSizeInBytes) {
+        LOGGER.debug("Attempting to write {} items to ES", thisSent);
         bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse bulkItemResponses) {
@@ -473,6 +500,9 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
                 if (thisSent != (thisOk + thisFailed))
                     LOGGER.error("We sent more items than this");
 
+                lastWrite.set(System.currentTimeMillis());
+                lock.writeLock().unlock();
+
                 LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - 
Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                         MEGABYTE_FORMAT.format((double) thisSizeInBytes / 
(double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), 
NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                         MEGABYTE_FORMAT.format((double) totalSizeInBytes / 
(double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), 
NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), 
NUMBER_FORMAT.format(getTotalOutstanding()));
@@ -480,12 +510,11 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
 
             @Override
             public void onFailure(Throwable e) {
+                lock.writeLock().unlock();
                 LOGGER.error("Error bulk loading: {}", e.getMessage());
                 e.printStackTrace();
             }
         });
-
-        this.notify();
     }
 
     private void trackItemAndBytesWritten(long sizeInBytes) {
@@ -499,9 +528,12 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, Flushab
 
     private void checkAndCreateBulkRequest() {
         // Synchronize to ensure that we don't lose any records
-        synchronized (this) {
+        lock.writeLock().lock();
+        try {
             if (bulkRequest == null)
                 bulkRequest = this.manager.getClient().prepareBulk();
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c837a2cf/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
 
b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 21aad5c..c56aa82 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@ -13,6 +13,10 @@
         "type": {
             "type": "string",
             "description": "Type to write as"
-        }
+        },
+        "MaxTimeBetweenFlushMs": {
+            "type": "integer",
+            "format": "utc-millisec"
+        }
     }
 }
\ No newline at end of file

Reply via email to