Initial testing with guice wiring for test

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b625ed24
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b625ed24
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b625ed24

Branch: refs/heads/USERGRID-933
Commit: b625ed24a68d9c71fc30694c5dbeda26f1afb4d7
Parents: 2faa805
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Sep 14 17:33:38 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Sep 14 18:14:00 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  23 ++-
 .../asyncevents/AsyncEventService.java          |   2 +
 .../builder/PipelineBuilderFactory.java         |   3 +
 .../service/ConnectionScope.java                |  53 +++++
 .../service/ConnectionService.java              |  10 +
 .../service/ConnectionServiceImpl.java          |  65 +++++-
 .../corepersistence/util/CpNamingUtils.java     |  13 +-
 .../corepersistence/TestCoreModule.java         |  34 ++++
 .../service/ConnectionServiceImplTest.java      | 198 +++++++++++++++++++
 stack/core/src/test/resources/log4j.properties  |   3 +-
 .../impl/TargetIdObservableImpl.java            |  13 +-
 11 files changed, 397 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 0c3b927..1da6f75 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.index.EntityIndex;
@@ -694,9 +695,27 @@ public class CpRelationManager implements RelationManager {
         final Edge edge = createConnectionEdge( cpHeadEntity.getId(), 
connectionType, targetEntity.getId() );
 
         final GraphManager gm = managerCache.getGraphManager( applicationScope 
);
-        gm.writeEdge( edge ).toBlocking().last();
 
-        indexService.queueNewEdge( applicationScope, targetEntity, edge );
+
+        //check if the edge exists
+
+
+        final SearchByEdge searchByEdge = new 
SimpleSearchByEdge(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), 
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()  );
+
+
+        //only take 1 and count it.  If we don't have anything, create the edge
+        final int count = gm.loadEdgeVersions( searchByEdge ).take( 1 
).count().toBlocking().last();
+
+        if(count == 0) {
+            if(logger.isDebugEnabled()) {
+                logger.debug( "No edge exists between {} and {} of type {}.  
Creating",
+                    new Object[] { edge.getSourceNode(), edge.getTargetNode(), 
edge.getType() } );
+            }
+
+            gm.writeEdge( edge ).toBlocking().last();
+
+            indexService.queueNewEdge( applicationScope, targetEntity, edge );
+        }
 
         return connection;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 6d51679..ae5688c 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -54,6 +54,8 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      * Fired when a new edge is added to an entity. Such as initial entity 
creation, adding to a collection, or creating a connection
      *
+     * TODO: We shouldn't take an entity here, only the id.  It doesn't make 
sense in a distributed context to pass the entity
+     *
      * @param applicationScope
      * @param entity
      * @param newEdge

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
index 6cb515b..ada1779 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilderFactory.java
@@ -23,6 +23,9 @@ package org.apache.usergrid.corepersistence.pipeline.builder;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 
+/**
+ * Factory interface for creating our pipeline builders
+ */
 public interface PipelineBuilderFactory {
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionScope.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionScope.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionScope.java
new file mode 100644
index 0000000..3cc9ae5
--- /dev/null
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionScope.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence.service;
+
+
+import java.io.Serializable;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Tuple containing our application scope, and an edge within that 
application's graph
+ */
+public class ConnectionScope implements Serializable {
+    private final Edge edge;
+    private final ApplicationScope applicationScope;
+
+
+
+
+    public ConnectionScope( ApplicationScope applicationScope, final Edge edge 
) {
+        this.edge = edge;
+        this.applicationScope = applicationScope;
+    }
+
+
+    public Edge getEdge() {
+        return edge;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
index 71a25c9..539c881 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionService.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.service;
 
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -46,4 +47,13 @@ public interface ConnectionService {
      * @return
      */
     Observable<ResultsPage<ConnectionRef>> searchConnectionAsRefs( final 
ConnectionSearch search );
+
+
+    /**
+     * An observable that will remove duplicate edges from the graph that 
represent connections.  Each emitted scope wraps an edge that has been deleted.
+     *
+     * @param applicationScopeObservable
+     * @return
+     */
+    Observable<ConnectionScope> deDupeConnections(final 
Observable<ApplicationScope> applicationScopeObservable);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index c7e0fee..82c1038 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -22,7 +22,18 @@ import 
org.apache.usergrid.corepersistence.pipeline.builder.EntityBuilder;
 import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
 import 
org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectionRef;
+import 
org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -37,11 +48,17 @@ import rx.Observable;
 public class ConnectionServiceImpl implements ConnectionService {
 
     private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final AllEntityIdsObservable allEntityIdsObservable;
+    private final GraphManagerFactory graphManagerFactory;
 
 
     @Inject
-    public ConnectionServiceImpl( final PipelineBuilderFactory 
pipelineBuilderFactory ) {
+    public ConnectionServiceImpl( final PipelineBuilderFactory 
pipelineBuilderFactory,
+                                  final AllEntityIdsObservable 
allEntityIdsObservable,
+                                  final GraphManagerFactory 
graphManagerFactory ) {
         this.pipelineBuilderFactory = pipelineBuilderFactory;
+        this.allEntityIdsObservable = allEntityIdsObservable;
+        this.graphManagerFactory = graphManagerFactory;
     }
 
 
@@ -117,4 +134,50 @@ public class ConnectionServiceImpl implements 
ConnectionService {
 
         return results;
     }
+
+
+    @Override
+    public Observable<ConnectionScope> deDupeConnections(
+        final Observable<ApplicationScope> applicationScopeObservable ) {
+
+
+        final Observable<EntityIdScope> entityIds 
=allEntityIdsObservable.getEntities( applicationScopeObservable );
+        //now we have an observable of entityIds.  Walk each connection type
+
+        //get all edge types for connections
+       return  entityIds.flatMap( entityIdScope -> {
+
+            final ApplicationScope applicationScope = 
entityIdScope.getApplicationScope();
+            final Id entityId = entityIdScope.getId();
+
+            final GraphManager gm = 
graphManagerFactory.createEdgeManager(applicationScope );
+
+            return gm.getEdgeTypesFromSource(
+                new SimpleSearchEdgeType( entityId, 
CpNamingUtils.EDGE_CONN_PREFIX, Optional.absent() ) )
+
+                //now load all edges from this node of this type
+                .flatMap( edgeType -> {
+                    final SearchByEdgeType searchByEdge =
+                        new SimpleSearchByEdgeType( entityId, edgeType, 
Long.MAX_VALUE,
+                            SearchByEdgeType.Order.ASCENDING, 
Optional.absent() );
+
+                    //load edges from the source with the type specified
+                    return gm.loadEdgesFromSource( searchByEdge );
+                } )
+
+                //now that we have a stream of edges, stream all versions
+                .flatMap( edge -> {
+                    final SearchByEdge searchByEdge =
+                        new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(),
+                            Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, 
Optional.absent() );
+                    return gm.loadEdgeVersions( searchByEdge );
+                } )
+
+            //skip the first version since it's the one we want to retain
+            // validate there is only 1 version of it, delete anything greater than 
the min
+                .skip( 1 )
+                .flatMap( edgeToDelete -> gm.deleteEdge( edgeToDelete ) )
+                .map(deletedEdge ->  new ConnectionScope( applicationScope, 
deletedEdge ) ) ;
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index f32e9d7..55bc49a 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,8 +21,6 @@ package org.apache.usergrid.corepersistence.util;
 
 import java.util.UUID;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
@@ -42,7 +40,8 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.base.Optional;
 
 
 /**
@@ -257,6 +256,7 @@ public class CpNamingUtils {
     }
 
 
+
     /**
      * Get the application scope from the given uuid
      *
@@ -309,6 +309,8 @@ public class CpNamingUtils {
     }
 
 
+
+
     private static boolean isCollectionEdgeType( String type ) {
         return type.startsWith( EDGE_COLL_PREFIX );
     }
@@ -326,8 +328,5 @@ public class CpNamingUtils {
     }
 
 
-    public static boolean isApplication(Id id) {
-        Preconditions.checkNotNull(id);
-        return id.getType().equals("application");
-    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
new file mode 100644
index 0000000..32b9845
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.persistence.core.guice.TestModule;
+
+
+/**
+ * Test guice module for our core guice configuration
+ */
+public class TestCoreModule extends TestModule {
+
+    @Override
+    protected void configure() {
+
+        install( new CoreModule() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
new file mode 100644
index 0000000..0a29286
--- /dev/null
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import java.util.List;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.corepersistence.TestCoreModule;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+
+@RunWith( ITRunner.class )
+@UseModules( TestCoreModule.class )
+public class ConnectionServiceImplTest {
+
+    @Inject
+    private GraphManagerFactory graphManagerFactory;
+
+    @Inject
+    private ConnectionService connectionService;
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Test
+    public void testSingleConnection() {
+
+        final ApplicationScope applicationScope = new ApplicationScopeImpl( 
new SimpleId( "application" ) );
+
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
+
+        //now write a single connection
+
+        final Id source = new SimpleId( "source" );
+
+
+        //add to a collection
+
+        final String collectionName = "testCollection";
+
+        final Edge collectionEdge = CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), collectionName, source );
+
+        final Edge writtenCollection = gm.writeEdge( collectionEdge 
).toBlocking().last();
+
+        assertNotNull("Collection edge written", writtenCollection);
+
+
+
+
+        final Id target = new SimpleId( "target" );
+
+        final String connectionType = "testConnection";
+
+        final Edge connectionEdge = CpNamingUtils.createConnectionEdge( 
source, connectionType, target );
+
+        final Edge writtenConnection = gm.writeEdge( connectionEdge 
).toBlocking().last();
+
+
+        //now run the cleanup
+
+        final int count =
+            connectionService.deDupeConnections( Observable.just( 
applicationScope ) ).count().toBlocking().last();
+
+        assertEquals( "No edges deleted", 0, count );
+
+        //now ensure we can read the edge.
+
+        final SearchByEdge simpleSearchByEdge =
+            new SimpleSearchByEdge( source, connectionEdge.getType(), target, 
Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+
+        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge 
).toList().toBlocking().last();
+
+        assertEquals( 1, edges.size() );
+
+        assertEquals( writtenConnection, edges.get( 0 ) );
+    }
+
+
+    @Test
+    public void testDuplicateConnections() {
+
+        final ApplicationScope applicationScope = new ApplicationScopeImpl( 
new SimpleId( "application" ) );
+
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
+
+        //now set up the source and target used for the duplicate connections
+
+        final Id source = new SimpleId( "source" );
+
+        final Id target = new SimpleId( "target" );
+
+        final String connectionType = "testConnection";
+
+
+          //add to a collection
+
+        final String collectionName = "testCollection";
+
+        final Edge collectionEdge = CpNamingUtils.createCollectionEdge( 
applicationScope.getApplication(), collectionName, source );
+
+        final Edge writtenCollection = gm.writeEdge( collectionEdge 
).toBlocking().last();
+
+        assertNotNull("Collection edge written", writtenCollection);
+
+        //write our first connection
+        final Edge connection1 = CpNamingUtils.createConnectionEdge( source, 
connectionType, target );
+
+        final Edge written1 = gm.writeEdge( connection1 ).toBlocking().last();
+
+
+        //write the second
+        final Edge connection2 = CpNamingUtils.createConnectionEdge( source, 
connectionType, target );
+
+        final Edge written2 = gm.writeEdge( connection2 ).toBlocking().last();
+
+
+        //write the 3rd
+        final Edge connection3 = CpNamingUtils.createConnectionEdge( source, 
connectionType, target );
+
+        final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last();
+
+
+
+
+        //now run the cleanup
+
+        final List<ConnectionScope> deletedConnections =
+            connectionService.deDupeConnections( Observable.just( 
applicationScope ) ).toList().toBlocking().last();
+
+        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
+
+        //check the duplicates were deleted oldest first; the first edge written is retained
+
+        assertEquals(written2, deletedConnections.get( 0 ));
+
+        assertEquals(written3, deletedConnections.get( 1 ));
+
+
+
+        //now ensure we can read the edge.
+
+        final SearchByEdge simpleSearchByEdge =
+            new SimpleSearchByEdge( source, connection1.getType(), target, 
Long.MAX_VALUE,
+                SearchByEdgeType.Order.DESCENDING, Optional.absent() );
+
+        //check only 1 exists
+        final List<Edge> edges = gm.loadEdgeVersions( simpleSearchByEdge 
).toList().toBlocking().last();
+
+        assertEquals( 1, edges.size() );
+
+        assertEquals( written1, edges.get( 0 ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties 
b/stack/core/src/test/resources/log4j.properties
index dfcb260..dd29671 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -18,7 +18,8 @@
 # and the pattern to %c instead of %l.  (%l is slower.)
 
 # output messages into a rolling log file as well as stdout
-log4j.rootLogger=ERROR,stdout
+#log4j.rootLogger=ERROR,stdout\
+log4j.rootLogger=DEBUG,stdout
 
 # stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b625ed24/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 82c7d54..bd8cd3c 100644
--- 
a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ 
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -55,18 +55,13 @@ public class TargetIdObservableImpl implements 
TargetIdObservable {
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id 
sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceDescending( gm, 
sourceNode ).map( new Func1<Edge, Id>() {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, 
sourceNode ).map( edge -> {
+            final Id targetNode = edge.getTargetNode();
 
+            logger.debug( "Emitting targetId of {}", edge );
 
-            @Override
-            public Id call( final Edge edge ) {
-                final Id targetNode = edge.getTargetNode();
 
-                logger.debug( "Emitting targetId of {}", edge );
-
-
-                return targetNode;
-            }
+            return targetNode;
         } );
     }
 }

Reply via email to