Merge branch 'master' into usergrid-1318-queue Conflicts: content/releases/index.html stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java website/content/releases/index.html
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7cc5c1c0 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7cc5c1c0 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7cc5c1c0 Branch: refs/heads/usergrid-1318-queue Commit: 7cc5c1c0701649bfdf31d695c10856f87510f181 Parents: f8c3a2d fe3bf56 Author: Dave Johnson <snoopd...@apache.org> Authored: Mon Oct 31 11:45:26 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Mon Oct 31 11:45:26 2016 -0400 ---------------------------------------------------------------------- .../docs/_sources/data-storage/collections.txt | 144 + .../_sources/installation/deployment-guide.txt | 125 +- .../docs/_sources/orgs-and-apps/application.txt | 104 + content/docs/data-storage/collections.html | 209 +- content/docs/index.html | 3 + content/docs/installation/deployment-guide.html | 16 +- content/docs/orgs-and-apps/application.html | 141 + content/docs/searchindex.js | 2 +- content/releases/index.html | 5 +- docs/installation/deployment-guide.md | 125 +- .../asyncevents/AsyncEventService.java | 3 +- .../asyncevents/AsyncEventServiceImpl.java | 189 +- .../index/IndexProcessorFig.java | 9 + .../corepersistence/index/ReIndexAction.java | 5 +- .../index/ReIndexServiceImpl.java | 4 +- .../read/traverse/AbstractReadGraphFilter.java | 2 +- .../AbstractReadReverseGraphFilter.java | 2 +- .../exceptions/AbstractExceptionMapper.java | 2 +- .../PasswordPolicyViolationExceptionMapper.java | 48 + .../UserResource/bad_confirmation_token.jsp | 33 + .../collection/users/PermissionsResourceIT.java | 4 +- .../collection/users/UserResourceIT.java | 38 +- .../usergrid/rest/management/AdminUsersIT.java | 51 + .../rest/management/ExternalSSOEnabledIT.java | 2 +- .../rest/management/ManagementResourceIT.java | 4 +- .../rest/management/RegistrationIT.java | 6 +- .../cassandra/ManagementServiceImpl.java | 77 +- .../usergrid/security/PasswordPolicy.java | 53 + .../usergrid/security/PasswordPolicyFig.java | 79 + .../usergrid/security/PasswordPolicyImpl.java | 156 + .../security/sso/ApigeeSSO2Provider.java | 111 +- .../PasswordPolicyViolationException.java | 46 + .../services/guice/ServiceModuleImpl.java | 8 + .../usergrid/security/ApigeeSSO2ProviderIT.java | 297 + .../usergrid/security/PasswordPolicyTest.java | 47 + .../security/PasswordPolicyTestFig.java | 161 + .../usergrid/tools/RemoveAdminUserFromOrg.java | 230 + website/content/releases/index.html | 6 +- website/tmp/checksums | 3 + website/tmp/compiled_content | 14130 +++++++++++++++++ website/tmp/dependencies | 10 + website/tmp/rule_memory | Bin 0 -> 13045 bytes 42 files changed, 16355 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index f941c11,dba4edf..9f931d3 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@@ -65,18 -78,11 +65,20 @@@ import rx.Subscriber import rx.Subscription; import rx.schedulers.Schedulers; +import java.io.IOException; +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + import static org.apache.commons.lang.StringUtils.isNotEmpty; + /** - * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner. + * TODO, this whole class is becoming a nightmare. + * We need to remove all consume from this class and refactor it into the following manner. * * 1. Produce. Keep the code in the handle as is * 2. Consume: Move the code into a refactored system @@@ -99,10 -105,13 +101,13 @@@ public class AsyncEventServiceImpl impl // SQS maximum receive messages is 10 public int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars + public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars + - private final QueueManager queue; - private final QueueManager utilityQueue; + private final LegacyQueueManager queue; ++ private final LegacyQueueManager utilityQueue; private final IndexProcessorFig indexProcessorFig; - private final QueueFig queueFig; + private final LegacyQueueFig queueFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; @@@ -155,8 -165,10 +161,14 @@@ this.rxTaskScheduler = rxTaskScheduler; - LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL); - QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); - QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL); ++ LegacyQueueScope queueScope = ++ new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL); ++ ++ LegacyQueueScope utilityQueueScope = ++ new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL); ++ this.queue = queueManagerFactory.getQueueManager(queueScope); + this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope); this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; @@@ -225,11 -245,20 +245,20 @@@ } } + private void offerBatchToUtilityQueue(final List operations){ + try { + //signal to SQS + this.utilityQueue.sendMessages(operations); + } catch (IOException e) { + throw new RuntimeException("Unable to queue message", e); + } + } + /** - * Take message from SQS + * Take message */ - private List<QueueMessage> take() { + private List<LegacyQueueMessage> take() { final Timer.Context timer = this.readTimer.time(); @@@ -242,38 -271,59 +271,67 @@@ } } + /** + * Take message from SQS utility queue + */ - private List<QueueMessage> takeFromUtilityQueue() { ++ private List<LegacyQueueMessage> takeFromUtilityQueue() { + + final Timer.Context timer = this.readTimer.time(); + + try { + return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class); + } + finally { + //stop our timer + timer.stop(); + } + } + - /** - * Ack message in SQS + * Ack message */ - public void ack(final List<QueueMessage> messages) { + public void ack(final List<LegacyQueueMessage> messages) { final Timer.Context timer = this.ackTimer.time(); - try{ - queue.commitMessages( messages ); + try { - //decrement our in-flight counter - inFlight.decrementAndGet(); + for ( LegacyQueueMessage legacyQueueMessage : messages ) { + try { + queue.commitMessage( legacyQueueMessage ); + inFlight.decrementAndGet(); - }catch(Exception e){ - throw new RuntimeException("Unable to ack messages", e); - }finally { - timer.stop(); - } + } catch ( Throwable t ) { + logger.error("Continuing after error acking message: " + legacyQueueMessage.getMessageId() ); + } + } + } catch (Exception e) { + throw new RuntimeException( "Unable to ack messages", e ); + } finally { + timer.stop(); + } } + /** + * calls the event handlers and returns a result with information on whether + * it needs to be ack'd and whether it needs to be indexed + * Ack message in SQS + */ - public void ackUtilityQueue(final List<QueueMessage> messages) { ++ public void ackUtilityQueue(final List<LegacyQueueMessage> messages) { + try{ + utilityQueue.commitMessages( messages ); + }catch(Exception e){ + throw new RuntimeException("Unable to ack messages", e); + } + } + + /** - * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed ++ * calls the event handlers and returns a result with information on whether ++ * it needs to be ack'd and whether it needs to be indexed * @param messages * @return */ @@@ -553,11 -593,10 +610,11 @@@ //send to the topic so all regions index the batch - offerTopic( elasticsearchIndexEvent ); + offerTopic( elasticsearchIndexEvent, forUtilityQueue ); } - private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException { + private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) + throws IndexDocNotFoundException { Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); @@@ -755,18 -794,22 +817,23 @@@ } - private void startWorker() { + private void startWorker(final String type) { + Preconditions.checkNotNull(type, "Worker type required"); synchronized (mutex) { + boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase()); + - Observable<List<QueueMessage>> consumer = - Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { + Observable<List<LegacyQueueMessage>> consumer = + Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() { @Override - public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { + public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) { //name our thread so it's easy to see - Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet(); ++ long threadNum = isUtilityQueue ? ++ counterUtility.incrementAndGet() : counter.incrementAndGet(); + Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum ); - List<QueueMessage> drainList = null; + List<LegacyQueueMessage> drainList = null; do { try { @@@ -802,42 -850,45 +874,48 @@@ } ) //this won't block our read loop, just reads and proceeds .flatMap( sqsMessages -> { - //do this on a different schedule, and introduce concurrency with flatmap for faster processing + //do this on a different schedule, and introduce concurrency + // with flatmap for faster processing return Observable.just( sqsMessages ) - .map( messages -> { - if ( messages == null || messages.size() == 0 ) { - // no messages came from the queue, move on - return null; - } - - try { - // process the messages - List<IndexEventResult> indexEventResults = callEventHandlers( messages ); - - // submit the processed messages to index producer - List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults ); - - if ( messagesToAck.size() < messages.size() ) { - logger.warn( "Missing {} message(s) from index processing", - messages.size() - messagesToAck.size() ); - } - - // ack each message if making it to this point - if( messagesToAck.size() > 0 ){ - ack( messagesToAck ); - } - - return messagesToAck; - } - catch ( Exception e ) { - logger.error( "Failed to ack messages", e ); - return null; - //do not rethrow so we can process all of them - } - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); - + .map( messages -> { + if ( messages == null || messages.size() == 0 ) { + // no messages came from the queue, move on + return null; + } + + try { + // process the messages - List<IndexEventResult> indexEventResults = callEventHandlers( messages ); ++ List<IndexEventResult> indexEventResults = ++ callEventHandlers( messages ); + + // submit the processed messages to index producer - List<QueueMessage> messagesToAck = submitToIndex( indexEventResults, isUtilityQueue ); ++ List<LegacyQueueMessage> messagesToAck = ++ submitToIndex( indexEventResults, isUtilityQueue ); + + if ( messagesToAck.size() < messages.size() ) { + logger.warn( "Missing {} message(s) from index processing", + messages.size() - messagesToAck.size() ); + } + + // ack each message if making it to this point + if( messagesToAck.size() > 0 ){ + + if ( isUtilityQueue ){ + ackUtilityQueue( messagesToAck ); + }else{ + ack( messagesToAck ); + } + } + + return messagesToAck; + } + catch ( Exception e ) { + logger.error( "Failed to ack messages", e ); + return null; + //do not rethrow so we can process all of them + } + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ); //end flatMap }, indexProcessorFig.getEventConcurrencyFactor() ); @@@ -853,7 -904,7 +931,7 @@@ * Submit results to index and return the queue messages to be ack'd * */ - private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { - private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) { ++ private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) { // if nothing came back then return empty list if(indexEventResults==null){ @@@ -890,17 -941,16 +968,18 @@@ EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince); - queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false); + queueIndexOperationMessage( - eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null)); ++ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false); } - public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { + public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) { final List<EntityIndexEvent> batch = new ArrayList<>(); edges.forEach(e -> { //change to id scope to avoid serialization issues -- batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); ++ batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), ++ new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince)); }); http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/website/content/releases/index.html ---------------------------------------------------------------------- diff --cc website/content/releases/index.html index 24fbe66,28fb6d7..7e54686 --- a/website/content/releases/index.html +++ b/website/content/releases/index.html @@@ -31,10 -31,7 +31,8 @@@ Project releases are approved by vote of the Apache Usergrid Project Management Committee (PMC). Support for a release is provided by project volunteers on the project <a href="http://usergrid.apache.org/community/#mailing-lists">mailing lists</a>. Bugs found in a release may be discussed on the list and reported through the <a href="https://issues.apache.org/jira/browse/USERGRID">issue tracker</a>. The user mailing list and issue tracker are the only support options hosted by the Apache Usergrid project. </p> <p> - Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a> - </p> - <p> - The PGP signatures can be verified using PGP or GPG. First download the <a href="https://www.apache.org/dist/usergrid/KEYS">KEYS</a> as well as the <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">asc signature</a> file for the particular distribution. Then verify the signatures using: ++ + Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/usergrid/usergrid-2/v2.1.0/">https://dist.apache.org/repos/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a> </p> <p> % pgpk -a KEYS