Add the ability to start the initial re-index seek from a UNIX timestamp.  The
seek will begin at the provided time, rather than seeking everything and
discarding whatever doesn't match the filter.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/85cc9343
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/85cc9343
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/85cc9343

Branch: refs/heads/usergrid-1318-queue
Commit: 85cc93436a163c3ba21a7ac1286c6bce3daebeb4
Parents: ef8899a
Author: Michael Russo <mru...@apigee.com>
Authored: Mon Oct 3 21:21:19 2016 -0700
Committer: Michael Russo <mru...@apigee.com>
Committed: Mon Oct 3 21:21:19 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/index/ReIndexServiceImpl.java | 18 ++++++++++++++----
 .../rx/impl/AllEntityIdsObservable.java           |  6 +++++-
 .../rx/impl/AllEntityIdsObservableImpl.java       |  7 +++++--
 .../graph/serialization/EdgesObservable.java      |  4 +++-
 .../serialization/impl/EdgesObservableImpl.java   | 16 +++++++++++++---
 .../usergrid/rest/system/IndexResource.java       | 12 ++++++------
 6 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 19fbcfa..f37f9af 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -135,7 +135,17 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final String jobId = StringUtils.sanitizeUUID( 
UUIDGenerator.newTimeUUID() );
 
-        final long modifiedSince = 
reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+        final long startTimestamp;
+        if ( reIndexRequestBuilder.getUpdateTimestamp().isPresent() && 
reIndexRequestBuilder.getUpdateTimestamp().get() > 0 ){
+
+            // edge timestamps are UUID timestamps, we need to convert from 
UNIX epoch to a UUID timestamp
+            long uuidEpochNanos = 0x01b21dd213814000L; // num 100 nano seconds 
since uuid epoch
+            startTimestamp = 
reIndexRequestBuilder.getUpdateTimestamp().get()*10000 + uuidEpochNanos;
+            logger.info("Reindex provided with from timestamp, converted to an 
Edge timestamp is: {}", startTimestamp);
+        }else{
+            startTimestamp = 0;
+        }
+
 
         // create an observable that loads a batch to be indexed
 
@@ -165,11 +175,11 @@ public class ReIndexServiceImpl implements ReIndexService 
{
         }
 
         allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-            reIndexRequestBuilder.getCollectionName(), 
cursorSeek.getSeekValue() )
+            reIndexRequestBuilder.getCollectionName(), 
cursorSeek.getSeekValue(), startTimestamp )
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", 
edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince);
+                indexService.indexBatch(edgeScopes, startTimestamp);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 
1));
@@ -178,7 +188,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, 
count.get(), System.currentTimeMillis() ))
             .subscribeOn( Schedulers.io() ).subscribe();
 
-        
+
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index 9070609..fe7a455 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -46,8 +46,12 @@ public interface AllEntityIdsObservable {
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
      * @param lastEdge The edge to resume processing from
+     * @param startTimestamp An optional unix timestamp to start the seek ( it 
will be converted to an Edge )
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final 
Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final 
Optional<Edge> lastEdge);
+    Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes,
+                                              final Optional<String> edgeType,
+                                              Optional<Edge> lastEdge,
+                                              final long startTimestamp );
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 0420a32..e6f3633 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -82,12 +82,15 @@ public class AllEntityIdsObservableImpl implements 
AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final 
Optional<Edge> lastEdge) {
+    public Observable<EdgeScope> getEdgesToEntities( final 
Observable<ApplicationScope> appScopes,
+                                                     final Optional<String> 
edgeType,
+                                                     final Optional<Edge> 
lastEdge,
+                                                     final long startTimestamp 
) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
 
-            return edgesObservable.edgesFromSourceDescending( gm, 
applicationScope.getApplication(), edgeType, lastEdge )
+            return edgesObservable.edgesFromSourceDescending( gm, 
applicationScope.getApplication(), edgeType, lastEdge, startTimestamp )
                                   .map( edge -> new 
EdgeScope(applicationScope, edge ));
         } );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 78a1d4b..7c83207 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -64,8 +64,10 @@ public interface EdgesObservable {
      * @param sourceNode
      * @param edgeType The edge type if specified.  Otherwise all types will 
be used
      * @param resume The edge to start seeking after.  Otherwise starts at the 
most recent
+     * @param startTimestamp A unix timestamp to start seeking from if you 
don't have the edge cursor
      * @return
      */
     Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final 
Id sourceNode,
-                                                final Optional<String> 
edgeType, final Optional<Edge> resume );
+                                                final Optional<String> 
edgeType, final Optional<Edge> resume,
+                                                final long startTimestamp );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 20efe42..2504e87 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,8 +75,8 @@ public class EdgesObservableImpl implements EdgesObservable {
 
     @Override
     public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, 
final Id sourceNode,
-                                                       final Optional<String> 
edgeTypeInput, final Optional<Edge> resume  ) {
-
+                                                       final Optional<String> 
edgeTypeInput, final Optional<Edge> resume,
+                                                       final long 
startTimestamp ) {
 
 
         final Observable<String> edgeTypes = edgeTypeInput.isPresent()? 
Observable.just( edgeTypeInput.get() ):
@@ -84,13 +85,22 @@ public class EdgesObservableImpl implements EdgesObservable 
{
 
         return edgeTypes.flatMap(  edgeType -> {
 
+            final Optional<Edge> start;
+
+            if( !resume.isPresent() && startTimestamp > 0 ){
+                // the target node doesn't matter here, the search only looks 
at the timestamp
+                start = Optional.of(new SimpleEdge(sourceNode, edgeType, 
sourceNode, startTimestamp));
+            }else{
+                start = resume;
+            }
+
                 if (logger.isTraceEnabled()) {
                     logger.trace("Loading edges of edgeType {} from {}", 
edgeType, sourceNode);
                 }
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, 
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                       resume ) );
+                       start ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cc9343/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
----------------------------------------------------------------------
diff --git 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..2be5b87 100644
--- 
a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ 
b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -61,7 +61,7 @@ import java.util.UUID;
 public class IndexResource extends AbstractContextResource {
 
     private static final Logger logger = LoggerFactory.getLogger( 
IndexResource.class );
-    private static final String UPDATED_FIELD = "updated";
+    private static final String SINCE_FIELD = "since";
 
 
 
@@ -321,17 +321,17 @@ public class IndexResource extends 
AbstractContextResource {
                                                             final String 
callback ) {
 
         Map<String,Object> newPayload = payload;
-        if(newPayload == null ||  !payload.containsKey( UPDATED_FIELD )){
+        if(newPayload == null ||  !payload.containsKey(SINCE_FIELD)){
             newPayload = new HashMap<>(1);
-            newPayload.put(UPDATED_FIELD,0);
+            newPayload.put(SINCE_FIELD,0);
         }
 
-        Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof 
Number,
+        Preconditions.checkArgument(newPayload.get(SINCE_FIELD) instanceof 
Number,
                 "You must specified the field \"updated\" in the payload and 
it must be a timestamp" );
 
         //add our updated timestamp to the request
-        if ( newPayload.containsKey( UPDATED_FIELD ) ) {
-            final long timestamp = 
ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
+        if ( newPayload.containsKey(SINCE_FIELD) ) {
+            final long timestamp = 
ConversionUtils.getLong(newPayload.get(SINCE_FIELD));
             request.withStartTimestamp( timestamp );
         }
 

Reply via email to