Fixes empty payload notification issue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a Branch: refs/heads/master Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8 Parents: 3e15585 Author: Todd Nine <tn...@apigee.com> Authored: Mon Oct 19 13:53:30 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Oct 19 13:53:30 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 55 ++++++++++++-------- .../asyncevents/AsyncIndexProvider.java | 4 +- .../index/AmazonAsyncEventServiceTest.java | 2 +- 3 files changed, 36 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index f8ef5e7..6b2eb45 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -89,6 +89,21 @@ import rx.Subscription; import rx.schedulers.Schedulers; +/** + * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner. + * + * 1. Produce. Keep the code in the handle as is + * 2. Consume: Move the code into a refactored system + * 2.1 A central dispatcher + * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into it's own + * impl that will then emit a stream of batch operations to perform + * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler + * 2.4 The batch handler will roll up the operations into a batch size, and then queue them + * 2.5 The receive batch handler will execute the batch operations + * + * TODO determine how we error handle? + * + */ @Singleton public class AmazonAsyncEventService implements AsyncEventService { @@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements AsyncEventService { public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope ); - offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy))); + offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), + new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) ); } @@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements AsyncEventService { final String message = esMapPersistence.getString( messageId.toString() ); - String highConsistency = null; + final IndexOperationMessage indexOperationMessage; if(message == null){ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" ); - highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); - - } + final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); - //read the value from the string + if(highConsistency == null){ + logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" ); - final IndexOperationMessage indexOperationMessage; + throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); + } - //our original local read has it, parse it. - if(message != null){ - indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); - } - //we tried to read it at a higher consistency level and it works - else if (highConsistency != null){ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class ); - } - //we couldn't find it, bail - else{ - logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" ); - - throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); + } else{ + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); } + //read the value from the string + + Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" ); + Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" ); //now execute it @@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements AsyncEventService { .map(result -> result.getQueueMessage().get()) .collect(Collectors.toList()); - //send the batch - //TODO: should retry? - queueIndexOperationMessage( combined ); + //only Q it if it's empty + if(!combined.isEmpty()) { + queueIndexOperationMessage( combined ); + } return messagesToAck; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 1649046..2bace8d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -105,10 +105,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { case LOCAL: return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously()); case SQS: - throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead"); + throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region"); case SNS: return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, - entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); + entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index 8ee47a2..625a8fd 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); + return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); }