Improve error handling so that a failure to ack one message does not abort the whole run
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/88d429f9 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/88d429f9 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/88d429f9 Branch: refs/heads/master Commit: 88d429f95ab44018ecb290db8806e239e251757a Parents: 4d45d1c Author: Dave Johnson <snoopd...@apache.org> Authored: Wed Oct 5 09:53:33 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Wed Oct 5 09:53:33 2016 -0400 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 197 ++++++++++--------- 1 file changed, 109 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/88d429f9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 6add88c..71ad5fb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -20,19 +20,15 @@ package org.apache.usergrid.corepersistence.asyncevents; -import java.io.IOException; -import java.io.Serializable; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; - +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; 
+import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import com.google.inject.Singleton; import org.apache.usergrid.corepersistence.asyncevents.model.*; -import org.apache.usergrid.persistence.index.impl.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.usergrid.corepersistence.index.EntityIndexOperation; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; @@ -50,6 +46,9 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexLocationStrategy; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.index.impl.IndexingUtils; import org.apache.usergrid.persistence.map.MapManager; import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.map.MapScope; @@ -57,30 +56,27 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.LegacyQueueFig; -import org.apache.usergrid.persistence.queue.LegacyQueueManager; -import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; -import org.apache.usergrid.persistence.queue.LegacyQueueMessage; -import org.apache.usergrid.persistence.queue.LegacyQueueScope; +import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import 
com.codahale.metrics.Timer; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.inject.Inject; -import com.google.inject.Singleton; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.Subscriber; import rx.Subscription; import rx.schedulers.Schedulers; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + /** - * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner. + * TODO, this whole class is becoming a nightmare. + * We need to remove all consume from this class and refactor it into the following manner. * * 1. Produce. Keep the code in the handle as is * 2. Consume: Move the code into a refactored system @@ -231,7 +227,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** - * Take message from SQS + * Take message */ private List<LegacyQueueMessage> take() { @@ -247,31 +243,37 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - /** - * Ack message in SQS + * Ack message */ public void ack(final List<LegacyQueueMessage> messages) { final Timer.Context timer = this.ackTimer.time(); - try{ - queue.commitMessages( messages ); + try { - //decrement our in-flight counter - inFlight.decrementAndGet(); + for ( LegacyQueueMessage legacyQueueMessage : messages ) { + try { + queue.commitMessage( legacyQueueMessage ); + inFlight.decrementAndGet(); - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); - }finally { - timer.stop(); - } + } catch ( Throwable t ) { + logger.error("Continuing after error acking message: " + legacyQueueMessage.getMessageId() ); + } + } + } catch (Exception e) { + throw new RuntimeException( "Unable to ack messages", e ); 
+ } finally { + timer.stop(); + } } + /** - * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed + * calls the event handlers and returns a result with information on whether + * it needs to be ack'd and whether it needs to be indexed * @param messages * @return */ @@ -396,6 +398,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { return indexEventResults.collect(Collectors.toList()); } + @Override public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( @@ -429,7 +432,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final AsyncEvent event = ( AsyncEvent ) message.getBody(); Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); - Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); + Preconditions.checkArgument(event instanceof EntityIndexEvent, + String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; @@ -441,10 +445,12 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Id entityId = entityIdScope.getId(); final long updatedAfter = entityIndexEvent.getUpdatedAfter(); - final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); + final EntityIndexOperation entityIndexOperation = + new EntityIndexOperation( applicationScope, entityId, updatedAfter); // default this observable's return to empty index operation message if nothing is emitted - return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(new IndexOperationMessage()); + 
return eventBuilder.buildEntityIndex( entityIndexOperation ) + .toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -464,15 +470,18 @@ public class AsyncEventServiceImpl implements AsyncEventService { final AsyncEvent event = (AsyncEvent) message.getBody(); Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); - Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); + Preconditions.checkArgument(event instanceof EdgeIndexEvent, + String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() ); + final EntityCollectionManager ecm = + entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() ); // default this observable's return to empty index operation message if nothing is emitted return ecm.load( edgeIndexEvent.getEntityId() ) - .flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) ) + .flatMap( loadedEntity -> + eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge())) .toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -493,7 +502,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { final AsyncEvent event = (AsyncEvent) message.getBody(); Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" ); - Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass())); + Preconditions.checkArgument(event instanceof EdgeDeleteEvent, + String.format("Event Type for handleEdgeDelete 
must be EDGE_DELETE, got %s", event.getClass())); final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event; @@ -506,7 +516,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { } // default this observable's return to empty index operation message if nothing is emitted - return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(new IndexOperationMessage()); + return eventBuilder.buildDeleteEdge(applicationScope, edge) + .toBlocking().lastOrDefault(new IndexOperationMessage()); } @@ -545,7 +556,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { offerTopic( elasticsearchIndexEvent ); } - private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException { + private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) + throws IndexDocNotFoundException { Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); @@ -560,7 +572,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { if(message == null) { // provide some time back pressure before performing a quorum read - if ( queueFig.getQuorumFallback() && System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { + if ( queueFig.getQuorumFallback() && System.currentTimeMillis() > + elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { if(logger.isDebugEnabled()){ logger.debug("ES batch with id {} not found, reading with strong consistency", messageId); @@ -569,10 +582,12 @@ public class AsyncEventServiceImpl implements AsyncEventService { final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString()); if (highConsistency == null) { - throw new RuntimeException("ES batch with id "+messageId+" not found when reading with strong consistency"); + throw new RuntimeException("ES batch with id " + + 
messageId+" not found when reading with strong consistency"); } - indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); + indexOperationMessage = + ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); } else { @@ -702,7 +717,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); - Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); + Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, + String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", + event.getClass())); final InitializeApplicationIndexEvent initializeApplicationIndexEvent = ( InitializeApplicationIndexEvent ) event; @@ -785,40 +802,42 @@ public class AsyncEventServiceImpl implements AsyncEventService { } ) //this won't block our read loop, just reads and proceeds .flatMap( sqsMessages -> { - //do this on a different schedule, and introduce concurrency with flatmap for faster processing + //do this on a different schedule, and introduce concurrency + // with flatmap for faster processing return Observable.just( sqsMessages ) - .map( messages -> { - if ( messages == null || messages.size() == 0 ) { - // no messages came from the queue, move on - return null; - } - - try { - // process the messages - List<IndexEventResult> indexEventResults = callEventHandlers( messages ); - - // submit the processed messages to index producer - List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults ); - - if ( messagesToAck.size() < messages.size() ) { - logger.warn( "Missing {} 
message(s) from index processing", - messages.size() - messagesToAck.size() ); - } - - // ack each message if making it to this point - if( messagesToAck.size() > 0 ){ - ack( messagesToAck ); - } - - return messagesToAck; - } - catch ( Exception e ) { - logger.error( "Failed to ack messages", e ); - return null; - //do not rethrow so we can process all of them - } - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { + // no messages came from the queue, move on + return null; + } + + try { + // process the messages + List<IndexEventResult> indexEventResults = callEventHandlers( messages ); + + // submit the processed messages to index producer + List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults ); + + if ( messagesToAck.size() < messages.size() ) { + logger.warn( "Missing {} message(s) from index processing", + messages.size() - messagesToAck.size() ); + } + + // ack each message if making it to this point + if( messagesToAck.size() > 0 ){ + ack( messagesToAck ); + } + + return messagesToAck; + } + catch ( Exception e ) { + logger.error( "Failed to ack messages", e ); + return null; + //do not rethrow so we can process all of them + } + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); + //end flatMap }, indexProcessorFig.getEventConcurrencyFactor() ); @@ -871,7 +890,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince); - queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null)); + queueIndexOperationMessage( + eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null)); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { @@ -901,7 +921,8 @@ public class AsyncEventServiceImpl implements AsyncEventService { private final 
Optional<LegacyQueueMessage> queueMessage; private final long creationTime; - public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<LegacyQueueMessage> queueMessage, long creationTime){ + public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, + Optional<LegacyQueueMessage> queueMessage, long creationTime){ this.queueMessage = queueMessage; this.creationTime = creationTime;