Updated locking to ensure multiple threads can offer to the queue but only one 
can swap the reference


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f0085694
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f0085694
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f0085694

Branch: refs/heads/STREAMS-58
Commit: f00856946e6cb2ee83d05e96af29730d0f88dfe9
Parents: b5fd7e7
Author: mfranklin <[email protected]>
Authored: Tue Apr 22 13:08:58 2014 -0400
Committer: mfranklin <[email protected]>
Committed: Tue Apr 22 13:08:58 2014 -0400

----------------------------------------------------------------------
 .../ElasticsearchPersistReader.java             | 20 +++++++++++++++++---
 .../elasticsearch/ElasticsearchQuery.java       |  2 --
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0085694/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index fd2a155..54d29d2 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -15,6 +15,8 @@ import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * 
***********************************************************************************************************
@@ -35,6 +37,7 @@ public class ElasticsearchPersistReader implements 
StreamsPersistReader, Seriali
     private ElasticsearchReaderConfiguration config;
     private int threadPoolSize = 10;
     private ExecutorService executor;
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
 
     public ElasticsearchPersistReader() {
     }
@@ -68,13 +71,16 @@ public class ElasticsearchPersistReader implements 
StreamsPersistReader, Seriali
 
         StreamsResultSet current;
 
-        synchronized (ElasticsearchPersistReader.class) {
+        try {
+            lock.writeLock().lock();
             current = new StreamsResultSet(persistQueue);
             current.setCounter(new DatumStatusCounter());
 //            current.getCounter().add(countersCurrent);
 //            countersTotal.add(countersCurrent);
 //            countersCurrent = new DatumStatusCounter();
             persistQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
         }
 
         return current;
@@ -99,11 +105,19 @@ public class ElasticsearchPersistReader implements 
StreamsPersistReader, Seriali
         LOGGER.info("PersistReader done");
     }
 
+    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
+    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
+    //if the queue is being replaced with a new instance
     protected void write(StreamsDatum entry) {
         boolean success;
         do {
-            success = persistQueue.offer(entry);
-            Thread.yield();
+            try {
+                lock.readLock().lock();
+                success = persistQueue.offer(entry);
+                Thread.yield();
+            }finally {
+                lock.readLock().unlock();
+            }
         }
         while (!success);
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f0085694/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 8c9abda..2430c41 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -20,8 +20,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable {
 

Reply via email to