Repository: usergrid
Updated Branches:
  refs/heads/2.1-release 70b7e2750 -> 8da60e2aa


Fix issue where read repair was causing empty messages to be queued.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/91354471
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/91354471
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/91354471

Branch: refs/heads/2.1-release
Commit: 913544719ddc84c1eb6cbbdfe463a7712ad73051
Parents: dfc70f4
Author: Michael Russo <michaelaru...@gmail.com>
Authored: Wed Nov 4 14:02:53 2015 -0800
Committer: Michael Russo <michaelaru...@gmail.com>
Committed: Wed Nov 4 14:02:53 2015 -0800

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java | 8 +++++---
 .../pipeline/read/traverse/AbstractReadGraphFilter.java      | 2 +-
 .../persistence/index/impl/IndexOperationMessage.java        | 4 ++--
 3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6b9abbc..16e119c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -490,6 +490,11 @@ public class AmazonAsyncEventService implements 
AsyncEventService {
      */
     public void queueIndexOperationMessage( final IndexOperationMessage 
indexOperationMessage ) {
 
+        // don't try to produce something with nothing
+        if(indexOperationMessage.isEmpty()){
+            return;
+        }
+
         final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( 
indexOperationMessage );
 
         final UUID newMessageId = UUIDGenerator.newTimeUUID();
@@ -760,10 +765,7 @@ public class AmazonAsyncEventService implements 
AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             .collect(Collectors.toList());
 
-        //only Q it if it's empty
-        if(!combined.isEmpty()) {
             queueIndexOperationMessage( combined );
-        }
 
         return messagesToAck;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 89230d7..78a3450 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -245,8 +245,8 @@ public abstract class AbstractReadGraphFilter extends 
AbstractPathFilter<Id, Id,
     private Observable.Transformer<IndexOperationMessage, 
IndexOperationMessage> applyCollector() {
 
         return observable -> observable
-            .filter((IndexOperationMessage msg) -> !msg.isEmpty())
             .collect(() -> new IndexOperationMessage(), (collector, single) -> 
collector.ingest(single))
+            .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
                 asyncEventService.queueIndexOperationMessage(indexOperation);
             });

http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bcee308..7d19ce3 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -115,7 +115,7 @@ public class IndexOperationMessage implements Serializable {
     }
 
     public void ingest(IndexOperationMessage singleMessage) {
-        
this.indexRequests.addAll(singleMessage.getIndexRequests().stream().collect(Collectors.toList()));
-        
this.deIndexRequests.addAll(singleMessage.getDeIndexRequests().stream().collect(Collectors.toList()));
+        this.indexRequests.addAll(singleMessage.getIndexRequests());
+        this.deIndexRequests.addAll(singleMessage.getDeIndexRequests());
     }
 }

Reply via email to