Improve error handling so that a failure to ack one messages does not abort 
whole run


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/88d429f9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/88d429f9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/88d429f9

Branch: refs/heads/master
Commit: 88d429f95ab44018ecb290db8806e239e251757a
Parents: 4d45d1c
Author: Dave Johnson <snoopd...@apache.org>
Authored: Wed Oct 5 09:53:33 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Wed Oct 5 09:53:33 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 197 ++++++++++---------
 1 file changed, 109 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/88d429f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 6add88c..71ad5fb 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -20,19 +20,15 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.persistence.index.impl.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
@@ -50,6 +46,9 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -57,30 +56,27 @@ import 
org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
-import org.apache.usergrid.persistence.queue.LegacyQueueManager;
-import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
-import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
-import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
 import rx.schedulers.Schedulers;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 
 /**
- * TODO, this whole class is becoming a nightmare.  We need to remove all 
consume from this class and refactor it into the following manner.
+ * TODO, this whole class is becoming a nightmare.
+ * We need to remove all consume from this class and refactor it into the 
following manner.
  *
  * 1. Produce.  Keep the code in the handle as is
  * 2. Consume:  Move the code into a refactored system
@@ -231,7 +227,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
 
     /**
-     * Take message from SQS
+     * Take message
      */
     private List<LegacyQueueMessage> take() {
 
@@ -247,31 +243,37 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
     }
 
 
-
     /**
-     * Ack message in SQS
+     * Ack message
      */
     public void ack(final List<LegacyQueueMessage> messages) {
 
         final Timer.Context timer = this.ackTimer.time();
 
-        try{
-            queue.commitMessages( messages );
+        try {
 
-            //decrement our in-flight counter
-            inFlight.decrementAndGet();
+            for ( LegacyQueueMessage legacyQueueMessage : messages ) {
+                try {
+                    queue.commitMessage( legacyQueueMessage );
+                    inFlight.decrementAndGet();
 
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }finally {
-            timer.stop();
-        }
+                } catch ( Throwable t ) {
+                    logger.error("Continuing after error acking message: " + 
legacyQueueMessage.getMessageId() );
+                }
+            }
 
+        } catch (Exception e) {
+            throw new RuntimeException( "Unable to ack messages", e );
 
+        } finally {
+            timer.stop();
+        }
     }
 
+
     /**
-     * calls the event handlers and returns a result with information on 
whether it needs to be ack'd and whether it needs to be indexed
+     * calls the event handlers and returns a result with information on 
whether
+     * it needs to be ack'd and whether it needs to be indexed
      * @param messages
      * @return
      */
@@ -396,6 +398,7 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         return indexEventResults.collect(Collectors.toList());
     }
 
+
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope 
applicationScope) {
         IndexLocationStrategy indexLocationStrategy = 
indexLocationStrategyFactory.getIndexLocationStrategy(
@@ -429,7 +432,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         final AsyncEvent event = ( AsyncEvent ) message.getBody();
 
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null 
for handleEntityIndexUpdate");
-        Preconditions.checkArgument(event instanceof EntityIndexEvent, 
String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got 
%s", event.getClass()));
+        Preconditions.checkArgument(event instanceof EntityIndexEvent,
+            String.format("Event Type for handleEntityIndexUpdate must be 
ENTITY_INDEX, got %s", event.getClass()));
 
         final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
 
@@ -441,10 +445,12 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         final Id entityId = entityIdScope.getId();
         final long updatedAfter = entityIndexEvent.getUpdatedAfter();
 
-        final EntityIndexOperation entityIndexOperation = new 
EntityIndexOperation( applicationScope, entityId, updatedAfter);
+        final EntityIndexOperation entityIndexOperation =
+            new EntityIndexOperation( applicationScope, entityId, 
updatedAfter);
 
         // default this observable's return to empty index operation message 
if nothing is emitted
-        return eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildEntityIndex( entityIndexOperation )
+            .toBlocking().lastOrDefault(new IndexOperationMessage());
     }
 
 
@@ -464,15 +470,18 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
         Preconditions.checkNotNull( message, "QueueMessage Body cannot be null 
for handleEdgeIndex" );
-        Preconditions.checkArgument(event instanceof EdgeIndexEvent, 
String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", 
event.getClass()));
+        Preconditions.checkArgument(event instanceof EdgeIndexEvent,
+            String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, 
got %s", event.getClass()));
 
         final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
 
-        final EntityCollectionManager ecm = 
entityCollectionManagerFactory.createCollectionManager( 
edgeIndexEvent.getApplicationScope() );
+        final EntityCollectionManager ecm =
+            entityCollectionManagerFactory.createCollectionManager( 
edgeIndexEvent.getApplicationScope() );
 
         // default this observable's return to empty index operation message 
if nothing is emitted
         return ecm.load( edgeIndexEvent.getEntityId() )
-            .flatMap( loadedEntity -> 
eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, 
edgeIndexEvent.getEdge()) )
+            .flatMap( loadedEntity ->
+                
eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, 
edgeIndexEvent.getEdge()))
             .toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }
@@ -493,7 +502,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
         Preconditions.checkNotNull( message, "QueueMessage Body cannot be null 
for handleEdgeDelete" );
-        Preconditions.checkArgument(event instanceof EdgeDeleteEvent, 
String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", 
event.getClass()));
+        Preconditions.checkArgument(event instanceof EdgeDeleteEvent,
+            String.format("Event Type for handleEdgeDelete must be 
EDGE_DELETE, got %s", event.getClass()));
 
 
         final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
@@ -506,7 +516,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         }
 
         // default this observable's return to empty index operation message 
if nothing is emitted
-        return eventBuilder.buildDeleteEdge(applicationScope, 
edge).toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildDeleteEdge(applicationScope, edge)
+            .toBlocking().lastOrDefault(new IndexOperationMessage());
 
     }
 
@@ -545,7 +556,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         offerTopic( elasticsearchIndexEvent );
     }
 
-    private void handleIndexOperation(final ElasticsearchIndexEvent 
elasticsearchIndexEvent) throws IndexDocNotFoundException {
+    private void handleIndexOperation(final ElasticsearchIndexEvent 
elasticsearchIndexEvent)
+        throws IndexDocNotFoundException {
 
         Preconditions.checkNotNull( elasticsearchIndexEvent, 
"elasticsearchIndexEvent cannot be null" );
 
@@ -560,7 +572,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         if(message == null) {
 
             // provide some time back pressure before performing a quorum read
-            if ( queueFig.getQuorumFallback() && System.currentTimeMillis() > 
elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+            if ( queueFig.getQuorumFallback() && System.currentTimeMillis() >
+                elasticsearchIndexEvent.getCreationTime() + 
queueFig.getLocalQuorumTimeout() ) {
 
                 if(logger.isDebugEnabled()){
                     logger.debug("ES batch with id {} not found, reading with 
strong consistency", messageId);
@@ -569,10 +582,12 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                 final String highConsistency = 
esMapPersistence.getStringHighConsistency(messageId.toString());
                 if (highConsistency == null) {
 
-                   throw new RuntimeException("ES batch with id "+messageId+" 
not found when reading with strong consistency");
+                   throw new RuntimeException("ES batch with id " +
+                       messageId+" not found when reading with strong 
consistency");
                }
 
-               indexOperationMessage = 
ObjectJsonSerializer.INSTANCE.fromString(highConsistency, 
IndexOperationMessage.class);
+               indexOperationMessage =
+                   ObjectJsonSerializer.INSTANCE.fromString(highConsistency, 
IndexOperationMessage.class);
 
            } else {
 
@@ -702,7 +717,9 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
 
     private void handleInitializeApplicationIndex(final AsyncEvent event, 
final LegacyQueueMessage message) {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for 
handleInitializeApplicationIndex");
-        Preconditions.checkArgument(event instanceof 
InitializeApplicationIndexEvent, String.format("Event Type for 
handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", 
event.getClass()));
+        Preconditions.checkArgument(event instanceof 
InitializeApplicationIndexEvent,
+            String.format("Event Type for handleInitializeApplicationIndex 
must be APPLICATION_INDEX, got %s",
+                event.getClass()));
 
         final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
             ( InitializeApplicationIndexEvent ) event;
@@ -785,40 +802,42 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
                     } )        //this won't block our read loop, just reads 
and proceeds
                         .flatMap( sqsMessages -> {
 
-                            //do this on a different schedule, and introduce 
concurrency with flatmap for faster processing
+                            //do this on a different schedule, and introduce 
concurrency
+                            // with flatmap for faster processing
                             return Observable.just( sqsMessages )
 
-                                             .map( messages -> {
-                                                 if ( messages == null || 
messages.size() == 0 ) {
-                                                     // no messages came from 
the queue, move on
-                                                     return null;
-                                                 }
-
-                                                 try {
-                                                     // process the messages
-                                                     List<IndexEventResult> 
indexEventResults = callEventHandlers( messages );
-
-                                                     // submit the processed 
messages to index producer
-                                                     List<LegacyQueueMessage> 
messagesToAck = submitToIndex( indexEventResults );
-
-                                                     if ( messagesToAck.size() 
< messages.size() ) {
-                                                         logger.warn( "Missing 
{} message(s) from index processing",
-                                                            messages.size() - 
messagesToAck.size() );
-                                                     }
-
-                                                     // ack each message if 
making it to this point
-                                                     if( messagesToAck.size() 
> 0 ){
-                                                         ack( messagesToAck );
-                                                     }
-
-                                                     return messagesToAck;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "Failed to 
ack messages", e );
-                                                     return null;
-                                                     //do not rethrow so we 
can process all of them
-                                                 }
-                                             } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+                                 .map( messages -> {
+                                     if ( messages == null || messages.size() 
== 0 ) {
+                                         // no messages came from the queue, 
move on
+                                         return null;
+                                     }
+
+                                     try {
+                                         // process the messages
+                                         List<IndexEventResult> 
indexEventResults = callEventHandlers( messages );
+
+                                         // submit the processed messages to 
index producer
+                                         List<LegacyQueueMessage> 
messagesToAck = submitToIndex( indexEventResults );
+
+                                         if ( messagesToAck.size() < 
messages.size() ) {
+                                             logger.warn( "Missing {} 
message(s) from index processing",
+                                                messages.size() - 
messagesToAck.size() );
+                                         }
+
+                                         // ack each message if making it to 
this point
+                                         if( messagesToAck.size() > 0 ){
+                                             ack( messagesToAck );
+                                         }
+
+                                         return messagesToAck;
+                                     }
+                                     catch ( Exception e ) {
+                                         logger.error( "Failed to ack 
messages", e );
+                                         return null;
+                                         //do not rethrow so we can process 
all of them
+                                     }
+                                 } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
+
                             //end flatMap
                         }, indexProcessorFig.getEventConcurrencyFactor() );
 
@@ -871,7 +890,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         EntityIndexOperation entityIndexOperation =
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
-        queueIndexOperationMessage(eventBuilder.buildEntityIndex( 
entityIndexOperation ).toBlocking().lastOrDefault(null));
+        queueIndexOperationMessage(
+            eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(null));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long 
updatedSince) {
@@ -901,7 +921,8 @@ public class AsyncEventServiceImpl implements 
AsyncEventService {
         private final Optional<LegacyQueueMessage> queueMessage;
         private final long creationTime;
 
-        public IndexEventResult(Optional<IndexOperationMessage> 
indexOperationMessage, Optional<LegacyQueueMessage> queueMessage, long 
creationTime){
+        public IndexEventResult(Optional<IndexOperationMessage> 
indexOperationMessage,
+                                Optional<LegacyQueueMessage> queueMessage, 
long creationTime){
 
             this.queueMessage = queueMessage;
             this.creationTime = creationTime;

Reply via email to