Final changes to enhance parallel loading of devices for push notifications.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06caa250 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06caa250 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06caa250 Branch: refs/heads/master Commit: 06caa2509407322498c025b1b3d39135d82777cc Parents: cc3cbfe Author: Michael Russo <mru...@apigee.com> Authored: Sun Apr 17 18:02:32 2016 +0100 Committer: Michael Russo <mru...@apigee.com> Committed: Sun Apr 17 18:02:32 2016 +0100 ---------------------------------------------------------------------- .../pipeline/builder/IdBuilder.java | 2 +- .../persistence/MultiQueryIterator.java | 2 +- .../persistence/NotificationGraphIterator.java | 59 ++---- .../persistence/PagingResultsIterator.java | 25 ++- .../apache/usergrid/persistence/PathQuery.java | 14 +- .../apache/usergrid/persistence/Results.java | 4 + .../impl/ApplicationQueueManagerImpl.java | 211 +++++-------------- 7 files changed, 102 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java index 65cf7c1..781d7d5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java @@ -151,7 +151,7 @@ public class IdBuilder { public Observable<ResultsPage<Id>> build(){ //we must add our resume filter so we drop our previous page first element if it's present - return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute(); + return pipeline.withFilter( new IdResumeFilter() ).withFilter(new ResultsPageCollector<>()).execute(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java index c5de5c1..9e28204 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java @@ -62,7 +62,7 @@ public class MultiQueryIterator implements ResultsIterator { EntityRef ref = source.next(); Results r = getResultsFor( ref ); if ( r.size() > 0 ) { - currentIterator = new PagingResultsIterator( r, query.getResultsLevel() ); + currentIterator = new PagingResultsIterator( r, query.getResultsLevel(), null); return currentIterator.hasNext(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java index a1f3246..a1b162d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java @@ -23,10 +23,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; public class NotificationGraphIterator implements ResultsIterator, Iterable { @@ -67,18 +63,20 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { Object next = source.next(); Results r; -// if(next instanceof UUID){ -// -// UUID id = (UUID) next; -// r = getResultsForId(id, "user"); -// -// }else { - EntityRef ref = (EntityRef) next; - r = getResultsFor(ref); - // } + EntityRef ref = (EntityRef) next; + r = getResultsFor(ref); if (r.size() > 0) { - currentIterator = new PagingResultsIterator(r, query.getResultsLevel()); + + + if(ref.getType().equals(Group.ENTITY_TYPE)) { + + currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), Query.Level.REFS); + }else{ + currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), null); + + } + return currentIterator.hasNext(); } } @@ -122,26 +120,13 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { // if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities if( ref.getType().equals(Group.ENTITY_TYPE)){ - // query users using IDs as we don't need to load the full entities just to find their devices - Query usersQuery = new Query(); - usersQuery.setCollection("users"); - usersQuery.setResultsLevel(Query.Level.IDS); - usersQuery.setLimit(1000); + // groups->users is a passthrough to devices, load our max limit + query.setLimit(Query.MAX_LIMIT); - - // set the query level for the iterator temporarily to IDS + // set the query level for the when fetching users to IDS, we don't need the full entity query.setResultsLevel(Query.Level.IDS); - return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery); - - -// List<EntityRef> refs = -// results.getIds().stream() -// .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList()); -// -// // set the query level for the iterator back to REFS after mapping our IDS -// query.setResultsLevel(Query.Level.REFS); -// return Results.fromRefList(refs); + return entityManager.searchCollection(ref, "users", query); } @@ -151,8 +136,6 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { devicesQuery.setCollection("devices"); devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); - //query.setCollection("devices"); - //query.setResultsLevel(Query.Level.CORE_PROPERTIES); return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery); } @@ -177,14 +160,4 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable { } } - - private Results getResultsForId(UUID uuid, String type) { - - EntityRef ref = new SimpleEntityRef(type, uuid); - return getResultsFor(ref); - - - } - - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java index a883e1b..640ee06 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java @@ -19,6 +19,8 @@ package org.apache.usergrid.persistence; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; + import org.apache.usergrid.persistence.Query.Level; @@ -28,20 +30,23 @@ public class PagingResultsIterator implements ResultsIterator, Iterable { private Results results; private Iterator currentPageIterator; private Level level; + private Level overrideLevel; public PagingResultsIterator( Results results ) { - this( results, results.level ); + this( results, results.level, null); } /** * @param level overrides the default level from the Results - in case you want to return, say, UUIDs where the * Query was set for Entities + * @param overrideLevel */ - public PagingResultsIterator( Results results, Level level ) { + public PagingResultsIterator(Results results, Level level, Level overrideLevel) { this.results = results; this.level = level; + this.overrideLevel = overrideLevel; initCurrentPageIterator(); } @@ -86,16 +91,32 @@ public class PagingResultsIterator implements ResultsIterator, Iterable { */ private boolean initCurrentPageIterator() { List currentPage; + Level origLevel = level; + if(overrideLevel != null){ + level=overrideLevel; + if(results.getIds()!=null){ + + List<EntityRef> userRefs = results.getIds().stream() + .map( uuid -> new SimpleEntityRef("user", uuid)).collect(Collectors.toList()); + + results.setRefs(userRefs); + + } + } + if ( results != null ) { switch ( level ) { case IDS: currentPage = results.getIds(); + level = origLevel; break; case REFS: currentPage = results.getRefs(); + level = origLevel; break; default: currentPage = results.getEntities(); + level = origLevel; } if ( currentPage.size() > 0 ) { currentPageIterator = currentPage.iterator(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java index 215f6ac..30636ab 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java @@ -88,7 +88,7 @@ public class PathQuery<E> { try { if ( uuid != null && type != null ) { - return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() ); + return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null); } else { return new MultiQueryIterator( em, source.refIterator( em, false), query ); @@ -103,7 +103,7 @@ public class PathQuery<E> { try { if ( uuid != null && type != null ) { - return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() ); + return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null); }else { return new NotificationGraphIterator(em, source.refIterator(em, true), query); @@ -130,6 +130,12 @@ public class PathQuery<E> { UUID entityId = em.getUniqueIdFromAlias( entityType, name ); + if( entityId == null){ + throw new + IllegalArgumentException("Entity with name "+name+" not found. Unable to send push notification"); + } + + return em.getEntities(Collections.singletonList(entityId), entityType); } @@ -143,12 +149,12 @@ public class PathQuery<E> { if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){ - return new PagingResultsIterator( getHeadResults( em ), Level.REFS ); + return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null); } if ( type != null && uuid != null) { - return new PagingResultsIterator( getHeadResults( em ), Level.REFS ); + return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null); } else { Query q = query; http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java index 2a84622..3502581 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java @@ -436,6 +436,10 @@ public class Results implements Iterable<Entity> { level = Level.REFS; } + public void setRefsOnly( List<EntityRef> resultsRefs ) { + refs = resultsRefs; + } + public Results withRefs( List<EntityRef> resultsRefs ) { setRefs( resultsRefs ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 1cbb2c6..2f39ae4 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -52,6 +52,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private final Meter queueMeter; private final Meter sendMeter; + private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency"; + HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once @@ -91,25 +93,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { return; } - if (logger.isTraceEnabled()) { - logger.trace("notification {} start queuing", notification.getUuid()); - } - final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues - //get devices in querystring, and make sure you have access + // Get devices in querystring, and make sure you have access if (pathQuery != null) { final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap(); if (logger.isTraceEnabled()) { logger.trace("notification {} start query", notification.getUuid()); } - logger.info("notification {} start query", notification.getUuid()); + + logger.info("Notification {} started processing", notification.getUuid()); - // the main iterator can use graph traversal or index querying + // The main iterator can use graph traversal or index querying based on payload property. Default is Index. final Iterator<Device> iterator; if( notification.getUseGraph()){ iterator = pathQuery.graphIterator(em); @@ -117,15 +116,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { iterator = pathQuery.iterator(em); } -// //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule -// if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { -// if(logger.isTraceEnabled()){ -// logger.trace("Scheduling notification job as it has multiple pages of devices."); -// } -// jobScheduler.scheduleQueueJob(notification, true); -// em.update(notification); -// return; -// } + /**** Old code to scheduler large sets of data, but now the processing is fired off async in the background. + Leaving this only a reference of how to do it, if needed in future. + + //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, + //also if this is already a job then don't reschedule + + if (iterator instanceof ResultsIterator + && ((ResultsIterator) iterator).hasPages() && jobExecution == null) { + + if(logger.isTraceEnabled()){ + logger.trace("Scheduling notification job as it has multiple pages of devices."); + } + jobScheduler.scheduleQueueJob(notification, true); + em.update(notification); + return; + } + ****/ + final UUID appId = em.getApplication().getUuid(); final Map<String, Object> payloads = notification.getPayloads(); @@ -182,87 +190,57 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }; - final Map<String, Object> filters = notification.getFilters(); + final Map<String, Object> filters = notification.getFilters(); + Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator)) - Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator)) -// .flatMap(entity -> { -// -// if(entity.getType().equals(Device.ENTITY_TYPE)){ -// return Observable.from(Collections.singletonList(entity)); -// } -// -// // if it's not a device, drill down and get them -// return Observable.from(getDevices(entity)); -// -// }) - .distinct() .flatMap( entityRef -> { return Observable.just(entityRef).flatMap(ref->{ List<Entity> entities = new ArrayList<>(); + if( ref.getType().equals(User.ENTITY_TYPE)){ + Query devicesQuery = new Query(); devicesQuery.setCollection("devices"); devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); try { - entities = em.searchCollection(new SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities(); + entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities(); }catch (Exception e){ - logger.error("Unable to load devices for user: {}", ref); + logger.error("Unable to load devices for user: {}", ref.getUuid()); return Observable.empty(); } + }else if ( ref.getType().equals(Device.ENTITY_TYPE)){ + try{ + entities.add(em.get(ref)); -// if( ref.getType().equals(User.ENTITY_TYPE)){ -// -// Query devicesQuery = new Query(); -// devicesQuery.setCollection("devices"); -// devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES); -// -// try { -// -// entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities(); -// -// }catch (Exception e){ -// -// logger.error("Unable to load devices for user: {}", ref.getUuid()); -// return Observable.empty(); -// } -// -// -// }else if ( ref.getType().equals(Device.ENTITY_TYPE)){ -// -// try{ -// entities.add(em.get(ref)); -// -// }catch(Exception e){ -// -// logger.error("Unable to load device: {}", ref.getUuid()); -// return Observable.empty(); -// -// } -// -// } + }catch(Exception e){ + + logger.error("Unable to load device: {}", ref.getUuid()); + return Observable.empty(); + + } + + } return Observable.from(entities); }) + .distinct( deviceRef -> deviceRef.getUuid()) .filter( device -> { - logger.info("Filtering device: {}", device.getUuid()); - if(logger.isTraceEnabled()) { logger.trace("Filtering device: {}", device.getUuid()); } - if(notification.getUseGraph() && filters.size() > 0 ) { for (Map.Entry<String, Object> entry : filters.entrySet()) { @@ -280,7 +258,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { } } - if(logger.isTraceEnabled()) { logger.trace("Push notification filter did not match for notification {}, so removing from notification", device.getUuid(), notification.getUuid()); @@ -321,20 +298,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { }).subscribeOn(Schedulers.io()); - }, 100) - //.map( entityRef -> entityRef.getUuid() ) - //.buffer(10) -// .flatMap( uuids -> { -// -// if(logger.isTraceEnabled()) { -// logger.trace("Processing batch of {} device(s)", uuids.size()); -// } -// -// -// return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io()); -// -// }, 10) - + }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50"))) .doOnError(throwable -> { logger.error("Error while processing devices for notification : {}", notification.getUuid()); @@ -355,7 +319,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { notification.setProcessingFinished(System.currentTimeMillis()); notification.setDeviceProcessedCount(deviceCount.get()); em.update(notification); - logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid()); + logger.info("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get()); } catch (Exception e) { logger.error("Unable to set processing finished timestamp for notification"); @@ -622,10 +586,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { try { while (!subscriber.isUnsubscribed() && input.hasNext()) { //send our input to the next + //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName()); subscriber.onNext((T) input.next()); } //tell the subscriber we don't have any more data + //logger.debug("finished iterator: {}", input.getClass().getSimpleName()); + subscriber.onCompleted(); } catch (Throwable t) { logger.error("failed on subscriber", t); @@ -678,90 +645,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { return true; } - private List<EntityRef> getDevices(EntityRef ref) { - - List<EntityRef> devices = new ArrayList<>(); - - final int LIMIT = Query.MID_LIMIT; - - try { - - if (User.ENTITY_TYPE.equals(ref.getType())) { - - UUID start = null; - boolean initial = true; - int resultSize = 0; - while( initial || resultSize >= Query.DEFAULT_LIMIT) { - - initial = false; - - final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT, - Query.Level.REFS, true).getRefs(); - - resultSize = mydevices.size(); - - if(mydevices.size() > 0){ - start = mydevices.get(mydevices.size() - 1 ).getUuid(); - } - - devices.addAll( mydevices ); - - } - - } else if (Group.ENTITY_TYPE.equals(ref.getType())) { - - UUID start = null; - boolean initial = true; - int resultSize = 0; - - while( initial || resultSize >= LIMIT){ - - initial = false; - - final List<EntityRef> myusers = em.getCollection(ref, "users", start, - LIMIT, Query.Level.REFS, true).getRefs(); - resultSize = myusers.size(); - - if(myusers.size() > 0){ - start = myusers.get(myusers.size() - 1 ).getUuid(); - } - - - Observable.from(myusers).flatMap( user -> { - - try { - devices.addAll(em.getCollection(user, "devices", null, 100, - Query.Level.REFS, true).getRefs()); - }catch (Exception e){ - logger.error ("Unable to fetch devices for user: {}", user.getUuid()); - } - return Observable.from(Collections.singletonList(user)); - - }, 50).toBlocking().lastOrDefault(null); - - - - - - } - - } - } catch (Exception e) { - - if (ref != null){ - logger.error("Error while retrieving devices for entity type {} and uuid {}. Error: {}", - ref.getType(), ref.getUuid(), e); - }else{ - logger.error("Error while retrieving devices. Entity ref was null."); - } - - throw new RuntimeException("Unable to retrieve devices for EntityRef", e); - - } - - return devices; - } - private String getProviderId(EntityRef device, Notifier notifier) throws Exception { try {