Fixes empty payload notification issue.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a

Branch: refs/heads/master
Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8
Parents: 3e15585
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:53:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:53:30 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 55 ++++++++++++--------
 .../asyncevents/AsyncIndexProvider.java         |  4 +-
 .../index/AmazonAsyncEventServiceTest.java      |  2 +-
 3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f8ef5e7..6b2eb45 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -89,6 +89,21 @@ import rx.Subscription;
 import rx.schedulers.Schedulers;
 
 
+/**
+ * TODO, this whole class is becoming a nightmare.  We need to remove all 
consume logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce.  Keep the code in the handle as is
+ * 2. Consume:  Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation.  Any 
handler will be refactored into its own
+ *      impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge 
them.  Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and 
then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
 @Singleton
 public class AmazonAsyncEventService implements AsyncEventService {
 
@@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements 
AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope 
applicationScope) {
         IndexLocationStrategy indexLocationStrategy = 
indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope );
-        offerTopic(new 
InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new 
ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+        offerTopic( new InitializeApplicationIndexEvent( 
queueFig.getPrimaryRegion(),
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
 
 
@@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements 
AsyncEventService {
 
         final String message = esMapPersistence.getString( 
messageId.toString() );
 
-        String highConsistency = null;
+        final IndexOperationMessage indexOperationMessage;
 
         if(message == null){
             logger.error( "Received message with id {} to process, unable to 
find it, reading with higher consistency level" );
 
-            highConsistency =  esMapPersistence.getStringHighConsistency( 
messageId.toString() );
-
-        }
+            final String highConsistency =  
esMapPersistence.getStringHighConsistency( messageId.toString() );
 
-        //read the value from the string
+            if(highConsistency == null){
+                logger.error( "Unable to find the ES batch with id {} to 
process at a higher consistency level" );
 
-        final IndexOperationMessage indexOperationMessage;
+                throw new RuntimeException( "Unable to find the ES batch to 
process with message id " + messageId );
+            }
 
-        //our original local read has it, parse it.
-        if(message != null){
-             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( 
message, IndexOperationMessage.class );
-        }
-        //we tried to read it at a higher consistency level and it works
-        else if (highConsistency != null){
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( 
highConsistency, IndexOperationMessage.class );
-        }
 
-        //we couldn't find it, bail
-        else{
-            logger.error( "Unable to find the ES batch with id {} to process 
at a higher consistency level" );
-
-            throw new RuntimeException( "Unable to find the ES batch to 
process with message id " + messageId );
+        } else{
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( 
message, IndexOperationMessage.class );
         }
 
+        //read the value from the string
+
+        Preconditions.checkNotNull( indexOperationMessage, 
"indexOperationMessage cannot be null" );
+        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , 
"queued indexOperationMessage messages should not be empty" );
 
 
         //now execute it
@@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements 
AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             .collect(Collectors.toList());
 
-        //send the batch
-        //TODO: should retry?
-        queueIndexOperationMessage( combined );
+        //only Q it if it's not empty
+        if(!combined.isEmpty()) {
+            queueIndexOperationMessage( combined );
+        }
 
         return messagesToAck;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1649046..2bace8d 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -105,10 +105,10 @@ public class AsyncIndexProvider implements 
Provider<AsyncEventService> {
             case LOCAL:
                 return new InMemoryAsyncEventService(eventBuilder, 
rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
-                throw new IllegalArgumentException("Configuration value of SQS 
is no longer allowed. Use SNS instead");
+                throw new IllegalArgumentException("Configuration value of SQS 
is no longer allowed. Use SNS instead with only a single region");
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, 
indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, 
indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler 
);
+                    entityCollectionManagerFactory, 
indexLocationStrategyFactory,entityIndexFactory, eventBuilder, 
mapManagerFactory, queueFig, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + 
getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 8ee47a2..625a8fd 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends 
AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, 
indexProcessorFig, indexProducer, metricsFactory,  
entityCollectionManagerFactory, indexLocationStrategyFactory, 
entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, 
indexProcessorFig, indexProducer, metricsFactory,  
entityCollectionManagerFactory, indexLocationStrategyFactory, 
entityIndexFactory, eventBuilder, mapManagerFactory, queueFig,  rxTaskScheduler 
);
     }
 
 

Reply via email to