Changes the implementation so that a connection is updated to carry the timestamp at which it was last POSTed.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/079ba976 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/079ba976 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/079ba976 Branch: refs/heads/USERGRID-933 Commit: 079ba9760f3e8d72dbf08610360d089b9ff62d47 Parents: 0c12abb Author: Todd Nine <tn...@apigee.com> Authored: Tue Sep 22 16:59:25 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Sep 22 16:59:25 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpRelationManager.java | 34 ++++++++++++-------- .../service/ConnectionServiceImpl.java | 5 +-- .../service/ConnectionServiceImplTest.java | 4 +-- 3 files changed, 26 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 1da6f75..21a2ee7 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import org.springframework.util.Assert; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; @@ -62,7 +61,6 @@ import 
org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType; @@ -697,25 +695,35 @@ public class CpRelationManager implements RelationManager { final GraphManager gm = managerCache.getGraphManager( applicationScope ); - //check if the edge exists + //write new edge + gm.writeEdge( edge ).subscribe(); - final SearchByEdge searchByEdge = new SimpleSearchByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ); + indexService.queueNewEdge( applicationScope, targetEntity, edge ); - //only take 1 and count it. If we don't have anything, create the edge - final int count = gm.loadEdgeVersions( searchByEdge ).take( 1 ).count().toBlocking().last(); + //now read all older versions of an edge, and remove them. Finally calling delete + final SearchByEdge searchByEdge = + new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.absent() ); - if(count == 0) { - if(logger.isDebugEnabled()) { - logger.debug( "No edge exists between {} and {} of type {}. 
Creating", - new Object[] { edge.getSourceNode(), edge.getTargetNode(), edge.getType() } ); + + //load our versions, only retain the most recent one + gm.loadEdgeVersions( searchByEdge ).skip( 1 ).flatMap( edgeToDelete -> { + if ( logger.isDebugEnabled() ) { + logger.debug( "Marking edge {} for deletion", edgeToDelete ); + } + return gm.markEdge( edge ); + } ).lastOrDefault( null ).doOnNext( lastEdge -> { + //no op if we hit our default + if(lastEdge == null){ + return; } - gm.writeEdge( edge ).toBlocking().last(); + //queue up async processing + indexService.queueDeleteEdge( applicationScope, lastEdge ); + }).subscribe(); - indexService.queueNewEdge( applicationScope, targetEntity, edge ); - } return connection; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java index 1b36321..927d292 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java @@ -181,9 +181,10 @@ public class ConnectionServiceImpl implements ConnectionService { logger.debug( "Found edge {}, searching for multiple versions of edge", edge ); + //keep only the most recent final SearchByEdge searchByEdge = - new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), 0, - SearchByEdgeType.Order.ASCENDING, Optional.absent() ); + new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.absent() ); return gm.loadEdgeVersions( searchByEdge ) //skip the first version 
since it's the one we want to retain .skip( 1 ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/079ba976/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java index a2c18b1..326e128 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java @@ -196,7 +196,7 @@ public class ConnectionServiceImplTest { //check our oldest was deleted first assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() ); - assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() ); + assertEdgeData( written1, deletedConnections.get( 1 ).getEdge() ); assertEquals( "2 edges deleted", 2, deletedConnections.size() ); @@ -213,7 +213,7 @@ public class ConnectionServiceImplTest { assertEquals( 1, edges.size() ); - assertEquals( written1, edges.get( 0 ) ); + assertEquals( written3, edges.get( 0 ) ); }