Repository: usergrid
Updated Branches:
  refs/heads/fixOrphanedEdges 4a659106e -> f4842b06c


Remove orphaned collection edges with no target entity

Also remove extraneous entityID sort from the default Elasticsearch sort


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f4842b06
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f4842b06
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f4842b06

Branch: refs/heads/fixOrphanedEdges
Commit: f4842b06cd9fbc03af03039cc1f050a887c9b45c
Parents: 4a65910
Author: Mike Dunker <mdun...@google.com>
Authored: Fri Oct 13 10:13:55 2017 -0700
Committer: Mike Dunker <mdun...@google.com>
Committed: Fri Oct 13 10:13:55 2017 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  3 +
 .../read/traverse/EntityLoadVerifyFilter.java   | 72 +++++++++++++++++---
 .../pipeline/read/traverse/ReadRepairFig.java   | 38 +++++++++++
 .../impl/SearchRequestBuilderStrategy.java      |  3 +-
 4 files changed, 106 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a0748e6..909c073 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -27,6 +27,7 @@ import 
org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
 import 
org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration;
 import org.apache.usergrid.corepersistence.pipeline.PipelineModule;
+import 
org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadRepairFig;
 import org.apache.usergrid.corepersistence.rx.impl.*;
 import org.apache.usergrid.corepersistence.service.*;
 import org.apache.usergrid.locking.guice.LockModule;
@@ -160,6 +161,8 @@ public class CoreModule extends AbstractModule {
 
         install( new GuicyFigModule( EntityManagerFig.class ) );
 
+        install( new GuicyFigModule( ReadRepairFig.class ) );
+
         install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
 
         install( new GuicyFigModule( ServiceSchedulerFig.class ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 3f6e26d..7cc9735 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -23,6 +23,11 @@ package 
org.apache.usergrid.corepersistence.pipeline.read.traverse;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,11 +57,17 @@ public class EntityLoadVerifyFilter extends 
AbstractFilter<FilterResult<Id>, Fil
     private static final Logger logger = LoggerFactory.getLogger( 
EntityLoadVerifyFilter.class );
 
     private final EntityCollectionManagerFactory 
entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+    private final ReadRepairFig readRepairFig;
 
 
     @Inject
-    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory 
entityCollectionManagerFactory ) {
+    public EntityLoadVerifyFilter( final EntityCollectionManagerFactory 
entityCollectionManagerFactory,
+                                   final GraphManagerFactory 
graphManagerFactory,
+                                   final ReadRepairFig readRepairFig) {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+        this.readRepairFig = readRepairFig;
     }
 
 
@@ -64,8 +75,9 @@ public class EntityLoadVerifyFilter extends 
AbstractFilter<FilterResult<Id>, Fil
     public Observable<FilterResult<Entity>> call( final 
Observable<FilterResult<Id>> filterResultObservable ) {
 
 
+        final ApplicationScope applicationScope = 
pipelineContext.getApplicationScope();
         final EntityCollectionManager entityCollectionManager =
-            entityCollectionManagerFactory.createCollectionManager( 
pipelineContext.getApplicationScope() );
+            entityCollectionManagerFactory.createCollectionManager( 
applicationScope );
 
         //it's more efficient to make 1 network hop to get everything, then 
drop our results if required
         final Observable<FilterResult<Entity>> entityObservable =
@@ -80,9 +92,10 @@ public class EntityLoadVerifyFilter extends 
AbstractFilter<FilterResult<Id>, Fil
                               .flatMap( ids -> entityCollectionManager.load( 
ids ) );
 
 
-                //now we have a collection, validate our canidate set is 
correct.
-
-                return entitySetObservable.map( entitySet -> new 
EntityVerifier( entitySet, bufferedIds ) )
+                //now we have a collection, validate our candidate set is 
correct.
+                GraphManager graphManager = 
graphManagerFactory.createEdgeManager(applicationScope);
+                return entitySetObservable.map( entitySet -> new 
EntityVerifier( applicationScope, graphManager,
+                    entitySet, bufferedIds, readRepairFig ) )
                                           .doOnNext( entityCollector -> 
entityCollector.merge() ).flatMap(
                         entityCollector -> Observable.from( 
entityCollector.getResults() ) );
             } );
@@ -102,12 +115,20 @@ public class EntityLoadVerifyFilter extends 
AbstractFilter<FilterResult<Id>, Fil
 
         private final List<FilterResult<Id>> candidateResults;
         private final EntitySet entitySet;
+        private final GraphManager graphManager;
+        private final ApplicationScope applicationScope;
+        private final ReadRepairFig readRepairFig;
 
 
-        public EntityVerifier( final EntitySet entitySet, final 
List<FilterResult<Id>> candidateResults ) {
+        public EntityVerifier( final ApplicationScope applicationScope, final 
GraphManager graphManager,
+                               final EntitySet entitySet, final 
List<FilterResult<Id>> candidateResults,
+                               final ReadRepairFig readRepairFig) {
+            this.applicationScope = applicationScope;
+            this.graphManager = graphManager;
             this.entitySet = entitySet;
             this.candidateResults = candidateResults;
             this.results = new ArrayList<>( entitySet.size() );
+            this.readRepairFig = readRepairFig;
         }
 
 
@@ -137,11 +158,44 @@ public class EntityLoadVerifyFilter extends 
AbstractFilter<FilterResult<Id>, Fil
 
             //doesn't exist warn and drop
             if ( entity == null || !entity.getEntity().isPresent() ) {
-                logger.warn( "Read graph edge and received candidate with 
entityId {}, yet was not found in cassandra."
-                    + "  Ignoring since this could be a region sync issue", 
candidateId );
+
+                // look for orphaned edges
+                String edgeTypeName = 
CpNamingUtils.getEdgeTypeFromCollectionName(Schema.defaultCollectionName(candidateId.getType()));
+                final SearchByEdge searchByEdge =
+                    new SimpleSearchByEdge( applicationScope.getApplication(), 
edgeTypeName, candidateId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.absent() );
+
+                int edgesDeleted = 0;
+                List<MarkedEdge> edgeList = 
graphManager.loadEdgeVersions(searchByEdge).toList().toBlocking().last();
+                if (edgeList.size() > 0) {
+                    MarkedEdge firstEdge = edgeList.get(0);
+                    long currentTimestamp = 
CpNamingUtils.createGraphOperationTimestamp();
+                    long edgeTimestamp = firstEdge.getTimestamp();
+                    long timestampDiff = currentTimestamp - edgeTimestamp;
+                    long orphanDelaySecs = 
readRepairFig.getEdgeOrphanDelaySecs();
+                    // timestamps are in units of 100 nanoseconds; convert the delay from seconds
+                    long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L;
+                    if (timestampDiff > allowedDiff) {
+                        // edges must be orphans, delete edges
+                        for (MarkedEdge edge: edgeList) {
+                            MarkedEdge markedEdge = 
graphManager.markEdge(edge).toBlocking().lastOrDefault(null);
+                            if (markedEdge != null) {
+                                
graphManager.deleteEdge(markedEdge).toBlocking().lastOrDefault(null);
+                                edgesDeleted += 1;
+                            }
+                        }
+                    }
+                }
+
+                if (edgesDeleted > 0) {
+                    logger.warn("Read graph edge and received candidate with 
entityId {}, yet was not found in cassandra."
+                        + "  Deleted {} edges.", candidateId, edgesDeleted);
+                } else {
+                    logger.warn("Read graph edge and received candidate with 
entityId {}, yet was not found in cassandra."
+                        + "  Ignoring since this could be a region sync 
issue", candidateId);
+                }
 
 
-                //TODO trigger an audit after a fail count where we explicitly 
try to repair from other regions
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
new file mode 100644
index 0000000..2f3e6e4
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+/**
+ * Read repair fig: settings controlling when orphaned edges (edges whose target entity is missing) are deleted.
+ */
+@FigSingleton
+public interface ReadRepairFig extends GuicyFig {
+
+    @Key( "usergrid.edge.orphan.delete.delay.secs" )
+    @Default( "86400" ) // 1 day
+    long getEdgeOrphanDelaySecs();
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index c5f07b4..5812c6f 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -141,8 +141,9 @@ public class SearchRequestBuilderStrategy {
         //sort by the edge timestamp
         srb.addSort( SortBuilders.fieldSort( 
IndexingUtils.EDGE_TIMESTAMP_FIELDNAME ).order( SortOrder.DESC ) );
 
+        // removing secondary sort by entity ID -- takes ES resources and 
provides no benefit
         //sort by the entity id if our times are equal
-        srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME 
).order( SortOrder.ASC ) );
+        //srb.addSort( SortBuilders.fieldSort( 
IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) );
 
         return;
     }

Reply via email to