Github user GERey commented on a diff in the pull request: https://github.com/apache/incubator-usergrid/pull/248#discussion_r30720175 --- Diff: stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java --- @@ -70,60 +79,228 @@ @Inject public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, final MapManagerFactory mapManagerFactory, - final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig, - final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) { + final AllApplicationsObservable allApplicationsObservable, + final IndexProcessorFig indexProcessorFig, final AsyncEventService indexService ) { this.allEntityIdsObservable = allEntityIdsObservable; this.allApplicationsObservable = allApplicationsObservable; this.indexProcessorFig = indexProcessorFig; - this.rxTaskScheduler = rxTaskScheduler; this.indexService = indexService; - this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE ); + this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE ); } - @Override - public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, - final Optional<String> collectionName, final Optional<String> cursor, - final Optional<Long> startTimestamp ) { + public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) { //load our last emitted Scope if a cursor is present - if ( cursor.isPresent() ) { - throw new UnsupportedOperationException( "Build this" ); - } + final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() ); + + + final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor ); + + final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope(); - final Observable<ApplicationScope> applicationScopes = appId.isPresent()? 
Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData(); - final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()), + "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" ); + + final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId ); + + + final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); + + final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE ); //create an observable that loads each entity and indexes it, start it running with publish - final ConnectableObservable<EdgeScope> runningReIndex = - allEntityIdsObservable.getEdgesToEntities( applicationScopes, collectionName, startTimestamp ) + final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes, + reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() ) + + //for each edge, create our scope and index on it + .doOnNext( edge -> { + final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince ); + + logger.info( "Queueing {}", entityIndexOperation ); - //for each edge, create our scope and index on it - .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish(); + indexService.index(entityIndexOperation); + } ); //start our sampler and state persistence //take a sample every sample interval to allow us to resume state with minimal loss - runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS, - rxTaskScheduler.getAsyncIOScheduler() ) - .doOnNext( edge -> { + runningReIndex.buffer( indexProcessorFig.getUpdateInterval() ) + //create our flushing collector and flush the edge scopes to it + 
.collect( () -> new FlushingCollector( jobId ), + ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() ) + //subscribe on our I/O scheduler and run the task + .subscribeOn( Schedulers.io() ).subscribe(); + + + return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); + } + + + @Override + public ReIndexRequestBuilder getBuilder() { + return new ReIndexRequestBuilderImpl(); --- End diff -- Should this point to the builder rather than the impl itself, and then wire the impl up through Guice?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---