Added a test (not yet working). The writer works end to end against rexster, neo4j+rexster and titan+rexster, but only for the first 5-10 vertices; after that, BlueprintsPersistWriter no longer completes calls to write().
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5f133579 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5f133579 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5f133579 Branch: refs/heads/blueprints Commit: 5f133579de499cd629c190feb38da25e4a04ac87 Parents: 4db2797 Author: Steve Blackmon <[email protected]> Authored: Sun Oct 26 16:06:26 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Sun Oct 26 16:06:26 2014 -0500 ---------------------------------------------------------------------- .../streams-persist-blueprints/pom.xml | 19 +++- .../blueprints/BlueprintsPersistWriter.java | 103 +++++++++++-------- .../test/TestBlueprintsPersistWriter.java | 72 +++++++++++++ 3 files changed, 152 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-blueprints/pom.xml b/streams-contrib/streams-persist-blueprints/pom.xml index d8989f8..ee5cd73 100644 --- a/streams-contrib/streams-persist-blueprints/pom.xml +++ b/streams-contrib/streams-persist-blueprints/pom.xml @@ -12,16 +12,28 @@ <artifactId>streams-persist-blueprints</artifactId> <properties> - <tinkerpop.version>2.6.0</tinkerpop.version> + <tinkerpop.version>2.5.0</tinkerpop.version> </properties> <dependencies> <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-test</artifactId> + <version>${tinkerpop.version}</version> + <type>jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> <version>${project.version}</version> </dependency> <dependency> + 
<groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> <version>${project.version}</version> @@ -53,6 +65,11 @@ <version>${tinkerpop.version}</version> <optional>true</optional> </dependency> + <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-core</artifactId> + <version>${tinkerpop.version}</version> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java index 82727ba..1b929db 100644 --- a/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java +++ b/streams-contrib/streams-persist-blueprints/src/main/java/org/apache/streams/blueprints/BlueprintsPersistWriter.java @@ -23,13 +23,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.tinkerpop.blueprints.Edge; -import com.tinkerpop.blueprints.Element; -import com.tinkerpop.blueprints.Graph; -import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.*; import com.tinkerpop.blueprints.impls.rexster.RexsterGraph; -import com.tinkerpop.rexster.client.RexsterClient; -import com.tinkerpop.rexster.client.RexsterClientFactory; +import com.tinkerpop.blueprints.util.wrappers.id.IdGraph; import org.apache.streams.config.StreamsConfigurator; import 
org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; @@ -39,12 +35,9 @@ import org.apache.streams.pojo.json.ActivityObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.Iterator; import java.util.List; -import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -58,8 +51,8 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { private BlueprintsWriterConfiguration configuration; - protected RexsterClient client; - protected RexsterGraph graph; + protected Graph graph; + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis()); private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor(); @@ -77,18 +70,28 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { @Override public void write(StreamsDatum streamsDatum) { - persistElements(streamsDatum); + List<Element> affected; + try { + affected = persistElements(streamsDatum); + LOGGER.info("wrote datum - " + affected.size() + " elements affected"); + for( Element element : affected ) + element = null; + affected = null; + } catch( Throwable e ) { + LOGGER.warn(e.getMessage()); + } } @Override public void prepare(Object configurationObject) { - connectToGraph(); + LOGGER.info("initializing - " + configuration.toString()); - Preconditions.checkNotNull(client); + graph = connectToGraph(); Preconditions.checkNotNull(graph); - Preconditions.checkNotNull(graph.getGraphURI()); + + LOGGER.info("initialized - " + graph.toString()); } @@ -97,13 +100,7 @@ public class 
BlueprintsPersistWriter implements StreamsPersistWriter { graph.shutdown(); - try { - client.close(); - } catch (IOException e) { - e.printStackTrace(); - } finally { - client = null; - } + LOGGER.info("exiting"); } @@ -130,7 +127,7 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { // always add vertices first // what types of verbs are relevant for adding vertices? - if( configuration.getVertices().getVerbs().contains(activity.getVerb().toString())) { + if( configuration.getVertices().getVerbs().contains(activity.getVerb())) { // what objectTypes are relevant for adding vertices? if( configuration.getVertices().getObjectTypes().contains(activity.getActor().getObjectType())) { @@ -145,7 +142,7 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { // always add edges last // what types of verbs are relevant for adding edges? - if( configuration.getEdges().getVerbs().contains(activity.getVerb().toString())) { + if( configuration.getEdges().getVerbs().contains(activity.getVerb())) { // what objectTypes are relevant for adding edges? 
if( configuration.getEdges().getObjectTypes().contains(activity.getActor().getObjectType()) @@ -158,13 +155,12 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { return elements; } - private synchronized void connectToGraph() { + private Graph connectToGraph() { + + Graph graph; if( configuration.getType().equals(BlueprintsConfiguration.Type.REXSTER)) { try { - client = RexsterClientFactory.open( - configuration.getHost(), - configuration.getGraph()); StringBuilder uri = new StringBuilder() .append("http://") .append(configuration.getHost()) @@ -172,34 +168,59 @@ public class BlueprintsPersistWriter implements StreamsPersistWriter { .append(configuration.getPort()) .append("/graphs/") .append(configuration.getGraph()); - graph = new RexsterGraph(uri.toString()); - } catch (Exception e) { - LOGGER.error("ERROR: ", e.getMessage()); + KeyIndexableGraph graph1 = new RexsterGraph(uri.toString()); + graph = new IdGraph(graph1, true, false); + } catch (Throwable e) { + LOGGER.error("ERROR: " + e.getMessage()); + return null; } - return; + return graph; + } else { + return null; } } protected Vertex persistVertex(ActivityObject object) { - Iterator<Vertex> existing = graph.query().limit(1).has("id", object.getId()).vertices().iterator(); - if( !existing.hasNext()) { - Vertex vertex = graph.addVertex(object); + Preconditions.checkNotNull(object); + Preconditions.checkNotNull(object.getId()); + LOGGER.info("stream vertex: " + object.getId()); + Vertex existing = graph.getVertex(object.getId()); + if( existing == null ) { + LOGGER.info(object.getId() + " is new"); + Vertex vertex = null; + if( !Strings.isNullOrEmpty(object.getId()) ) { + vertex = graph.addVertex(object.getId()); + if (vertex != null) { + if (!Strings.isNullOrEmpty(object.getDisplayName())) + vertex.setProperty("displayName", object.getDisplayName()); + if (!Strings.isNullOrEmpty(object.getObjectType())) + vertex.setProperty("objectType", object.getObjectType()); + if 
(!Strings.isNullOrEmpty(object.getUrl())) + vertex.setProperty("url", object.getUrl()); + LOGGER.info(vertex.toString()); + } else { + LOGGER.warn("Vertex null after add"); + } + } else { + LOGGER.warn("Can't persist vertex without id"); + } return vertex; } else { - return existing.next(); + LOGGER.info(object.getId() + " already exists"); + return existing; } } protected Edge persistEdge(Activity activity) { - Iterator<Edge> existing = graph.query().limit(1).has("id", activity.getId()).edges().iterator(); - if( !existing.hasNext()) { + Edge existing = graph.getEdge(activity.getId()); + if( existing == null ) { Vertex s = persistVertex(activity.getActor()); Vertex d = persistVertex(activity.getObject()); Edge edge = graph.addEdge(activity, s, d, activity.getVerb()); return edge; } else { - return existing.next(); + return existing; } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5f133579/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java b/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java new file mode 100644 index 0000000..fc4118f --- /dev/null +++ b/streams-contrib/streams-persist-blueprints/src/test/java/org/apache/streams/elasticsearch/test/TestBlueprintsPersistWriter.java @@ -0,0 +1,72 @@ +package org.apache.streams.elasticsearch.test; + +import com.tinkerpop.blueprints.Graph; +import com.tinkerpop.blueprints.impls.GraphTest; +import com.tinkerpop.blueprints.impls.tg.TinkerGraph; +import org.apache.streams.blueprints.BlueprintsPersistWriter; +import org.apache.streams.blueprints.BlueprintsWriterConfiguration; +import org.apache.streams.core.StreamsDatum; +import 
org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; +import org.apache.streams.pojo.json.Actor; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Created by sblackmon on 10/20/14. + */ +@Ignore +public class TestBlueprintsPersistWriter extends GraphTest { + + private final String TEST_INDEX = "TestBlueprintsPersistWriter".toLowerCase(); + + private BlueprintsWriterConfiguration testConfiguration; + + private BlueprintsPersistWriter testPersistWriter; + + private Graph graph = generateGraph(); + + @Test + public void testActivity1() { + + assert(graph.getVertices().iterator().hasNext() == false); + + Activity testActivity1 = + new Activity() + .withId("activityid"); + Actor testActor1 = new Actor(); + testActor1.setId("actorid"); + testActivity1.setActor(testActor1); + ActivityObject testObject1 = new ActivityObject(); + testObject1.setId("objectid"); + testActivity1.setObject(testObject1); + + testPersistWriter.write(new StreamsDatum( + testActivity1)); + testPersistWriter.cleanUp(); + + assert(graph.getVertices().iterator().hasNext() == true); + } + + @Override + public Graph generateGraph() { + return new TinkerGraph(); + } + + @Override + public Graph generateGraph(String s) { + return new TinkerGraph(); + } + + @Override + public void doTestSuite(com.tinkerpop.blueprints.TestSuite testSuite) throws Exception { + + testConfiguration = new BlueprintsWriterConfiguration(); + testConfiguration.setHost("localhost"); + + testPersistWriter = new BlueprintsPersistWriter(testConfiguration); + + testPersistWriter.prepare(null); + + } +}
