Repository: usergrid
Updated Branches:
  refs/heads/2.1-release 70b7e2750 -> 8da60e2aa
Fix issue where read repair was causing empty messages to be queued. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/91354471 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/91354471 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/91354471 Branch: refs/heads/2.1-release Commit: 913544719ddc84c1eb6cbbdfe463a7712ad73051 Parents: dfc70f4 Author: Michael Russo <michaelaru...@gmail.com> Authored: Wed Nov 4 14:02:53 2015 -0800 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Wed Nov 4 14:02:53 2015 -0800 ---------------------------------------------------------------------- .../corepersistence/asyncevents/AmazonAsyncEventService.java | 8 +++++--- .../pipeline/read/traverse/AbstractReadGraphFilter.java | 2 +- .../persistence/index/impl/IndexOperationMessage.java | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 6b9abbc..16e119c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -490,6 +490,11 @@ public class AmazonAsyncEventService implements AsyncEventService { */ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { + // don't try to produce something with nothing + if(indexOperationMessage.isEmpty()){ + return; + } + 
final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); final UUID newMessageId = UUIDGenerator.newTimeUUID(); @@ -760,10 +765,7 @@ public class AmazonAsyncEventService implements AsyncEventService { .map(result -> result.getQueueMessage().get()) .collect(Collectors.toList()); - //only Q it if it's empty - if(!combined.isEmpty()) { queueIndexOperationMessage( combined ); - } return messagesToAck; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 89230d7..78a3450 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -245,8 +245,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() { return observable -> observable - .filter((IndexOperationMessage msg) -> !msg.isEmpty()) .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) + .filter(msg -> !msg.isEmpty()) .doOnNext(indexOperation -> { asyncEventService.queueIndexOperationMessage(indexOperation); }); http://git-wip-us.apache.org/repos/asf/usergrid/blob/91354471/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java ---------------------------------------------------------------------- diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index bcee308..7d19ce3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -115,7 +115,7 @@ public class IndexOperationMessage implements Serializable {
     }
 
     public void ingest(IndexOperationMessage singleMessage) {
-        this.indexRequests.addAll(singleMessage.getIndexRequests().stream().collect(Collectors.toList()));
-        this.deIndexRequests.addAll(singleMessage.getDeIndexRequests().stream().collect(Collectors.toList()));
+        this.indexRequests.addAll(singleMessage.getIndexRequests());
+        this.deIndexRequests.addAll(singleMessage.getDeIndexRequests());
     }
 }