Repository: usergrid Updated Branches: refs/heads/expose-reindex ec8e5c7ca -> 230e99501 (forced update)
Initial commit for exposing collection re-index to non sysadmin users. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/230e9950 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/230e9950 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/230e9950 Branch: refs/heads/expose-reindex Commit: 230e9950152583dc4e6697c4348df650333e3b18 Parents: 4a65910 Author: Michael Russo <russomich...@google.com> Authored: Wed Jun 28 11:07:15 2017 -0700 Committer: Michael Russo <russomich...@google.com> Committed: Wed Oct 4 21:57:12 2017 -0700 ---------------------------------------------------------------------- .../corepersistence/index/ReIndexService.java | 33 +++++- .../index/ReIndexServiceImpl.java | 106 +++++++++++++------ .../rest/applications/CollectionResource.java | 40 ++++--- .../usergrid/rest/system/IndexResource.java | 78 ++++++++++++-- 4 files changed, 199 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/230e9950/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index b9238e5..d37f117 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.index; +import org.apache.usergrid.utils.StringUtils; + /** * An interface for re-indexing all entities in an application */ @@ -47,6 +49,13 @@ public interface ReIndexService { */ ReIndexStatus getStatus( final String jobId ); + /** + * Get the status of 
a collection job + * @param collectionName The collectionName for the rebuild index + * @return + */ + ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ); + /** * The response when requesting a re-index operation @@ -56,14 +65,27 @@ public interface ReIndexService { final Status status; final long numberProcessed; final long lastUpdated; + final String collectionName; public ReIndexStatus( final String jobId, final Status status, final long numberProcessed, - final long lastUpdated ) { - this.jobId = jobId; + final long lastUpdated, final String collectionName ) { + + if(StringUtils.isNotEmpty(jobId)){ + this.jobId = jobId; + }else { + this.jobId = ""; + } + this.status = status; this.numberProcessed = numberProcessed; this.lastUpdated = lastUpdated; + + if(StringUtils.isNotEmpty(collectionName)){ + this.collectionName = collectionName; + }else { + this.collectionName = ""; + } } @@ -74,6 +96,13 @@ public interface ReIndexService { return jobId; } + /** + * Get the name of the collection this re-index status applies to + */ + public String getCollectionName() { + return collectionName; + } + /** * Get the last updated time, as a long http://git-wip-us.apache.org/repos/asf/usergrid/blob/230e9950/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 05602fc..d4fb249 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -75,6 +75,7 @@ public class ReIndexServiceImpl implements ReIndexService { private static final String MAP_COUNT_KEY = "count"; private static final String MAP_STATUS_KEY = 
"status"; private static final String MAP_UPDATED_KEY = "lastUpdated"; + private static final String MAP_SEPARATOR = "|||"; private final AllApplicationsObservable allApplicationsObservable; @@ -140,7 +141,9 @@ public class ReIndexServiceImpl implements ReIndexService { // create an observable that loads a batch to be indexed - if(reIndexRequestBuilder.getCollectionName().isPresent()) { + final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent(); + + if(isForCollection) { String collectionName = InflectionUtils.pluralize( CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() )); @@ -175,12 +178,36 @@ public class ReIndexServiceImpl implements ReIndexService { if( edgeScopes.size() > 0 ) { writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); } - writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); }) - .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() )) + if( isForCollection ){ + writeStateMetaForCollection( + appId.get().getApplication().getUuid().toString(), + reIndexRequestBuilder.getCollectionName().get(), + Status.INPROGRESS, count.get(), + System.currentTimeMillis() ); + }else{ + writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); + } + }) + .doOnCompleted(() ->{ + if( isForCollection ){ + writeStateMetaForCollection( + appId.get().getApplication().getUuid().toString(), + reIndexRequestBuilder.getCollectionName().get(), + Status.COMPLETE, count.get(), + System.currentTimeMillis() ); + }else { + writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis()); + } + }) .subscribeOn( Schedulers.io() ).subscribe(); + if(isForCollection){ + return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() ); + + } + - return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); + return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" ); 
} @@ -196,38 +223,15 @@ public class ReIndexServiceImpl implements ReIndexService { return getIndexResponse( jobId ); } - - /** - * Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete - */ - private class FlushingCollector { - - private final String jobId; - private long count; - - - private FlushingCollector( final String jobId ) { - this.jobId = jobId; - } - - - public void flushBuffer( final List<EdgeScope> buffer ) { - count += buffer.size(); - - //write our cursor state - if ( buffer.size() > 0 ) { - writeCursorState( jobId, buffer.get( buffer.size() - 1 ) ); - } - - writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() ); - } - - public void complete(){ - writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() ); - } + @Override + public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) { + Preconditions.checkNotNull( appIdString, "appIdString must not be null" ); + Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); + return getIndexResponseForCollection( appIdString, collectionName ); } + /** * Get the resume edge scope * @@ -346,7 +350,7 @@ public class ReIndexServiceImpl implements ReIndexService { final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY ); if(stringStatus == null){ - return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 ); + return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" ); } final Status status = Status.valueOf( stringStatus ); @@ -354,7 +358,39 @@ public class ReIndexServiceImpl implements ReIndexService { final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY ); final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY ); - return new ReIndexStatus( jobId, status, processedCount, lastUpdated ); + return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" ); + } + + + private void 
writeStateMetaForCollection(final String appIdString, final String collectionName, + final Status status, final long processedCount, final long lastUpdated ) { + + if(logger.isDebugEnabled()) { + logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}", + collectionName, status, processedCount, lastUpdated); + } + + mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() ); + mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount ); + mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated ); + } + + + private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) { + + final String stringStatus = + mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY ); + + if(stringStatus == null){ + return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName ); + } + + final Status status = Status.valueOf( stringStatus ); + + final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY ); + final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY ); + + return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/230e9950/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java index 9c09806..9da0e3d 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java +++ 
b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java @@ -251,26 +251,41 @@ public class CollectionResource extends ServiceResource { } - // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this. - // So system access only. - // TODO: use scheduler here to get around people sending a reindex call 30 times. + @POST @Path("{itemName}/_reindex") @Produces({ MediaType.APPLICATION_JSON,"application/javascript"}) - @RequireSystemAccess + @RequireApplicationAccess @JSONP public ApiResponse executePostForReindexing( - @Context UriInfo ui, String body, + @Context UriInfo ui, final Map<String, Object> payload, @PathParam("itemName") PathSegment itemName, @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception { addItemToServiceContext( ui, itemName ); IndexResource indexResource = new IndexResource(injector); - return indexResource.rebuildIndexesPost( + return indexResource.rebuildIndexCollectionPost(payload, services.getApplicationId().toString(),itemName.getPath(),false,callback ); } + @GET + @Path("{itemName}/_reindex") + @Produces({ MediaType.APPLICATION_JSON,"application/javascript"}) + @RequireApplicationAccess + @JSONP + public ApiResponse executeGetForReindexStatus( + @Context UriInfo ui, final Map<String, Object> payload, + @PathParam("itemName") PathSegment itemName, + @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception { + + addItemToServiceContext( ui, itemName ); + + IndexResource indexResource = new IndexResource(injector); + return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(), + callback ); + } + private CollectionDeleteService getCollectionDeleteService() { return injector.getInstance( CollectionDeleteService.class ); @@ -310,18 +325,17 @@ public class CollectionResource extends ServiceResource { private ApiResponse executeAndCreateResponse(final 
CollectionDeleteRequestBuilder request, final String callback ) { - final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request ); + final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request); final ApiResponse response = createApiResponse(); - response.setAction( "clear collection" ); - response.setProperty( "jobId", status.getJobId() ); - response.setProperty( "status", status.getStatus() ); - response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); - response.setProperty( "numberQueued", status.getNumberProcessed() ); + response.setAction("clear collection"); + response.setProperty("jobId", status.getJobId()); + response.setProperty("status", status.getStatus()); + response.setProperty("lastUpdatedEpoch", status.getLastUpdated()); + response.setProperty("numberQueued", status.getNumberProcessed()); response.setSuccess(); return response; } - } http://git-wip-us.apache.org/repos/asf/usergrid/blob/230e9950/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java index be60177..ec86750 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java @@ -28,13 +28,16 @@ import com.google.inject.Injector; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl; import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.persistence.EntityManager; import 
org.apache.usergrid.persistence.index.utils.ConversionUtils; import org.apache.usergrid.persistence.index.utils.UUIDUtils; import org.apache.usergrid.rest.AbstractContextResource; import org.apache.usergrid.rest.ApiResponse; import org.apache.usergrid.rest.RootResource; +import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess; import org.apache.usergrid.rest.security.annotations.RequireSystemAccess; +import org.apache.usergrid.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; @@ -182,26 +185,78 @@ public class IndexResource extends AbstractContextResource { return executeResumeAndCreateResponse( payload, request, callback ); } + @RequireOrganizationAccess + @GET + @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" ) + @JSONP + @Produces({ MediaType.APPLICATION_JSON, "application/javascript" }) + public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr, + @PathParam( "collectionName" ) final String collectionName, + @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) - @RequireSystemAccess + + throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName); + } + + + ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName); + + final ApiResponse response = createApiResponse(); + + response.setAction( "get rebuild index status" ); + response.setProperty( "collection", status.getCollectionName() ); + response.setProperty( "status", status.getStatus() ); + response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); + response.setProperty( "numberQueued", status.getNumberProcessed() ); + response.setSuccess(); + + return response; + } + + @RequireOrganizationAccess @POST @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH 
+ "/{collectionName}" ) @JSONP @Produces({MediaType.APPLICATION_JSON, "application/javascript"}) - public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr, - @PathParam( "collectionName" ) final String collectionName, - @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, - @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) + public ApiResponse rebuildIndexCollectionPost(final Map<String, Object> payload, + @PathParam( "applicationId" ) final String applicationIdStr, + @PathParam( "collectionName" ) final String collectionName, + @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, + @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception { + ReIndexService.ReIndexStatus existingStatus = + getReIndexService().getStatusForCollection(applicationIdStr, collectionName); - logger.info( "Rebuilding collection {} in application {}", collectionName, applicationIdStr ); + if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){ + throw new ConflictException("Re-index for collection currently in progress"); + } + + logger.info( "Re-indexing collection {} in application {}", collectionName, applicationIdStr ); final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr ); + final ReIndexRequestBuilder request = createRequest().withApplicationId( appId ).withCollection( collectionName ); + Map<String,Object> newPayload = payload; + if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){ + newPayload = new HashMap<>(1); + newPayload.put(UPDATED_FIELD,0); + } + + Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number, + "Property \"updated\" in the payload must be a number in unix timestamp millis format" ); + + //add our updated timestamp to the request + if ( newPayload.containsKey( UPDATED_FIELD ) ) { + final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD)); + 
request.withStartTimestamp( timestamp ); + } + return executeAndCreateResponse( request, callback ); } @@ -214,7 +269,6 @@ public class IndexResource extends AbstractContextResource { public ApiResponse rebuildIndexesPut( final Map<String, Object> payload, @PathParam( "applicationId" ) final String applicationIdStr, @PathParam( "collectionName" ) final String collectionName, - @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception { @@ -350,7 +404,15 @@ public class IndexResource extends AbstractContextResource { final ApiResponse response = createApiResponse(); response.setAction( "rebuild indexes" ); - response.setProperty( "jobId", status.getJobId() ); + + if(StringUtils.isNotEmpty(status.getJobId())){ + response.setProperty( "jobId", status.getJobId() ); + } + + if(StringUtils.isNotEmpty(status.getCollectionName())){ + response.setProperty( "collection", status.getCollectionName() ); + } + response.setProperty( "status", status.getStatus() ); response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); response.setProperty( "numberQueued", status.getNumberProcessed() );