Github user GERey commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/248#discussion_r30720175
  
    --- Diff: 
stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 ---
    @@ -70,60 +79,228 @@
         @Inject
         public ReIndexServiceImpl( final AllEntityIdsObservable 
allEntityIdsObservable,
                                    final MapManagerFactory mapManagerFactory,
    -                               final AllApplicationsObservable 
allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
    -                               final RxTaskScheduler rxTaskScheduler, 
final AsyncEventService indexService ) {
    +                               final AllApplicationsObservable 
allApplicationsObservable,
    +                               final IndexProcessorFig indexProcessorFig, 
final AsyncEventService indexService ) {
             this.allEntityIdsObservable = allEntityIdsObservable;
             this.allApplicationsObservable = allApplicationsObservable;
             this.indexProcessorFig = indexProcessorFig;
    -        this.rxTaskScheduler = rxTaskScheduler;
             this.indexService = indexService;
     
    -        this.mapManager = mapManagerFactory.createMapManager( 
RESUME_MAP_SCOPTE );
    +        this.mapManager = mapManagerFactory.createMapManager( 
RESUME_MAP_SCOPE );
         }
     
     
    -
         @Override
    -    public IndexResponse rebuildIndex( final Optional<UUID> appId, final 
Optional<String> collection,
    -                                       final Optional<String> 
collectionName, final Optional<String> cursor,
    -                                       final Optional<Long> startTimestamp 
) {
    +    public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder 
reIndexRequestBuilder ) {
     
             //load our last emitted Scope if a cursor is present
    -        if ( cursor.isPresent() ) {
    -            throw new UnsupportedOperationException( "Build this" );
    -        }
     
    +        final Optional<EdgeScope> cursor = parseCursor( 
reIndexRequestBuilder.getCursor() );
    +
    +
    +        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
    +
    +        final Optional<ApplicationScope> appId = 
reIndexRequestBuilder.getApplicationScope();
     
    -        final Observable<ApplicationScope>  applicationScopes = 
appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : 
allApplicationsObservable.getData();
     
    -        final String newCursor = StringUtils.sanitizeUUID( 
UUIDGenerator.newTimeUUID() );
    +        Preconditions.checkArgument( !(cursor.isPresent() && 
appId.isPresent()),
    +            "You cannot specify an app id and a cursor.  When resuming 
with cursor you must omit the appid" );
    +
    +        final Observable<ApplicationScope> applicationScopes = 
getApplications( cursor, appId );
    +
    +
    +        final String jobId = StringUtils.sanitizeUUID( 
UUIDGenerator.newTimeUUID() );
    +
    +        final long modifiedSince = 
reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
     
             //create an observable that loads each entity and indexes it, 
start it running with publish
    -        final ConnectableObservable<EdgeScope> runningReIndex =
    -            allEntityIdsObservable.getEdgesToEntities( applicationScopes, 
collectionName, startTimestamp )
    +        final Observable<EdgeScope> runningReIndex = 
allEntityIdsObservable.getEdgesToEntities( applicationScopes,
    +            reIndexRequestBuilder.getCollectionName(), 
cursorSeek.getSeekValue() )
    +
    +            //for each edge, create our scope and index on it
    +            .doOnNext( edge -> {
    +                final EntityIndexOperation entityIndexOperation = new 
EntityIndexOperation( edge.getApplicationScope(), 
edge.getEdge().getTargetNode(), modifiedSince );
    +
    +                logger.info( "Queueing {}", entityIndexOperation );
     
    -                //for each edge, create our scope and index on it
    -                .doOnNext( edge -> indexService.index( new EntityIdScope( 
edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
    +                indexService.index(entityIndexOperation);
     
    +            } );
     
     
             //start our sampler and state persistence
             //take a sample every sample interval to allow us to resume state 
with minimal loss
    -        runningReIndex.sample( 
indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
    -            rxTaskScheduler.getAsyncIOScheduler() )
    -            .doOnNext( edge -> {
    +        runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
    +            //create our flushing collector and flush the edge scopes to it
    +            .collect( () -> new FlushingCollector( jobId ),
    +                ( ( flushingCollector, edgeScopes ) -> 
flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> 
flushingCollector.complete() )
    +                //subscribe on our I/O scheduler and run the task
    +            .subscribeOn( Schedulers.io() ).subscribe();
    +
    +
    +        return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
    +    }
    +
    +
    +    @Override
    +    public ReIndexRequestBuilder getBuilder() {
    +        return new ReIndexRequestBuilderImpl();
    --- End diff --
    
    Should this point to the builder rather than the impl itself, then wire 
the impl up through Guice?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to