Repository: usergrid
Updated Branches:
  refs/heads/master e504aae65 -> fdd4e0be3


Make it possible to configure the queue implementation as LOCAL, DISTRIBUTED,
or DISTRIBUTED_SNS; also set the Core tests to use the DISTRIBUTED queue (i.e., Qakka).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/278abd29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/278abd29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/278abd29

Branch: refs/heads/master
Commit: 278abd29473938c0b8169e9ddf8d1e6f2ce0539b
Parents: e504aae
Author: Dave Johnson <snoopd...@apache.org>
Authored: Fri Nov 18 17:13:34 2016 -0500
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Fri Nov 18 17:13:34 2016 -0500

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   9 +-
 .../corepersistence/CpEntityManager.java        |   3 +-
 .../usergrid/corepersistence/GuiceFactory.java  |   2 +-
 .../asyncevents/AsyncIndexProvider.java         | 103 ++++---------------
 .../corepersistence/TestCoreModule.java         |   6 +-
 .../corepersistence/TestIndexModule.java        |  10 +-
 .../resources/usergrid-custom-test.properties   |   1 +
 .../persistence/index/guice/IndexModule.java    |  11 +-
 .../index/guice/TestIndexModule.java            |   8 +-
 .../persistence/queue/LegacyQueueManager.java   |   9 ++
 .../persistence/queue/guice/QueueModule.java    |  48 +++++++--
 .../usergrid/persistence/queue/TestModule.java  |   2 +-
 12 files changed, 114 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 781eede..ef4bb04 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -44,6 +44,7 @@ import 
org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphN
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
+import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
 
 
@@ -52,6 +53,11 @@ import java.util.concurrent.ThreadPoolExecutor;
  */
 public class CoreModule extends AbstractModule {
 
+    private final Properties properties;
+
+    public CoreModule( Properties properties ) {
+        this.properties = properties;
+    }
 
     @Override
     protected void configure() {
@@ -79,7 +85,8 @@ public class CoreModule extends AbstractModule {
                 bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} 
).to( AllNodesInGraphImpl.class );
             }
         } );
-        install( new IndexModule() {
+
+        install( new IndexModule( properties ) {
             @Override
             public void configureMigrationProvider() {
                 bind( new 
TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 87a8649..c6a757e 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -3106,6 +3106,7 @@ public class CpEntityManager implements EntityManager {
             Entity refreshEntity = create("refresh", map);
             EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo
                 = 
managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
+
             try {
                 for (int i = 0; i < 20; i++) {
                     if (searchCollection(
@@ -3118,7 +3119,7 @@ public class CpEntityManager implements EntityManager {
                         hasFinished = true;
                         break;
                     }
-                    Thread.sleep(100);
+                    Thread.sleep(500);
 
                     indexRefreshCommandInfo
                         = 
managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
index f2be27a..1758801 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java
@@ -148,7 +148,7 @@ public class GuiceFactory implements FactoryBean<Injector> {
             Module serviceModule 
=(Module)applicationContext.getBean("serviceModule");
             moduleList.add( serviceModule);
         }
-        moduleList.add(new CoreModule());
+        moduleList.add(new CoreModule( systemProperties ));
         moduleList.add(new PersistenceModule(applicationContext));
         //we have to inject a couple of spring beans into our Guice.  Wire it 
with PersistenceModule
         injector = Guice.createInjector( moduleList );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 561dfdc..81960f5 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -27,7 +27,7 @@ import 
org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.apache.usergrid.persistence.queue.LocalQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
@@ -36,6 +36,8 @@ import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 
+import static 
org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL;
+
 
 /**
  * A provider to allow users to configure their queue impl via properties
@@ -100,88 +102,25 @@ public class AsyncIndexProvider implements 
Provider<AsyncEventService> {
     private AsyncEventService getIndexService() {
         final String value = indexProcessorFig.getQueueImplementation();
 
-        final Implementations impl = Implementations.valueOf(value);
-
-        switch (impl) {
-
-            case LOCAL:
-                AsyncEventServiceImpl eventService =
-                    new AsyncEventServiceImpl(scope -> new LocalQueueManager(),
-                        indexProcessorFig,
-                        indexProducer,
-                        metricsFactory,
-                        entityCollectionManagerFactory,
-                        indexLocationStrategyFactory,
-                        entityIndexFactory,
-                        eventBuilder,
-                        mapManagerFactory,
-                        queueFig,rxTaskScheduler);
-                eventService.MAX_TAKE = 1000;
-                return eventService;
-
-            case SQS:
-                throw new IllegalArgumentException(
-                    "Configuration value of SQS is no longer allowed. Use SNS 
instead with only a single region.");
-
-            case SNS:
-                return new AsyncEventServiceImpl(
-                    queueManagerFactory,
-                    indexProcessorFig,
-                    indexProducer,
-                    metricsFactory,
-                    entityCollectionManagerFactory,
-                    indexLocationStrategyFactory,
-                    entityIndexFactory,
-                    eventBuilder,
-                    mapManagerFactory,
-                    queueFig,
-                    rxTaskScheduler );
-
-            case MULTIREGION:
-                return new AsyncEventServiceImpl(
-                    queueManagerFactory,
-                    indexProcessorFig,
-                    indexProducer,
-                    metricsFactory,
-                    entityCollectionManagerFactory,
-                    indexLocationStrategyFactory,
-                    entityIndexFactory,
-                    eventBuilder,
-                    mapManagerFactory,
-                    queueFig,
-                    rxTaskScheduler );
-
-            default:
-                throw new IllegalArgumentException("Configuration value of " + 
getErrorValues() + " are allowed");
+        final LegacyQueueManager.Implementation impl = 
LegacyQueueManager.Implementation.valueOf(value);
+
+        final AsyncEventServiceImpl asyncEventService = new 
AsyncEventServiceImpl(
+            queueManagerFactory,
+            indexProcessorFig,
+            indexProducer,
+            metricsFactory,
+            entityCollectionManagerFactory,
+            indexLocationStrategyFactory,
+            entityIndexFactory,
+            eventBuilder,
+            mapManagerFactory,
+            queueFig,
+            rxTaskScheduler );
+
+        if ( impl.equals( LOCAL )) {
+            asyncEventService.MAX_TAKE = 1000;
         }
-    }
-
-
-    private String getErrorValues() {
-        String values = "";
-
-        for (final Implementations impl : Implementations.values()) {
-            values += impl + ", ";
-        }
-
-        values = values.substring(0, values.length() - 2);
 
-        return values;
-    }
-
-
-    /**
-     * Different implementations
-     */
-    public static enum Implementations {
-        TEST,
-        LOCAL,
-        SQS,         // deprecated
-        SNS,         // deprecated
-        MULTIREGION; // built-in Akka-based queue
-
-        public String asString() {
-            return toString();
-        }
+        return asyncEventService;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
index 32b9845..8cab054 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence;
 
 import org.apache.usergrid.persistence.core.guice.TestModule;
 
+import java.util.Properties;
+
 
 /**
  * Test guice module for our core guice configuration
@@ -29,6 +31,8 @@ public class TestCoreModule extends TestModule {
     @Override
     protected void configure() {
 
-        install( new CoreModule() );
+        Properties properties = new Properties();
+        properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" );
+        install( new CoreModule( properties ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
index 95000bf..ecd16ef 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.persistence.PersistenceModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
 
+import java.util.Properties;
+
 
 public class TestIndexModule extends TestModule {
 
@@ -32,12 +34,14 @@ public class TestIndexModule extends TestModule {
     @Override
     protected void configure() {
 
-
         //TODO, refactor to guice to get rid of this
-        final ApplicationContext singleton = 
ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext();
+        final ApplicationContext singleton =
+            
ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext();
 
         //this will break, we need to untagle this and move to guice in core 
completely
-        install( new CoreModule() );
+        Properties properties = new Properties();
+        properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" );
+        install( new CoreModule( properties ) );
         install( new PersistenceModule( singleton ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties 
b/stack/core/src/test/resources/usergrid-custom-test.properties
index 55d881f..4070d4a 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -28,6 +28,7 @@ elasticsearch.management_number_replicas=0
 elasticsearch.managment_index=usergrid_core_management
 #cassandra.keyspace.application=core_tests_schema
 elasticsearch.queue_impl.resolution=true
+elasticsearch.queue_impl=LOCAL
 
 elasticsearch.buffer_timeout=1
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 47399c7..86d5d36 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -35,9 +35,18 @@ import 
org.apache.usergrid.persistence.queue.guice.QueueModule;
 
 import org.safehaus.guicyfig.GuicyFigModule;
 
+import java.util.Properties;
+
 
 public abstract class IndexModule extends AbstractModule {
 
+    private final Properties properties;
+
+    public IndexModule( Properties properties ) {
+        this.properties = properties;
+    }
+
+
     @Override
     protected void configure() {
 
@@ -45,7 +54,7 @@ public abstract class IndexModule extends AbstractModule {
         install(new GuicyFigModule(IndexFig.class));
 
         install(new MapModule());
-        install(new QueueModule());
+        install(new QueueModule( properties.getProperty( 
"elasticsearch.queue_impl" ) ));
 
         bind( EntityIndexFactory.class ).to( EsEntityIndexFactoryImpl.class );
         bind(IndexCache.class).to(EsIndexCacheImpl.class);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 3bc6193..62b2bd2 100644
--- 
a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ 
b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -32,6 +32,8 @@ import com.google.inject.TypeLiteral;
 
 import rx.Observable;
 
+import java.util.Properties;
+
 
 public class TestIndexModule extends TestModule {
 
@@ -42,7 +44,11 @@ public class TestIndexModule extends TestModule {
         install( new CommonModule());
 
         // configure collections and our core astyanax framework
-        install( new IndexModule(){
+
+        Properties properties = new Properties();
+        properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" );
+
+        install( new IndexModule( properties ) {
             @Override
             public  void configureMigrationProvider(){
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
index 6627148..afe229d 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -27,6 +27,15 @@ import java.util.List;
 public interface LegacyQueueManager {
 
     /**
+     * Different implementations
+     */
+    enum Implementation {
+        LOCAL,           // local in-memory queue
+        DISTRIBUTED,     // built-in Akka-based queue
+        DISTRIBUTED_SNS; // SNS queue
+    }
+
+    /**
      * Read messages from queue
      * @param limit
      * @param klass class to cast the return from

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index f0e0900..e426247 100644
--- 
a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ 
b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -21,20 +21,35 @@ package org.apache.usergrid.persistence.queue.guice;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import org.apache.usergrid.persistence.qakka.QakkaModule;
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
-import org.apache.usergrid.persistence.queue.LegacyQueueManager;
-import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
-import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory;
+import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager;
 import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
+import java.util.Properties;
+
 
 /**
  * Simple module for wiring our collection api
  */
 public class QueueModule extends AbstractModule {
 
+    private LegacyQueueManager.Implementation implementation;
+
+    public QueueModule( String queueManagerType ) {
+
+        if ( "LOCAL".equals( queueManagerType ) ) {
+            this.implementation = LegacyQueueManager.Implementation.LOCAL;
+        }
+        else if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
+            this.implementation = 
LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
+        }
+        else if ( "DISTRIBUTED".equals( queueManagerType ) ) {
+            this.implementation = 
LegacyQueueManager.Implementation.DISTRIBUTED;
+        }
+    }
+
 
     @Override
     protected void configure() {
@@ -43,8 +58,29 @@ public class QueueModule extends AbstractModule {
 
         
bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
 
-        install( new 
FactoryModuleBuilder().implement(LegacyQueueManager.class, 
QakkaQueueManager.class)
-            .build(LegacyQueueManagerInternalFactory.class));
+        switch (implementation) {
+
+            case LOCAL:
+
+                install( new FactoryModuleBuilder().implement( 
LegacyQueueManager.class, LocalQueueManager.class )
+                    .build( LegacyQueueManagerInternalFactory.class ) );
+                break;
+
+            case DISTRIBUTED_SNS:
+                install( new FactoryModuleBuilder().implement( 
LegacyQueueManager.class, SNSQueueManagerImpl.class )
+                    .build( LegacyQueueManagerInternalFactory.class ) );
+                break;
+
+            case DISTRIBUTED:
+                install( new FactoryModuleBuilder().implement( 
LegacyQueueManager.class, QakkaQueueManager.class )
+                    .build( LegacyQueueManagerInternalFactory.class ) );
+                break;
+
+            default:
+                throw new IllegalArgumentException(
+                    "Queue implemetation value of " + implementation + " not 
allowed");
+
+        }
 
         install( new QakkaModule() );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java
 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java
index 9d2ed24..78be24f 100644
--- 
a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java
+++ 
b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java
@@ -35,7 +35,7 @@ public class TestModule  extends AbstractModule {
 
         install( new CommonModule() );
         install( new ActorSystemModule() );
-        install( new QueueModule() );
+        install( new QueueModule( "DISTRIBUTED" ) );
 
     }
 

Reply via email to