Add the ability to walk through a collection and delete all of its entities, optionally limited to entities last modified at or before a given timestamp. Modeled after the reindex service.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11823f29 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11823f29 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11823f29 Branch: refs/heads/expose-reindex Commit: 11823f294dfae762754ef1c4da8a5ee573107968 Parents: 3f7afcd Author: Mike Dunker <mdun...@google.com> Authored: Mon Aug 28 14:46:17 2017 -0700 Committer: Mike Dunker <mdun...@google.com> Committed: Mon Aug 28 14:46:17 2017 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 1 + .../asyncevents/AsyncEventService.java | 3 +- .../asyncevents/AsyncEventServiceImpl.java | 67 +++-- .../asyncevents/EventBuilder.java | 13 +- .../asyncevents/EventBuilderImpl.java | 45 ++- .../asyncevents/model/EntityDeleteEvent.java | 27 +- .../index/CollectionDeleteAction.java | 43 +++ .../index/CollectionDeleteRequestBuilder.java | 92 ++++++ .../CollectionDeleteRequestBuilderImpl.java | 146 +++++++++ .../index/CollectionDeleteService.java | 108 +++++++ .../index/CollectionDeleteServiceImpl.java | 299 +++++++++++++++++++ .../index/IndexProcessorFig.java | 9 + .../index/ReIndexServiceImpl.java | 2 +- .../persistence/CollectionDeleteTest.java | 266 +++++++++++++++++ .../resources/usergrid-custom-test.properties | 2 + .../rest/applications/CollectionResource.java | 130 +++++++- 16 files changed, 1217 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index ec6b775..a0748e6 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -140,6 +140,7 @@ public class CoreModule extends AbstractModule { bind( ReIndexService.class ).to( ReIndexServiceImpl.class ); + bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class ); bind( ExportService.class ).to( ExportServiceImpl.class ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 9e346cf..04eaf4c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.asyncevents; +import org.apache.usergrid.corepersistence.index.CollectionDeleteAction; import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; @@ -33,7 +34,7 @@ import java.util.UUID; /** * Low level queue service for events in the entity. 
These events are fire and forget, and will always be asynchronous */ -public interface AsyncEventService extends ReIndexAction { +public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction { /** http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 428772f..3d06cae 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -75,9 +75,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.commons.lang.StringUtils.indexOf; -import static org.apache.commons.lang.StringUtils.isNotEmpty; - /** * TODO, this whole class is becoming a nightmare. 
@@ -106,7 +103,6 @@ public class AsyncEventServiceImpl implements AsyncEventService { public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars public static final String QUEUE_NAME_DELETE = "delete"; - public static final String DEAD_LETTER_SUFFIX = "_dead"; private final LegacyQueueManager indexQueue; @@ -522,8 +518,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { applicationScope); - logger.trace("Offering InitializeApplicationIndexEvent for {}:{}", - applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering InitializeApplicationIndexEvent for {}:{}", + applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType()); + } offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR); @@ -535,8 +533,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, long updatedAfter) { - logger.trace("Offering EntityIndexEvent for {}:{}", - entity.getId().getUuid(), entity.getId().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EntityIndexEvent for {}:{}", + entity.getId().getUuid(), entity.getId().getType()); + } offer(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(applicationScope, entity.getId()), updatedAfter)); @@ -577,8 +577,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Entity entity, final Edge newEdge) { - logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", - newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EdgeIndexEvent for edge type {} entity 
{}:{}", + newEdge.getType(), entity.getId().getUuid(), entity.getId().getType()); + } offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge )); @@ -612,8 +614,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { - logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}", - edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}", + edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType()); + } // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE ); @@ -675,7 +679,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { //send to the topic so all regions index the batch - logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId ); + if (logger.isTraceEnabled()) { + logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId); + } offerTopic( elasticsearchIndexEvent, queueType ); } @@ -749,8 +755,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { // queue the de-index of old versions to the topic so cleanup happens in all regions - logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}", - applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}", + applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType()); + } offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId), markedVersion), 
AsyncEventQueueType.DELETE ); @@ -810,7 +818,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); + if (logger.isTraceEnabled()) { + logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType()); + } // sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ), @@ -830,12 +840,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); + final boolean isCollectionDelete = entityDeleteEvent.isCollectionDelete(); + final long updatedBefore = entityDeleteEvent.getUpdatedBefore(); if (logger.isDebugEnabled()) { - logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); + logger.debug("Deleting entity id from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore {}", + applicationScope, entityId, isCollectionDelete, updatedBefore); } - return eventBuilder.buildEntityDelete( applicationScope, entityId ); + return eventBuilder.buildEntityDelete( applicationScope, entityId, isCollectionDelete, updatedBefore ); } @@ -1192,11 +1205,27 @@ public class AsyncEventServiceImpl implements AsyncEventService { }); - logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size()); + if (logger.isTraceEnabled()) { + logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size()); + } offerBatch( batch, queueType ); } + public void 
deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType) { + + final List<EntityDeleteEvent> batch = new ArrayList<>(); + edges.forEach(e -> { + + //change to id scope to avoid serialization issues + batch.add(new EntityDeleteEvent(queueFig.getPrimaryRegion(), + new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), true, updatedBefore)); + + }); + + offerBatch(batch, queueType); + } + public class IndexEventResult{ private final Optional<IndexOperationMessage> indexOperationMessage; http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index ebb9190..4bb6312 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -63,7 +63,18 @@ public interface EventBuilder { * @param entityId * @return */ - IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId ); + IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId); + + /** + * Return a bin with 2 observable streams for entity delete. 
+ * @param applicationScope + * @param entityId + * @param isCollectionDelete + * @param updatedBefore + * @return + */ + IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId, + boolean isCollectionDelete, long updatedBefore); http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 5051598..7c72b72 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -24,7 +24,9 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import org.antlr.misc.Graph; import org.apache.usergrid.corepersistence.index.*; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.utils.UUIDUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,15 +130,48 @@ public class EventBuilderImpl implements EventBuilder { //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? 
@Override - public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { + public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId) { + return buildEntityDelete(applicationScope, entityId, false, Long.MAX_VALUE); + } + + @Override + public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId, + final boolean isCollectionDelete, final long updatedBefore) { if (logger.isDebugEnabled()) { - logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}", - applicationScope, entityId); + logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore={}", + applicationScope, entityId, isCollectionDelete, updatedBefore); + } + + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope); + final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope); + + boolean deleteEntity = ecm.load(entityId). 
+ map(entity -> { + final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED ); + + boolean willDelete = false; + if ( modified == null ) { + // We don't have a modified field, so we can't check, so delete it + willDelete = true; + } else if (modified.getValue() <= updatedBefore) { + willDelete = true; + } + + if (isCollectionDelete && willDelete) { + // need to mark for deletion + ecm.mark(entityId, null) + .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp())) + .toBlocking().last(); + } + + return willDelete; + }).toBlocking().firstOrDefault(true); + + if (!deleteEntity) { + return new IndexOperationMessage(); } - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); MvccLogEntry mostRecentToDelete = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index 01d2ba8..1589632 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E /** * Event that will signal to finish the actual delete (post-mark delete) for an Entity + * It will mark if this is for a collection delete */ public final class EntityDeleteEvent extends AsyncEvent { @@ -31,17 +32,41 @@ public final 
class EntityDeleteEvent extends AsyncEvent { @JsonProperty protected EntityIdScope entityIdScope; + @JsonProperty + private long updatedBefore; + + @JsonProperty + private boolean isCollectionDelete; + public EntityDeleteEvent() { super(); } public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) { super(sourceRegion); - this.entityIdScope = entityIdScope; + this.entityIdScope = entityIdScope; + this.updatedBefore = Long.MAX_VALUE; + this.isCollectionDelete = false; + } + + public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope, + boolean isCollectionDelete, long updatedBefore) { + super(sourceRegion); + this.entityIdScope = entityIdScope; + this.updatedBefore = updatedBefore; + this.isCollectionDelete = isCollectionDelete; } public EntityIdScope getEntityIdScope() { return entityIdScope; } + + public long getUpdatedBefore() { + return updatedBefore; + } + + public boolean isCollectionDelete() { + return isCollectionDelete; + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java new file mode 100644 index 0000000..7bad06b --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + +import java.util.List; + + +/** + * Callback to perform a collection delete operation based on a scope during bulk collection delete operations + */ +public interface CollectionDeleteAction { + + /** + * Delete a batch list of entities. + * @param edges + * @param updatedBefore + * @param queueType + */ + void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java new file mode 100644 index 0000000..4abdfea --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import com.google.common.base.Optional; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + + +/** + * A builder interface to build our collection delete request + */ +public interface CollectionDeleteRequestBuilder { + + /** + * Set the application id + */ + CollectionDeleteRequestBuilder withApplicationId(final UUID applicationId); + + /** + * Set the collection name. 
+ * @param collectionName + * @return + */ + CollectionDeleteRequestBuilder withCollection(final String collectionName); + + /** + * Set our cursor to resume processing + * @param cursor + * @return + */ + CollectionDeleteRequestBuilder withCursor(final String cursor); + + + CollectionDeleteRequestBuilder withDelay(int delayTimer, TimeUnit timeUnit); + + /** + * Set the timestamp to delete entities updated <= this timestamp + * @param timestamp + * @return + */ + CollectionDeleteRequestBuilder withEndTimestamp(final Long timestamp); + + + Optional<Integer> getDelayTimer(); + + Optional<TimeUnit> getTimeUnitOptional(); + + /** + * Get the application scope + * @return + */ + Optional<ApplicationScope> getApplicationScope(); + + /** + * Get the collection name + * @return + */ + Optional<String> getCollectionName(); + + /** + * Get the cursor + * @return + */ + Optional<String> getCursor(); + + /** + * Get the latest timestamp to delete + * @return + */ + Optional<Long> getEndTimestamp(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java new file mode 100644 index 0000000..890b770 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import com.google.common.base.Optional; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + + +/** + * collection delete service request builder + */ +public class CollectionDeleteRequestBuilderImpl implements CollectionDeleteRequestBuilder { + + private Optional<UUID> withApplicationId = Optional.absent(); + private Optional<String> withCollectionName = Optional.absent(); + private Optional<String> cursor = Optional.absent(); + private Optional<Long> endTimestamp = Optional.absent(); + private Optional<Integer> delayTimer = Optional.absent(); + private Optional<TimeUnit> timeUnitOptional = Optional.absent(); + + + /*** + * + * @param applicationId The application id + * @return + */ + @Override + public CollectionDeleteRequestBuilder withApplicationId( final UUID applicationId ) { + this.withApplicationId = Optional.fromNullable( applicationId ); + return this; + } + + + /** + * the collection name + * @param collectionName + * @return + */ + @Override + public CollectionDeleteRequestBuilder withCollection( final String collectionName ) { + this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() ) ); + return 
this; + } + + + /** + * The cursor + * @param cursor + * @return + */ + @Override + public CollectionDeleteRequestBuilder withCursor( final String cursor ) { + this.cursor = Optional.fromNullable( cursor ); + return this; + } + + + /** + * Determines whether we should tack on a delay for collection delete and for how long if we do. Also + * allowed to specify how throttled back it should be. + * @param delayTimer + * @param timeUnit + * @return + */ + @Override + public CollectionDeleteRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){ + this.delayTimer = Optional.fromNullable( delayTimer ); + this.timeUnitOptional = Optional.fromNullable( timeUnit ); + + return this; + } + + + /** + * Set end timestamp in epoch time. Only entities last modified at or before this time will be processed for deletion + * @param timestamp + * @return + */ + @Override + public CollectionDeleteRequestBuilder withEndTimestamp( final Long timestamp ) { + this.endTimestamp = Optional.fromNullable( timestamp ); + return this; + } + + + @Override + public Optional<Integer> getDelayTimer() { + return delayTimer; + } + + @Override + public Optional<TimeUnit> getTimeUnitOptional() { + return timeUnitOptional; + } + + + @Override + public Optional<ApplicationScope> getApplicationScope() { + + if ( this.withApplicationId.isPresent() ) { + return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) ); + } + + return Optional.absent(); + } + + + @Override + public Optional<String> getCollectionName() { + return withCollectionName; + } + + + @Override + public Optional<String> getCursor() { + return cursor; + } + + + @Override + public Optional<Long> getEndTimestamp() { + return endTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java ---------------------------------------------------------------------- diff --git
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java new file mode 100644 index 0000000..c939dd3 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.corepersistence.index; + + +/** + * An interface for deleting the entities in a collection of an application + */ +public interface CollectionDeleteService { + + + /** + * Perform a collection delete via service + * + * @param collectionDeleteRequestBuilder The builder to build the request + */ + CollectionDeleteStatus deleteCollection(final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder); + + + /** + * Get a request builder for the collection delete + */ + CollectionDeleteRequestBuilder getBuilder(); + + + /** + * Get the status of a job + * @param jobId The jobId returned during the collection delete + * @return + */ + CollectionDeleteStatus getStatus(final String jobId); + + + /** + * The response when requesting a collection delete operation + */ + public class CollectionDeleteStatus { + final String jobId; + final Status status; + final long numberProcessed; + final long lastUpdated; + + + public CollectionDeleteStatus(final String jobId, final Status status, final long numberProcessed, + final long lastUpdated ) { + this.jobId = jobId; + this.status = status; + this.numberProcessed = numberProcessed; + this.lastUpdated = lastUpdated; + } + + + /** + * Get the jobId used to resume this operation + */ + public String getJobId() { + return jobId; + } + + + /** + * Get the last updated time, as a long + * @return + */ + public long getLastUpdated() { + return lastUpdated; + } + + + /** + * Get the number of records processed + * @return + */ + public long getNumberProcessed() { + return numberProcessed; + } + + + /** + * Get the status + * @return + */ + public Status getStatus() { + return status; + } + } + + enum Status{ + STARTED, INPROGRESS, COMPLETE, UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java ---------------------------------------------------------------------- diff --git
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java new file mode 100644 index 0000000..7b3e324 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static com.google.common.base.Optional.fromNullable;
+
+
+/**
+ * Default {@link CollectionDeleteService}. It walks the edges of one collection in an application and
+ * sends them in batches to {@link AsyncEventService#deleteBatch} on the {@link AsyncEventQueueType#DELETE}
+ * queue. Job state (resume cursor, processed count, status, last-updated time) is stored in a map scoped
+ * to the management application, keyed by jobId, so a job can be polled and resumed.
+ */
+@Singleton
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteServiceImpl.class );
+
+    private static final MapScope RESUME_MAP_SCOPE =
+        new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "collectiondeleteresume" );
+
+    //Keep cursors to resume collection delete for 10 days.
+    private static final int CURSOR_TTL = 60 * 60 * 24 * 10;
+
+    // Suffixes appended to the jobId to form the map keys for each piece of job state.
+    private static final String MAP_CURSOR_KEY = "cursor";
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+    private final AllApplicationsObservable allApplicationsObservable;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final AllEntityIdsObservable allEntityIdsObservable;
+    private final IndexProcessorFig indexProcessorFig;
+    private final MapManager mapManager;
+    private final MapManagerFactory mapManagerFactory;
+    private final AsyncEventService indexService;
+    private final EntityIndexFactory entityIndexFactory;
+    private final CollectionSettingsFactory collectionSettingsFactory;
+
+
+    @Inject
+    public CollectionDeleteServiceImpl(final EntityIndexFactory entityIndexFactory,
+                                       final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                       final AllEntityIdsObservable allEntityIdsObservable,
+                                       final MapManagerFactory mapManagerFactory,
+                                       final AllApplicationsObservable allApplicationsObservable,
+                                       final IndexProcessorFig indexProcessorFig,
+                                       final CollectionSettingsFactory collectionSettingsFactory,
+                                       final AsyncEventService indexService ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.allEntityIdsObservable = allEntityIdsObservable;
+        this.allApplicationsObservable = allApplicationsObservable;
+        this.indexProcessorFig = indexProcessorFig;
+        this.indexService = indexService;
+        this.collectionSettingsFactory = collectionSettingsFactory;
+        this.mapManagerFactory = mapManagerFactory;
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+    }
+
+
+    /**
+     * Start deleting the requested collection in the background and return immediately.
+     *
+     * Exactly one of application id or cursor (the jobId of an earlier job, to resume it) must be supplied,
+     * together with a collection name. The end timestamp defaults to the current time.
+     *
+     * @return a STARTED status carrying the new jobId
+     * @throws IllegalArgumentException if the collection name is missing, or if neither or both of
+     *         application id and cursor are given
+     */
+    //TODO: optional delay param; the builder's getDelayTimer()/getTimeUnitOptional() are not honored yet.
+    @Override
+    public CollectionDeleteStatus deleteCollection( final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder) {
+
+        final AtomicInteger count = new AtomicInteger();
+
+        final Optional<EdgeScope> cursor = parseCursor( collectionDeleteRequestBuilder.getCursor() );
+
+        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
+
+        final Optional<ApplicationScope> appId = collectionDeleteRequestBuilder.getApplicationScope();
+
+        Preconditions.checkArgument( collectionDeleteRequestBuilder.getCollectionName().isPresent(),
+            "You must specify a collection name" );
+        final String collectionName = collectionDeleteRequestBuilder.getCollectionName().get();
+
+        Preconditions.checkArgument( !( cursor.isPresent() && appId.isPresent() ),
+            "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid." );
+        Preconditions.checkArgument( cursor.isPresent() || appId.isPresent(),
+            "Either application ID or cursor is required." );
+
+        // when resuming, the application comes from the persisted cursor
+        final ApplicationScope applicationScope =
+            appId.isPresent() ? appId.get() : cursor.get().getApplicationScope();
+
+
+        final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+        // default to current time
+        final long endTimestamp = collectionDeleteRequestBuilder.getEndTimestamp().or( System.currentTimeMillis() );
+
+        recordCollectionClear( applicationScope, collectionName );
+
+        allEntityIdsObservable.getEdgesToEntities( Observable.just( applicationScope ),
+                fromNullable( collectionName ), cursorSeek.getSeekValue() )
+            .buffer( indexProcessorFig.getCollectionDeleteBufferSize() )
+            .doOnNext( edgeScopes -> {
+                logger.info( "Sending batch of {} to be deleted.", edgeScopes.size() );
+                indexService.deleteBatch( edgeScopes, endTimestamp, AsyncEventQueueType.DELETE );
+                count.addAndGet( edgeScopes.size() );
+                if ( edgeScopes.size() > 0 ) {
+                    // persist the last edge of the batch so the job can be resumed from here
+                    writeCursorState( jobId, edgeScopes.get( edgeScopes.size() - 1 ) );
+                }
+                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
+            } )
+            .doOnCompleted( () -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ) )
+            .subscribeOn( Schedulers.io() )
+            .subscribe(
+                ignored -> { /* all work happens in doOnNext */ },
+                // without an error handler RxJava raises OnErrorNotImplementedException on the io thread
+                throwable -> logger.error( "Collection delete job {} for collection {} failed after {} entities",
+                    jobId, collectionName, count.get(), throwable ) );
+
+
+        return new CollectionDeleteStatus( jobId, Status.STARTED, 0, 0 );
+    }
+
+
+    /**
+     * Stamp "lastCollectionClear" (current epoch millis) into the collection's settings, if it has any.
+     */
+    private void recordCollectionClear( final ApplicationScope applicationScope, final String collectionName ) {
+
+        final String pluralizedCollectionName =
+            InflectionUtils.pluralize( CpNamingUtils.getNameFromEdgeType( collectionName ) );
+
+        final CollectionSettings collectionSettings = collectionSettingsFactory.getInstance(
+            new CollectionSettingsScopeImpl( applicationScope.getApplication(), pluralizedCollectionName ) );
+
+        final Optional<Map<String, Object>> existingSettings =
+            collectionSettings.getCollectionSettings( pluralizedCollectionName );
+
+        // NOTE(review): collections without existing settings get no marker, and the marker is "now" rather
+        // than the request's end timestamp -- confirm this matches what readers of lastCollectionClear expect.
+        if ( existingSettings.isPresent() ) {
+
+            final Map<String, Object> jsonMapData = existingSettings.get();
+
+            jsonMapData.put( "lastCollectionClear", Instant.now().toEpochMilli() );
+
+            collectionSettings.putCollectionSettings(
+                pluralizedCollectionName, JsonUtils.mapToJsonString( jsonMapData ) );
+        }
+    }
+
+
+    @Override
+    public CollectionDeleteRequestBuilder getBuilder() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    @Override
+    public CollectionDeleteStatus getStatus( final String jobId ) {
+        Preconditions.checkNotNull( jobId, "jobId must not be null" );
+        return getCollectionDeleteResponse( jobId );
+    }
+
+
+    /**
+     * Get the resume edge scope
+     *
+     * @param edgeScope The optional edge scope from the cursor
+     * @return a seek positioned at the cursor's edge, or an empty seek to start from the beginning
+     */
+    private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+        if ( edgeScope.isPresent() ) {
+            return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+        }
+
+        return new CursorSeek<>( Optional.absent() );
+    }
+
+
+    /**
+     * Swap our cursor (the jobId of the job being resumed) for an optional edgescope.
+     */
+    private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+        if ( !cursor.isPresent() ) {
+            return Optional.absent();
+        }
+
+        // writeCursorState stores the cursor under jobId + MAP_CURSOR_KEY; look it up under the same key
+        final String persistedCursor = mapManager.getString( cursor.get() + MAP_CURSOR_KEY );
+
+        if ( persistedCursor == null ) {
+            return Optional.absent();
+        }
+
+        final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+        final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+        return Optional.of( edgeScope );
+    }
+
+
+    /**
+     * Write the cursor state to the map in cassandra
+     */
+    private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+        final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+        final String serializedState = CursorSerializerUtil.asString( node );
+
+        mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, CURSOR_TTL);
+    }
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId the job being updated
+     * @param status the job's current status
+     * @param processedCount number of entities handed off for deletion so far
+     * @param lastUpdated time of this update, epoch millis
+     */
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                jobId, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    /**
+     * Get the index response from the jobId
+     * @param jobId the job to look up
+     * @return the stored status, or an UNKNOWN status when nothing is stored for the jobId
+     */
+    private CollectionDeleteStatus getCollectionDeleteResponse( final String jobId ) {
+
+        final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+        if(stringStatus == null){
+            return new CollectionDeleteStatus( jobId, Status.UNKNOWN, 0, 0 );
+        }
+
+        final Status status = Status.valueOf( stringStatus );
+
+        final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+        return new CollectionDeleteStatus( jobId, status, processedCount, lastUpdated );
+    }
+}
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index eb63056..948e106 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -56,6 +56,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
 
+    String COLLECTION_DELETE_BUFFER_SIZE = "elasticsearch.collection_delete.buffer_size";
+
     String REINDEX_CONCURRENCY_FACTOR = "elasticsearch.reindex.concurrency.factor";
 
 
@@ -157,6 +159,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getReindexConcurrencyFactor();
 
     /**
+     * Number of entities grouped into each batch sent for deletion during a collection delete
+     */
+    @Default("500")
+    @Key(COLLECTION_DELETE_BUFFER_SIZE)
+    int getCollectionDeleteBufferSize();
+
+    /**
      * Flag to resolve the LOCAL queue implementation service synchronously.
      */
     @Default("false")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index c7371b3..05602fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -352,7 +352,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         final Status status = Status.valueOf( stringStatus );
 
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
-        final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
 
         return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
     }
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
new file mode 100644
index 0000000..ddf2c68
--- /dev/null
+++
b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Integration test for {@link CollectionDeleteService}: entities created before the end timestamp must be
+ * deleted, entities created after it must survive.
+ */
+@NotThreadSafe
+public class CollectionDeleteTest extends AbstractCoreIT {
+    private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
+
+    private static final MetricRegistry registry = new MetricRegistry();
+
+
+    private static final int ENTITIES_TO_DELETE = 1000;
+    private static final int ENTITIES_TO_ADD_AFTER_TIME = 3;
+
+
+    @Before
+    public void startReporting() {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Starting metrics reporting");
+        }
+    }
+
+
+    @After
+    public void printReport() {
+        logger.debug( "Printing metrics report" );
+    }
+
+
+    /**
+     * Create ENTITIES_TO_DELETE entities, take a timestamp, create ENTITIES_TO_ADD_AFTER_TIME more, clear the
+     * collection up to that timestamp and check that only the later entities remain.
+     */
+    @Test( timeout = 240000 )
+    public void clearOneCollection() throws Exception {
+
+        logger.info( "Started clearOneCollection()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final CollectionDeleteService collectionDeleteService = setup.getInjector().getInstance( CollectionDeleteService.class );
+
+        // ----------------- create a bunch of entities
+
+        Map<String, Object> entityMap = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
+        }};
+
+        String collectionName = "items";
+        String itemType = "item";
+
+
+        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+        for ( int i = 0; i < ENTITIES_TO_DELETE; i++ ) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
+        }
+
+        logger.info("Created {} entities", ENTITIES_TO_DELETE);
+        long timeFirstPutDone = System.currentTimeMillis();
+        logger.info("timeFirstPutDone={}", timeFirstPutDone);
+
+        for (int i = 0; i < ENTITIES_TO_ADD_AFTER_TIME; i++) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", ENTITIES_TO_DELETE + i );
+                entity = em.create(itemType, entityMap);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities after delete time", i );
+            }
+
+        }
+        logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
+
+
+        app.waitForQueueDrainAndRefreshIndex(5000);
+
+        final CollectionDeleteRequestBuilder builder =
+            collectionDeleteService.getBuilder()
+                .withApplicationId( em.getApplicationId() )
+                .withCollection(collectionName)
+                .withEndTimestamp(timeFirstPutDone);
+
+        CollectionDeleteService.CollectionDeleteStatus status = collectionDeleteService.deleteCollection(builder);
+
+        // JUnit takes (message, object): with the arguments swapped only the literal message was checked
+        assertNotNull( "JobId is present", status.getJobId() );
+
+        logger.info( "Delete collection" );
+
+
+        waitForDelete( status, collectionDeleteService );
+
+        app.waitForQueueDrainAndRefreshIndex(15000);
+
+        // ----------------- test that we can read the entries after the timestamp
+
+        readData( em, collectionName,ENTITIES_TO_ADD_AFTER_TIME);
+    }
+
+    /**
+     * Wait for the delete to occur; polls the job status once a second until it reports COMPLETE.
+     * Relies on the test's timeout to bound the wait.
+     */
+    private void waitForDelete( final CollectionDeleteService.CollectionDeleteStatus status, final CollectionDeleteService collectionDeleteService )
+        throws InterruptedException, IllegalArgumentException {
+        if (status != null) {
+            logger.info("waitForDelete: jobID={}", status.getJobId());
+        } else {
+            logger.info("waitForDelete: error, status = null");
+            throw new IllegalArgumentException("collectionDeleteStatus = null");
+        }
+        while ( true ) {
+
+            try {
+                final CollectionDeleteService.CollectionDeleteStatus updatedStatus =
+                    collectionDeleteService.getStatus( status.getJobId() );
+
+                if (updatedStatus == null) {
+                    logger.info("waitForDelete: updated status is null");
+                } else {
+                    logger.info("waitForDelete: status={} numberProcessed={}",
+                        updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+                    if ( updatedStatus.getStatus() == CollectionDeleteService.Status.COMPLETE ) {
+                        break;
+                    }
+                }
+            }
+            catch ( IllegalArgumentException ignored ) {
+                //swallow. Thrown if our job can't be found. I.E hasn't updated yet
+            }
+
+
+            Thread.sleep( 1000 );
+        }
+    }
+
+
+    /**
+     * Page through the collection, check every entity's "key2" property and assert the total count.
+     */
+    private int readData(EntityManager em, String collectionName, int expectedEntities)
+        throws Exception {
+
+        app.waitForQueueDrainAndRefreshIndex();
+
+        Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
+            Query.Level.ALL_PROPERTIES, false);
+
+        int count = 0;
+        while ( true ) {
+
+            if (results.getEntities().size() == 0) {
+                break;
+            }
+
+            UUID lastEntityUUID = null;
+            for ( Entity e : results.getEntities() ) {
+
+                assertEquals(2000, e.getProperty("key2"));
+
+                if (count % 100 == 0) {
+                    logger.info("read {} entities", count);
+                }
+                lastEntityUUID = e.getUuid();
+                count++;
+            }
+
+            results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
+                Query.Level.ALL_PROPERTIES, false);
+
+        }
+        logger.info("read {} total entities", count);
+
+        assertEquals( "Did not get expected entities", expectedEntities, count );
+        return count;
+    }
+
+    /**
+     * Count the entities matching key1=1000 via index queries and assert the total.
+     */
+    private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+        throws Exception {
+
+        app.waitForQueueDrainAndRefreshIndex();
+
+        Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+        int count = 0;
+        while ( true ) {
+
+            count += results.size();
+
+
+            if ( results.hasCursor() ) {
+                logger.info( "Counted {} : query again with cursor", count );
+                q.setCursor( results.getCursor() );
+                results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+            }
+            else {
+                break;
+            }
+        }
+
+        assertEquals( "Did not get expected entities", expectedEntities, count );
+        return count;
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/query-validator/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/query-validator/src/test/resources/usergrid-custom-test.properties b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
index bc1ba56..c8e3eee 100644
--- a/stack/query-validator/src/test/resources/usergrid-custom-test.properties
+++ b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
@@ -30,3 +30,5 @@ usergrid.sysadmin.login.allowed=true
 
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
+
+elasticsearch.queue_impl=LOCAL
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11823f29/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..c9174c1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -18,19 +18,19 @@
 package org.apache.usergrid.rest.applications;
 
 
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.PathSegment;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
+import org.apache.usergrid.persistence.index.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
@@ -48,6 +48,9 @@ import org.apache.usergrid.services.ServicePayload;
 
 import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * A collection resource that stands before the Service Resource. If it cannot find
@@ -61,6 +64,9 @@ import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 })
 public class CollectionResource extends ServiceResource {
 
+    private static final Logger logger = LoggerFactory.getLogger( CollectionResource.class );
+    private static final String UPDATED_BEFORE_FIELD = "updatedBefore";
+
     public CollectionResource() {
     }
 
@@ -190,6 +196,61 @@ public class CollectionResource extends ServiceResource {
     }
 
 
+    /**
+     * Start clearing (deleting the entities of) a collection in the current application.
+     * An optional numeric "updatedBefore" field in the payload is passed as the request's end timestamp;
+     * when it is absent, Long.MAX_VALUE is used.
+     *
+     * NOTE(review): guarded by @RequireApplicationAccess while the reindex endpoint below is documented as
+     * system-access-only -- confirm application-level access is intended for this destructive operation.
+     */
+    @PUT
+    @Path("{itemName}/_clear")
+    @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionPut(
+        final Map<String, Object> payload,
+        @PathParam("itemName") final String collectionName,
+        @QueryParam("callback") @DefaultValue("callback") String callback
+    ) throws Exception {
+
+        logger.info("Clearing collection {} for application {}", collectionName, getApplicationId().toString());
+
+        final CollectionDeleteRequestBuilder request = createRequest()
+            .withApplicationId(getApplicationId())
+            .withCollection(collectionName);
+
+        return executeResumeAndCreateResponse(payload, request, callback);
+
+    }
+
+
+    /**
+     * Report the status of a collection clear job started by {@link #clearCollectionPut}.
+     */
+    @GET
+    @Path( "{itemName}/_clear/{jobId}")
+    @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse clearCollectionJobGet(
+        @Context UriInfo ui,
+        @PathParam("itemName") PathSegment itemName,
+        @PathParam("jobId") String jobId,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        if(logger.isTraceEnabled()){
+            logger.trace( "CollectionResource.clearCollectionJobGet" );
+        }
+
+        Preconditions
+            .checkNotNull(jobId, "path param jobId must not be null" );
+
+        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().getStatus(jobId);
+
+        return createClearCollectionResponse( status, "numberCheckedForDeletion" );
+    }
+
+
     // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
     // So system access only.
     // TODO: use scheduler here to get around people sending a reindex call 30 times.
@@ -210,4 +271,57 @@ public class CollectionResource extends ServiceResource {
             services.getApplicationId().toString(),itemName.getPath(),false,callback );
     }
 
+
+    private CollectionDeleteService getCollectionDeleteService() {
+        return injector.getInstance( CollectionDeleteService.class );
+    }
+
+
+    private CollectionDeleteRequestBuilder createRequest() {
+        return new CollectionDeleteRequestBuilderImpl();
+    }
+
+
+    /**
+     * Apply the payload's "updatedBefore" timestamp (default Long.MAX_VALUE) to the request and start it.
+     *
+     * @throws IllegalArgumentException if "updatedBefore" is present but is not a number
+     */
+    private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> payload,
+                                                        final CollectionDeleteRequestBuilder request,
+                                                        final String callback ) {
+
+        // no payload, or no field: no upper bound on the timestamp
+        final Object updatedBefore = ( payload != null && payload.containsKey( UPDATED_BEFORE_FIELD ) )
+            ? payload.get( UPDATED_BEFORE_FIELD )
+            : Long.valueOf( Long.MAX_VALUE );
+
+        Preconditions.checkArgument( updatedBefore instanceof Number,
+            "The field \"updatedBefore\" in the payload must be a timestamp" );
+
+        //add our updated timestamp to the request
+        request.withEndTimestamp( ConversionUtils.getLong( updatedBefore ) );
+
+        return executeAndCreateResponse( request, callback );
+    }
+
+    /**
+     * Execute the request and return the response.
+     */
+    private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
+
+        final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+
+        return createClearCollectionResponse( status, "numberQueued" );
+    }
+
+    /**
+     * Build the "clear collection" API response for a job status.
+     *
+     * @param status the job status to report
+     * @param countPropertyName response property name under which the processed count is reported
+     */
+    private ApiResponse createClearCollectionResponse( final CollectionDeleteService.CollectionDeleteStatus status,
+                                                       final String countPropertyName ) {
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "clear collection" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( countPropertyName, status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
 }