Repository: usergrid Updated Branches: refs/heads/fixOrphanedEdges 4a659106e -> f4842b06c
Remove orphaned collection edges with no target entity Also remove extraneous entityID sort from the default Elasticsearch sort Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f4842b06 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f4842b06 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f4842b06 Branch: refs/heads/fixOrphanedEdges Commit: f4842b06cd9fbc03af03039cc1f050a887c9b45c Parents: 4a65910 Author: Mike Dunker <mdun...@google.com> Authored: Fri Oct 13 10:13:55 2017 -0700 Committer: Mike Dunker <mdun...@google.com> Committed: Fri Oct 13 10:13:55 2017 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 3 + .../read/traverse/EntityLoadVerifyFilter.java | 72 +++++++++++++++++--- .../pipeline/read/traverse/ReadRepairFig.java | 38 +++++++++++ .../impl/SearchRequestBuilderStrategy.java | 3 +- 4 files changed, 106 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index a0748e6..909c073 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -27,6 +27,7 @@ import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; import org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration; import org.apache.usergrid.corepersistence.pipeline.PipelineModule; +import org.apache.usergrid.corepersistence.pipeline.read.traverse.ReadRepairFig; import org.apache.usergrid.corepersistence.rx.impl.*; import org.apache.usergrid.corepersistence.service.*; import org.apache.usergrid.locking.guice.LockModule; @@ -160,6 +161,8 @@ public class CoreModule extends AbstractModule { install( new GuicyFigModule( EntityManagerFig.class ) ); + install( new GuicyFigModule( ReadRepairFig.class ) ); + install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) ); install( new GuicyFigModule( ServiceSchedulerFig.class ) ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java index 3f6e26d..7cc9735 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java @@ -23,6 +23,11 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; import java.util.ArrayList; import java.util.List; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.Schema; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.*; +import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +57,17 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class ); private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final GraphManagerFactory graphManagerFactory; + private final ReadRepairFig readRepairFig; @Inject - public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + public EntityLoadVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory, + final GraphManagerFactory graphManagerFactory, + final ReadRepairFig readRepairFig) { this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.graphManagerFactory = graphManagerFactory; + this.readRepairFig = readRepairFig; } @@ -64,8 +75,9 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) { + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); final EntityCollectionManager entityCollectionManager = - entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() ); + entityCollectionManagerFactory.createCollectionManager( applicationScope ); //it's more efficient to make 1 network hop to get everything, then drop our results if required final Observable<FilterResult<Entity>> entityObservable = @@ -80,9 +92,10 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil .flatMap( ids -> entityCollectionManager.load( ids ) ); - //now we have a collection, validate our canidate set is correct. - - return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) ) + //now we have a collection, validate our candidate set is correct. + GraphManager graphManager = graphManagerFactory.createEdgeManager(applicationScope); + return entitySetObservable.map( entitySet -> new EntityVerifier( applicationScope, graphManager, + entitySet, bufferedIds, readRepairFig ) ) .doOnNext( entityCollector -> entityCollector.merge() ).flatMap( entityCollector -> Observable.from( entityCollector.getResults() ) ); } ); @@ -102,12 +115,20 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil private final List<FilterResult<Id>> candidateResults; private final EntitySet entitySet; + private final GraphManager graphManager; + private final ApplicationScope applicationScope; + private final ReadRepairFig readRepairFig; - public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) { + public EntityVerifier( final ApplicationScope applicationScope, final GraphManager graphManager, + final EntitySet entitySet, final List<FilterResult<Id>> candidateResults, + final ReadRepairFig readRepairFig) { + this.applicationScope = applicationScope; + this.graphManager = graphManager; this.entitySet = entitySet; this.candidateResults = candidateResults; this.results = new ArrayList<>( entitySet.size() ); + this.readRepairFig = readRepairFig; } @@ -137,11 +158,44 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil //doesn't exist warn and drop if ( entity == null || !entity.getEntity().isPresent() ) { - logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra." - + " Ignoring since this could be a region sync issue", candidateId ); + + // look for orphaned edges + String edgeTypeName = CpNamingUtils.getEdgeTypeFromCollectionName(Schema.defaultCollectionName(candidateId.getType())); + final SearchByEdge searchByEdge = + new SimpleSearchByEdge( applicationScope.getApplication(), edgeTypeName, candidateId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.absent() ); + + int edgesDeleted = 0; + List<MarkedEdge> edgeList = graphManager.loadEdgeVersions(searchByEdge).toList().toBlocking().last(); + if (edgeList.size() > 0) { + MarkedEdge firstEdge = edgeList.get(0); + long currentTimestamp = CpNamingUtils.createGraphOperationTimestamp(); + long edgeTimestamp = firstEdge.getTimestamp(); + long timestampDiff = currentTimestamp - edgeTimestamp; + long orphanDelaySecs = readRepairFig.getEdgeOrphanDelaySecs(); + // timestamps are in 100 nanoseconds, convert from seconds + long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L; + if (timestampDiff > allowedDiff) { + // edges must be orphans, delete edges + for (MarkedEdge edge: edgeList) { + MarkedEdge markedEdge = graphManager.markEdge(edge).toBlocking().lastOrDefault(null); + if (markedEdge != null) { + graphManager.deleteEdge(markedEdge).toBlocking().lastOrDefault(null); + edgesDeleted += 1; + } + } + } + } + + if (edgesDeleted > 0) { + logger.warn("Read graph edge and received candidate with entityId {}, yet was not found in cassandra." + + " Deleted {} edges.", candidateId, edgesDeleted); + } else { + logger.warn("Read graph edge and received candidate with entityId {}, yet was not found in cassandra." + + " Ignoring since this could be a region sync issue", candidateId); + } - //TODO trigger an audit after a fail count where we explicitly try to repair from other regions return; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java new file mode 100644 index 0000000..2f3e6e4 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadRepairFig.java @@ -0,0 +1,38 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.corepersistence.pipeline.read.traverse; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + +/** + * Read repair fig + */ +@FigSingleton +public interface ReadRepairFig extends GuicyFig { + + @Key( "usergrid.edge.orphan.delete.delay.secs" ) + @Default( "86400" ) // 1 day + long getEdgeOrphanDelaySecs(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/f4842b06/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java index c5f07b4..5812c6f 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java @@ -141,8 +141,9 @@ public class SearchRequestBuilderStrategy { //sort by the edge timestamp srb.addSort( SortBuilders.fieldSort( IndexingUtils.EDGE_TIMESTAMP_FIELDNAME ).order( SortOrder.DESC ) ); + // removing secondary sort by entity ID -- takes ES resources and provides no benefit //sort by the entity id if our times are equal - srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) ); + //srb.addSort( SortBuilders.fieldSort( IndexingUtils.ENTITY_ID_FIELDNAME ).order( SortOrder.ASC ) ); return; }