Merge branch 'master' into usergrid-1318-queue

Conflicts:
        content/releases/index.html
        stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
        website/content/releases/index.html


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7cc5c1c0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7cc5c1c0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7cc5c1c0

Branch: refs/heads/usergrid-1318-queue
Commit: 7cc5c1c0701649bfdf31d695c10856f87510f181
Parents: f8c3a2d fe3bf56
Author: Dave Johnson <snoopd...@apache.org>
Authored: Mon Oct 31 11:45:26 2016 -0400
Committer: Dave Johnson <snoopd...@apache.org>
Committed: Mon Oct 31 11:45:26 2016 -0400

----------------------------------------------------------------------
 .../docs/_sources/data-storage/collections.txt  |   144 +
 .../_sources/installation/deployment-guide.txt  |   125 +-
 .../docs/_sources/orgs-and-apps/application.txt |   104 +
 content/docs/data-storage/collections.html      |   209 +-
 content/docs/index.html                         |     3 +
 content/docs/installation/deployment-guide.html |    16 +-
 content/docs/orgs-and-apps/application.html     |   141 +
 content/docs/searchindex.js                     |     2 +-
 content/releases/index.html                     |     5 +-
 docs/installation/deployment-guide.md           |   125 +-
 .../asyncevents/AsyncEventService.java          |     3 +-
 .../asyncevents/AsyncEventServiceImpl.java      |   189 +-
 .../index/IndexProcessorFig.java                |     9 +
 .../corepersistence/index/ReIndexAction.java    |     5 +-
 .../index/ReIndexServiceImpl.java               |     4 +-
 .../read/traverse/AbstractReadGraphFilter.java  |     2 +-
 .../AbstractReadReverseGraphFilter.java         |     2 +-
 .../exceptions/AbstractExceptionMapper.java     |     2 +-
 .../PasswordPolicyViolationExceptionMapper.java |    48 +
 .../UserResource/bad_confirmation_token.jsp     |    33 +
 .../collection/users/PermissionsResourceIT.java |     4 +-
 .../collection/users/UserResourceIT.java        |    38 +-
 .../usergrid/rest/management/AdminUsersIT.java  |    51 +
 .../rest/management/ExternalSSOEnabledIT.java   |     2 +-
 .../rest/management/ManagementResourceIT.java   |     4 +-
 .../rest/management/RegistrationIT.java         |     6 +-
 .../cassandra/ManagementServiceImpl.java        |    77 +-
 .../usergrid/security/PasswordPolicy.java       |    53 +
 .../usergrid/security/PasswordPolicyFig.java    |    79 +
 .../usergrid/security/PasswordPolicyImpl.java   |   156 +
 .../security/sso/ApigeeSSO2Provider.java        |   111 +-
 .../PasswordPolicyViolationException.java       |    46 +
 .../services/guice/ServiceModuleImpl.java       |     8 +
 .../usergrid/security/ApigeeSSO2ProviderIT.java |   297 +
 .../usergrid/security/PasswordPolicyTest.java   |    47 +
 .../security/PasswordPolicyTestFig.java         |   161 +
 .../usergrid/tools/RemoveAdminUserFromOrg.java  |   230 +
 website/content/releases/index.html             |     6 +-
 website/tmp/checksums                           |     3 +
 website/tmp/compiled_content                    | 14130 +++++++++++++++++
 website/tmp/dependencies                        |    10 +
 website/tmp/rule_memory                         |   Bin 0 -> 13045 bytes
 42 files changed, 16355 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index f941c11,dba4edf..9f931d3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@@ -65,18 -78,11 +65,20 @@@ import rx.Subscriber
  import rx.Subscription;
  import rx.schedulers.Schedulers;
  
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.*;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
+ import static org.apache.commons.lang.StringUtils.isNotEmpty;
+ 
  
  /**
 - * TODO, this whole class is becoming a nightmare.  We need to remove all 
consume from this class and refactor it into the following manner.
 + * TODO, this whole class is becoming a nightmare.
 + * We need to remove all consume from this class and refactor it into the 
following manner.
   *
   * 1. Produce.  Keep the code in the handle as is
   * 2. Consume:  Move the code into a refactored system
@@@ -99,10 -105,13 +101,13 @@@ public class AsyncEventServiceImpl impl
      // SQS maximum receive messages is 10
      public int MAX_TAKE = 10;
      public static final String QUEUE_NAME = "index"; //keep this short as AWS 
limits queue name size to 80 chars
+     public static final String QUEUE_NAME_UTILITY = "utility"; //keep this 
short as AWS limits queue name size to 80 chars
+ 
  
 -    private final QueueManager queue;
 -    private final QueueManager utilityQueue;
 +    private final LegacyQueueManager queue;
++    private final LegacyQueueManager utilityQueue;
      private final IndexProcessorFig indexProcessorFig;
 -    private final QueueFig queueFig;
 +    private final LegacyQueueFig queueFig;
      private final IndexProducer indexProducer;
      private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -155,8 -165,10 +161,14 @@@
  
          this.rxTaskScheduler = rxTaskScheduler;
  
-         LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, 
LegacyQueueScope.RegionImplementation.ALL);
 -        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, 
QueueScope.RegionImplementation.ALL);
 -        QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, 
QueueScope.RegionImplementation.ALL);
++        LegacyQueueScope queueScope =
++            new LegacyQueueScopeImpl(QUEUE_NAME, 
LegacyQueueScope.RegionImplementation.ALL);
++
++        LegacyQueueScope utilityQueueScope =
++            new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, 
LegacyQueueScope.RegionImplementation.ALL);
++
          this.queue = queueManagerFactory.getQueueManager(queueScope);
+         this.utilityQueue = 
queueManagerFactory.getQueueManager(utilityQueueScope);
  
          this.indexProcessorFig = indexProcessorFig;
          this.queueFig = queueFig;
@@@ -225,11 -245,20 +245,20 @@@
          }
      }
  
+     private void offerBatchToUtilityQueue(final List operations){
+         try {
+             //signal to SQS
+             this.utilityQueue.sendMessages(operations);
+         } catch (IOException e) {
+             throw new RuntimeException("Unable to queue message", e);
+         }
+     }
+ 
  
      /**
 -     * Take message from SQS
 +     * Take message
       */
 -    private List<QueueMessage> take() {
 +    private List<LegacyQueueMessage> take() {
  
          final Timer.Context timer = this.readTimer.time();
  
@@@ -242,38 -271,59 +271,67 @@@
          }
      }
  
+     /**
+      * Take message from SQS utility queue
+      */
 -    private List<QueueMessage> takeFromUtilityQueue() {
++    private List<LegacyQueueMessage> takeFromUtilityQueue() {
+ 
+         final Timer.Context timer = this.readTimer.time();
+ 
+         try {
+             return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+         }
+         finally {
+             //stop our timer
+             timer.stop();
+         }
+     }
+ 
  
 -
      /**
 -     * Ack message in SQS
 +     * Ack message
       */
 -    public void ack(final List<QueueMessage> messages) {
 +    public void ack(final List<LegacyQueueMessage> messages) {
  
          final Timer.Context timer = this.ackTimer.time();
  
 -        try{
 -            queue.commitMessages( messages );
 +        try {
  
 -            //decrement our in-flight counter
 -            inFlight.decrementAndGet();
 +            for ( LegacyQueueMessage legacyQueueMessage : messages ) {
 +                try {
 +                    queue.commitMessage( legacyQueueMessage );
 +                    inFlight.decrementAndGet();
  
 -        }catch(Exception e){
 -            throw new RuntimeException("Unable to ack messages", e);
 -        }finally {
 -            timer.stop();
 -        }
 +                } catch ( Throwable t ) {
 +                    logger.error("Continuing after error acking message: " + 
legacyQueueMessage.getMessageId() );
 +                }
 +            }
  
 +        } catch (Exception e) {
 +            throw new RuntimeException( "Unable to ack messages", e );
  
 +        } finally {
 +            timer.stop();
 +        }
      }
  
 +
      /**
 +     * calls the event handlers and returns a result with information on 
whether
 +     * it needs to be ack'd and whether it needs to be indexed
+      * Ack message in SQS
+      */
 -    public void ackUtilityQueue(final List<QueueMessage> messages) {
++    public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
+         try{
+             utilityQueue.commitMessages( messages );
+         }catch(Exception e){
+             throw new RuntimeException("Unable to ack messages", e);
+         }
+     }
+ 
+     /**
 -     * calls the event handlers and returns a result with information on 
whether it needs to be ack'd and whether it needs to be indexed
++     * calls the event handlers and returns a result with information on 
whether
++     * it needs to be ack'd and whether it needs to be indexed
       * @param messages
       * @return
       */
@@@ -553,11 -593,10 +610,11 @@@
  
          //send to the topic so all regions index the batch
  
-         offerTopic( elasticsearchIndexEvent );
+         offerTopic( elasticsearchIndexEvent, forUtilityQueue );
      }
  
 -    private void handleIndexOperation(final ElasticsearchIndexEvent 
elasticsearchIndexEvent) throws IndexDocNotFoundException {
 +    private void handleIndexOperation(final ElasticsearchIndexEvent 
elasticsearchIndexEvent)
 +        throws IndexDocNotFoundException {
  
          Preconditions.checkNotNull( elasticsearchIndexEvent, 
"elasticsearchIndexEvent cannot be null" );
  
@@@ -755,18 -794,22 +817,23 @@@
      }
  
  
-     private void startWorker() {
+     private void startWorker(final String type) {
+         Preconditions.checkNotNull(type, "Worker type required");
          synchronized (mutex) {
  
+             boolean isUtilityQueue = isNotEmpty(type) && 
type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+ 
 -            Observable<List<QueueMessage>> consumer =
 -                    Observable.create( new 
Observable.OnSubscribe<List<QueueMessage>>() {
 +            Observable<List<LegacyQueueMessage>> consumer =
 +                    Observable.create( new 
Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                          @Override
 -                        public void call( final Subscriber<? super 
List<QueueMessage>> subscriber ) {
 +                        public void call( final Subscriber<? super 
List<LegacyQueueMessage>> subscriber ) {
  
                              //name our thread so it's easy to see
-                             Thread.currentThread().setName( "QueueConsumer_" 
+ counter.incrementAndGet() );
 -                            long threadNum = isUtilityQueue ? 
counterUtility.incrementAndGet() : counter.incrementAndGet();
++                            long threadNum = isUtilityQueue ?
++                                counterUtility.incrementAndGet() : 
counter.incrementAndGet();
+                             Thread.currentThread().setName( "QueueConsumer_" 
+ type+ "_" + threadNum );
  
 -                                List<QueueMessage> drainList = null;
 +                            List<LegacyQueueMessage> drainList = null;
  
                              do {
                                  try {
@@@ -802,42 -850,45 +874,48 @@@
                      } )        //this won't block our read loop, just reads 
and proceeds
                          .flatMap( sqsMessages -> {
  
 -                            //do this on a different schedule, and introduce 
concurrency with flatmap for faster processing
 +                            //do this on a different schedule, and introduce 
concurrency
 +                            // with flatmap for faster processing
                              return Observable.just( sqsMessages )
  
-                                  .map( messages -> {
-                                      if ( messages == null || messages.size() 
== 0 ) {
-                                          // no messages came from the queue, 
move on
-                                          return null;
-                                      }
- 
-                                      try {
-                                          // process the messages
-                                          List<IndexEventResult> 
indexEventResults = callEventHandlers( messages );
- 
-                                          // submit the processed messages to 
index producer
-                                          List<LegacyQueueMessage> 
messagesToAck = submitToIndex( indexEventResults );
- 
-                                          if ( messagesToAck.size() < 
messages.size() ) {
-                                              logger.warn( "Missing {} 
message(s) from index processing",
-                                                 messages.size() - 
messagesToAck.size() );
-                                          }
- 
-                                          // ack each message if making it to 
this point
-                                          if( messagesToAck.size() > 0 ){
-                                              ack( messagesToAck );
-                                          }
- 
-                                          return messagesToAck;
-                                      }
-                                      catch ( Exception e ) {
-                                          logger.error( "Failed to ack 
messages", e );
-                                          return null;
-                                          //do not rethrow so we can process 
all of them
-                                      }
-                                  } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
- 
+                                              .map( messages -> {
+                                                  if ( messages == null || 
messages.size() == 0 ) {
+                                                      // no messages came from 
the queue, move on
+                                                      return null;
+                                                  }
+ 
+                                                  try {
+                                                      // process the messages
 -                                                     List<IndexEventResult> 
indexEventResults = callEventHandlers( messages );
++                                                     List<IndexEventResult> 
indexEventResults =
++                                                         callEventHandlers( 
messages );
+ 
+                                                      // submit the processed 
messages to index producer
 -                                                     List<QueueMessage> 
messagesToAck = submitToIndex( indexEventResults, isUtilityQueue );
++                                                     List<LegacyQueueMessage> 
messagesToAck =
++                                                         submitToIndex( 
indexEventResults, isUtilityQueue );
+ 
+                                                      if ( 
messagesToAck.size() < messages.size() ) {
+                                                          logger.warn( 
"Missing {} message(s) from index processing",
+                                                             messages.size() - 
messagesToAck.size() );
+                                                      }
+ 
+                                                      // ack each message if 
making it to this point
+                                                      if( messagesToAck.size() 
> 0 ){
+ 
+                                                          if ( isUtilityQueue 
){
+                                                              ackUtilityQueue( 
messagesToAck );
+                                                          }else{
+                                                              ack( 
messagesToAck );
+                                                          }
+                                                      }
+ 
+                                                      return messagesToAck;
+                                                  }
+                                                  catch ( Exception e ) {
+                                                      logger.error( "Failed to 
ack messages", e );
+                                                      return null;
+                                                      //do not rethrow so we 
can process all of them
+                                                  }
+                                              } ).subscribeOn( 
rxTaskScheduler.getAsyncIOScheduler() );
                              //end flatMap
                          }, indexProcessorFig.getEventConcurrencyFactor() );
  
@@@ -853,7 -904,7 +931,7 @@@
       * Submit results to index and return the queue messages to be ack'd
       *
       */
-     private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> 
indexEventResults) {
 -    private List<QueueMessage> submitToIndex(List<IndexEventResult> 
indexEventResults, boolean forUtilityQueue) {
++    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> 
indexEventResults, boolean forUtilityQueue) {
  
          // if nothing came back then return empty list
          if(indexEventResults==null){
@@@ -890,17 -941,16 +968,18 @@@
          EntityIndexOperation entityIndexOperation =
              new EntityIndexOperation( applicationScope, id, updatedSince);
  
 -        queueIndexOperationMessage(eventBuilder.buildEntityIndex( 
entityIndexOperation ).toBlocking().lastOrDefault(null), false);
 +        queueIndexOperationMessage(
-             eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(null));
++            eventBuilder.buildEntityIndex( entityIndexOperation 
).toBlocking().lastOrDefault(null), false);
      }
  
-     public void indexBatch(final List<EdgeScope> edges, final long 
updatedSince) {
+     public void indexBatch(final List<EdgeScope> edges, final long 
updatedSince, boolean forUtilityQueue) {
  
          final List<EntityIndexEvent> batch = new ArrayList<>();
          edges.forEach(e -> {
  
              //change to id scope to avoid serialization issues
--            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new 
EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), 
updatedSince));
++            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(),
++                new EntityIdScope(e.getApplicationScope(), 
e.getEdge().getTargetNode()), updatedSince));
  
          });
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7cc5c1c0/website/content/releases/index.html
----------------------------------------------------------------------
diff --cc website/content/releases/index.html
index 24fbe66,28fb6d7..7e54686
--- a/website/content/releases/index.html
+++ b/website/content/releases/index.html
@@@ -31,10 -31,7 +31,8 @@@
                                        Project releases are approved by vote of the Apache Usergrid Project Management Committee (PMC). Support for a release is provided by project volunteers on the project <a href="http://usergrid.apache.org/community/#mailing-lists">mailing lists</a>. Bugs found in a release may be discussed on the list and reported through the <a href="https://issues.apache.org/jira/browse/USERGRID">issue tracker</a>. The user mailing list and issue tracker are the only support options hosted by the Apache Usergrid project.
                                </p>
                                <p>
-                                       Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
-                               </p>
-                               <p>
-                                       The PGP signatures can be verified using PGP or GPG. First download the <a href="https://www.apache.org/dist/usergrid/KEYS">KEYS</a> as well as the <a href="https://www.apache.org/dist/release/usergrid/usergrid-2/v2.1.0/">asc signature</a> file for the particular distribution. Then verify the signatures using:
++
+                                       Note: When downloading from a mirror, please be sure to verify that checksums and signatures are correct. To do so, use the checksum and signature files from the main Apache site at <a href="https://www.apache.org/dist/usergrid/usergrid-2/v2.1.0/">https://dist.apache.org/repos/dist/release/usergrid/usergrid-2/v2.1.0/</a>. Find here the KEYS file, which contains all OpenPGP keys we use to sign releases here: <a href="https://www.apache.org/dist/usergrid/KEYS">https://www.apache.org/dist/usergrid/KEYS</a>
                                </p>
                                <p>
                                        % pgpk -a KEYS

Reply via email to