added test, not working
works end to end against rexster, neo1+rexster and titan+rexster
  but only for 5-10 vertexes and then BlueprintsPersistWriter no longer 
completes calls to write()


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5f133579
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5f133579
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5f133579

Branch: refs/heads/blueprints
Commit: 5f133579de499cd629c190feb38da25e4a04ac87
Parents: 4db2797
Author: Steve Blackmon <[email protected]>
Authored: Sun Oct 26 16:06:26 2014 -0500
Committer: Steve Blackmon <[email protected]>
Committed: Sun Oct 26 16:06:26 2014 -0500

----------------------------------------------------------------------
 .../streams-persist-blueprints/pom.xml          |  19 +++-
 .../blueprints/BlueprintsPersistWriter.java     | 103 +++++++++++--------
 .../test/TestBlueprintsPersistWriter.java       |  72 +++++++++++++
 3 files changed, 152 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-blueprints/pom.xml 
b/streams-contrib/streams-persist-blueprints/pom.xml
index d8989f8..ee5cd73 100644
--- a/streams-contrib/streams-persist-blueprints/pom.xml
+++ b/streams-contrib/streams-persist-blueprints/pom.xml
@@ -12,16 +12,28 @@
     <artifactId>streams-persist-blueprints</artifactId>
 
     <properties>
-        <tinkerpop.version>2.6.0</tinkerpop.version>
+        <tinkerpop.version>2.5.0</tinkerpop.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>com.tinkerpop.blueprints</groupId>
+            <artifactId>blueprints-test</artifactId>
+            <version>${tinkerpop.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-core</artifactId>
             <version>${project.version}</version>
@@ -53,6 +65,11 @@
             <version>${tinkerpop.version}</version>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>com.tinkerpop.blueprints</groupId>
+            <artifactId>blueprints-core</artifactId>
+            <version>${tinkerpop.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
index 82727ba..1b929db 100644
--- 
a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
+++ 
b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java
@@ -23,13 +23,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import com.tinkerpop.blueprints.Edge;
-import com.tinkerpop.blueprints.Element;
-import com.tinkerpop.blueprints.Graph;
-import com.tinkerpop.blueprints.Vertex;
+import com.tinkerpop.blueprints.*;
 import com.tinkerpop.blueprints.impls.rexster.RexsterGraph;
-import com.tinkerpop.rexster.client.RexsterClient;
-import com.tinkerpop.rexster.client.RexsterClientFactory;
+import com.tinkerpop.blueprints.util.wrappers.id.IdGraph;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
@@ -39,12 +35,9 @@ import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -58,8 +51,8 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
 
     private BlueprintsWriterConfiguration configuration;
 
-    protected RexsterClient client;
-    protected RexsterGraph graph;
+    protected Graph graph;
+
     private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
     private volatile AtomicLong lastWrite = new 
AtomicLong(System.currentTimeMillis());
     private ScheduledExecutorService backgroundFlushTask = 
Executors.newSingleThreadScheduledExecutor();
@@ -77,18 +70,28 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        persistElements(streamsDatum);
+        List<Element> affected;
+        try {
+            affected = persistElements(streamsDatum);
+            LOGGER.info("wrote datum - " + affected.size() + " elements 
affected");
+            for( Element element : affected )
+                element = null;
+            affected = null;
+        } catch( Throwable e ) {
+            LOGGER.warn(e.getMessage());
+        }
     }
 
     @Override
     public void prepare(Object configurationObject) {
 
-        connectToGraph();
+        LOGGER.info("initializing - " + configuration.toString());
 
-        Preconditions.checkNotNull(client);
+        graph = connectToGraph();
 
         Preconditions.checkNotNull(graph);
-        Preconditions.checkNotNull(graph.getGraphURI());
+
+        LOGGER.info("initialized - " + graph.toString());
 
     }
 
@@ -97,13 +100,7 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
 
         graph.shutdown();
 
-        try {
-            client.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            client = null;
-        }
+        LOGGER.info("exiting");
 
     }
 
@@ -130,7 +127,7 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
 
         // always add vertices first
         // what types of verbs are relevant for adding vertices?
-        if( 
configuration.getVertices().getVerbs().contains(activity.getVerb().toString())) 
{
+        if( 
configuration.getVertices().getVerbs().contains(activity.getVerb())) {
 
             // what objectTypes are relevant for adding vertices?
             if( 
configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType()))
 {
@@ -145,7 +142,7 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
 
         // always add edges last
         // what types of verbs are relevant for adding edges?
-        if( 
configuration.getEdges().getVerbs().contains(activity.getVerb().toString())) {
+        if( configuration.getEdges().getVerbs().contains(activity.getVerb())) {
 
             // what objectTypes are relevant for adding edges?
             if( 
configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType())
@@ -158,13 +155,12 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
         return elements;
     }
 
-    private synchronized void connectToGraph() {
+    private Graph connectToGraph() {
+
+        Graph graph;
 
         if( 
configuration.getType().equals(BlueprintsConfiguration.Type.REXSTER)) {
             try {
-                client = RexsterClientFactory.open(
-                        configuration.getHost(),
-                        configuration.getGraph());
                 StringBuilder uri = new StringBuilder()
                         .append("http://";)
                         .append(configuration.getHost())
@@ -172,34 +168,59 @@ public class BlueprintsPersistWriter implements 
StreamsPersistWriter {
                         .append(configuration.getPort())
                         .append("/graphs/")
                         .append(configuration.getGraph());
-                graph = new RexsterGraph(uri.toString());
-            } catch (Exception e) {
-                LOGGER.error("ERROR: ", e.getMessage());
+                KeyIndexableGraph graph1 = new RexsterGraph(uri.toString());
+                graph = new IdGraph(graph1, true, false);
+            } catch (Throwable e) {
+                LOGGER.error("ERROR: " + e.getMessage());
+                return null;
             }
-            return;
+            return graph;
 
+        } else {
+            return null;
         }
     }
 
     protected Vertex persistVertex(ActivityObject object) {
-        Iterator<Vertex> existing = graph.query().limit(1).has("id", 
object.getId()).vertices().iterator();
-        if( !existing.hasNext()) {
-            Vertex vertex = graph.addVertex(object);
+        Preconditions.checkNotNull(object);
+        Preconditions.checkNotNull(object.getId());
+        LOGGER.info("stream vertex: " + object.getId());
+        Vertex existing = graph.getVertex(object.getId());
+        if( existing == null ) {
+            LOGGER.info(object.getId() + " is new");
+            Vertex vertex = null;
+            if( !Strings.isNullOrEmpty(object.getId()) ) {
+                vertex = graph.addVertex(object.getId());
+                if (vertex != null) {
+                    if (!Strings.isNullOrEmpty(object.getDisplayName()))
+                        vertex.setProperty("displayName", 
object.getDisplayName());
+                    if (!Strings.isNullOrEmpty(object.getObjectType()))
+                        vertex.setProperty("objectType", 
object.getObjectType());
+                    if (!Strings.isNullOrEmpty(object.getUrl()))
+                        vertex.setProperty("url", object.getUrl());
+                    LOGGER.info(vertex.toString());
+                } else {
+                    LOGGER.warn("Vertex null after add");
+                }
+            } else {
+                LOGGER.warn("Can't persist vertex without id");
+            }
             return vertex;
         } else {
-            return existing.next();
+            LOGGER.info(object.getId() + " already exists");
+            return existing;
         }
     }
 
     protected Edge persistEdge(Activity activity) {
-        Iterator<Edge> existing = graph.query().limit(1).has("id", 
activity.getId()).edges().iterator();
-        if( !existing.hasNext()) {
+        Edge existing = graph.getEdge(activity.getId());
+        if( existing == null ) {
             Vertex s = persistVertex(activity.getActor());
             Vertex d = persistVertex(activity.getObject());
             Edge edge = graph.addEdge(activity, s, d, activity.getVerb());
             return edge;
         } else {
-            return existing.next();
+            return existing;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java
 
b/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java
new file mode 100644
index 0000000..fc4118f
--- /dev/null
+++ 
b/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java
@@ -0,0 +1,72 @@
+package org.apache.streams.elasticsearch.test;
+
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.impls.GraphTest;
+import com.tinkerpop.blueprints.impls.tg.TinkerGraph;
+import org.apache.streams.blueprints.BlueprintsPersistWriter;
+import org.apache.streams.blueprints.BlueprintsWriterConfiguration;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+@Ignore
+public class TestBlueprintsPersistWriter extends GraphTest {
+
+    private final String TEST_INDEX = 
"TestBlueprintsPersistWriter".toLowerCase();
+
+    private BlueprintsWriterConfiguration testConfiguration;
+
+    private BlueprintsPersistWriter testPersistWriter;
+
+    private Graph graph = generateGraph();
+
+    @Test
+    public void testActivity1() {
+
+        assert(graph.getVertices().iterator().hasNext() == false);
+
+        Activity testActivity1 =
+                new Activity()
+                        .withId("activityid");
+        Actor testActor1 = new Actor();
+        testActor1.setId("actorid");
+        testActivity1.setActor(testActor1);
+        ActivityObject testObject1 = new ActivityObject();
+        testObject1.setId("objectid");
+        testActivity1.setObject(testObject1);
+
+        testPersistWriter.write(new StreamsDatum(
+                testActivity1));
+        testPersistWriter.cleanUp();
+
+        assert(graph.getVertices().iterator().hasNext() == true);
+    }
+
+    @Override
+    public Graph generateGraph() {
+        return new TinkerGraph();
+    }
+
+    @Override
+    public Graph generateGraph(String s) {
+        return new TinkerGraph();
+    }
+
+    @Override
+    public void doTestSuite(com.tinkerpop.blueprints.TestSuite testSuite) 
throws Exception {
+
+        testConfiguration = new BlueprintsWriterConfiguration();
+        testConfiguration.setHost("localhost");
+
+        testPersistWriter = new BlueprintsPersistWriter(testConfiguration);
+
+        testPersistWriter.prepare(null);
+
+    }
+}

Reply via email to