Repository: usergrid Updated Branches: refs/heads/delete-event-updates [created] 70d7a9586
Make the graph read repair directly compact nodes in graph instead of queueing events. Misc prop file changes. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70d7a958 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70d7a958 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70d7a958 Branch: refs/heads/delete-event-updates Commit: 70d7a9586ece0f32ec5aa50334cd4d70f440b2c6 Parents: 5eed978 Author: Michael Russo <michaelaru...@gmail.com> Authored: Mon Oct 26 22:26:07 2015 -0700 Committer: Michael Russo <michaelaru...@gmail.com> Committed: Mon Oct 26 22:26:07 2015 -0700 ---------------------------------------------------------------------- .../index/IndexProcessorFig.java | 4 ++-- .../read/traverse/AbstractReadGraphFilter.java | 25 ++++++++++---------- .../traverse/ReadGraphCollectionFilter.java | 6 ++--- .../traverse/ReadGraphConnectionFilter.java | 6 ++--- 4 files changed, 21 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index ec9b315..7650c62 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -65,14 +65,14 @@ public interface IndexProcessorFig extends GuicyFig { * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs. * If the timeout occurs, the messages will become visible again for re-processing. */ - @Default( "5000" ) // 5 seconds + @Default( "30000" ) // 30 seconds @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT ) int getIndexQueueVisibilityTimeout(); /** * The number of worker threads used to read index write requests from the queue. */ - @Default( "8" ) + @Default( "16" ) @Key( ELASTICSEARCH_WORKER_COUNT ) int getWorkerCount(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index 621edd2..9d050c8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -20,10 +20,10 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; @@ -50,16 +50,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class ); private final GraphManagerFactory graphManagerFactory; - private final AsyncEventService asyncEventService; + private final RxTaskScheduler rxTaskScheduler; /** * Create a new instance of our command */ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory, - final AsyncEventService asyncEventService ) { + final RxTaskScheduler rxTaskScheduler) { this.graphManagerFactory = graphManagerFactory; - this.asyncEventService = asyncEventService; + this.rxTaskScheduler = rxTaskScheduler; } @@ -109,25 +109,26 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, if(isDeleted){ - logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge ); - asyncEventService.queueDeleteEdge( applicationScope, markedEdge ); + logger.trace("Edge {} is deleted, deleting the edge", markedEdge); + graphManager.deleteEdge(markedEdge).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); } if(isSourceNodeDeleted){ final Id sourceNodeId = markedEdge.getSourceNode(); - logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId ); - - asyncEventService.queueEntityDelete( applicationScope, sourceNodeId ); + logger.trace("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId); + graphManager.compactNode(sourceNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); } if(isTargetNodeDelete){ final Id targetNodeId = markedEdge.getTargetNode(); - logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId ); - - asyncEventService.queueEntityDelete( applicationScope, targetNodeId ); + logger.trace("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId ); + graphManager.compactNode(targetNodeId).subscribeOn(rxTaskScheduler.getAsyncIOScheduler()) + .subscribe(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java index db5a0a8..1d63bc6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -41,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) { - super( graphManagerFactory, asyncEventService ); + public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String collectionName ) { + super( graphManagerFactory, rxTaskScheduler ); this.collectionName = collectionName; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/70d7a958/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java index 93e8fd4..efe94db 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; -import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -41,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) { - super( graphManagerFactory, asyncEventService ); + public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final RxTaskScheduler rxTaskScheduler, @Assisted final String connectionName ) { + super( graphManagerFactory, rxTaskScheduler ); this.connectionName = connectionName; }