jenkins-bot has submitted this change and it was merged.

Change subject: Multi-threaded dump loader
......................................................................


Multi-threaded dump loader

This is a "simpler" dump loader that downloads the Wikidata dump, creates
the properties, loads the data in multiple threads, and sets the last
update time so the recent changes poller can pick up from there.  It also
supports loading onto multiple Titan servers, pausing if a Titan server
stops, and resuming the load at a given line offset.

Also updates to TinkerPop 3.0.0.M7 because that version doesn't leak transactions.

Also changes ./servers.sh from running a tinkergraph server to a titan
server.  This is much better for testing loading.

Change-Id: I4c784bb4110d002c85f482ed6bc3b144b247ef4d
---
M .gitignore
M README.md
M ext/plugins.txt
M pom.xml
M src/build/checkstyle.xml
M src/main/groovy/org/wikidata/gremlin/ConsoleInit.groovy
D src/main/groovy/org/wikidata/gremlin/DataLoader.groovy
M src/main/groovy/org/wikidata/gremlin/Loader.groovy
A src/main/groovy/org/wikidata/gremlin/tool/ClientScript.groovy
A src/main/groovy/org/wikidata/gremlin/tool/DumpLoader.groovy
R src/main/groovy/org/wikidata/gremlin/tool/RecentChangesPoller.groovy
M src/main/java/org/wikidata/gremlin/DomainSpecificLanguageTraversal.java
A src/main/java/org/wikidata/gremlin/HasGraph.java
M src/main/java/org/wikidata/gremlin/IteratorUtils.java
M src/main/java/org/wikidata/gremlin/LoadingTraversal.java
M src/main/java/org/wikidata/gremlin/WikidataTraversal.java
M src/test/resources/gremlin-server.yaml
M src/test/resources/remote.yaml
A src/test/resources/titan-cassandra-localhost.properties
19 files changed, 694 insertions(+), 437 deletions(-)

Approvals:
  Smalyshev: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/.gitignore b/.gitignore
index f1159f2..b4d1381 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@
 .settings
 dependency-reduced-pom.xml
 wikidata-gremlin.log
+work
diff --git a/README.md b/README.md
index 624aaaf..752f1a9 100644
--- a/README.md
+++ b/README.md
@@ -36,9 +36,15 @@
 :> g.of(WikidataTraversal.class).ensureSchema().wd('Q23').wd('Q91').out('P509').unstub().properties('labelEn')
 ```
 
-You can now setup the update feed by running
+Optionally you can load a dump of wikidata by running
 ```bash
 mvn package
-java -jar target/wikidata-gremlin-poller-0.0.9-SNAPSHOT.jar
+java -cp target/wikidata-gremlin-tools-0.0.9-SNAPSHOT.jar org.wikidata.gremlin.tool.DumpLoader
 ```
-which will start listening for all changes starting now.
+
+You can also start updating from wikidata by running
+```bash
+mvn package
+java -cp target/wikidata-gremlin-tools-0.0.9-SNAPSHOT.jar org.wikidata.gremlin.tool.RecentChangesPoller
+```
+which will start listening for all changes.
diff --git a/ext/plugins.txt b/ext/plugins.txt
index 6c72578..05aa5d8 100644
--- a/ext/plugins.txt
+++ b/ext/plugins.txt
@@ -1,4 +1,3 @@
 org.wikidata.gremlin.GremlinPlugin
-com.tinkerpop.gremlin.console.plugin.DriverGremlinPlugin
-com.tinkerpop.gremlin.console.plugin.UtilitiesGremlinPlugin
-com.tinkerpop.gremlin.tinkergraph.groovy.plugin.TinkerGraphGremlinPlugin
+com.tinkerpop.gremlin.console.groovy.plugin.DriverGremlinPlugin
+com.thinkaurelius.titan.graphdb.tinkerpop.plugin.TitanGremlinPlugin
diff --git a/pom.xml b/pom.xml
index 6babf65..ae021e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,8 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <gremlin.version>3.0.0.M6</gremlin.version>
+        <gremlin.version>3.0.0.M7</gremlin.version>
+        <titan.version>0.9.0-wmf1</titan.version> <!-- This version is hand 
built from -->
     </properties>
     <organization>
         <name>Wikimedia Foundation</name>
@@ -44,6 +45,13 @@
         
<developerConnection>scm:git:ht...@gerrit.wikimedia.org:r/wikidata/gremlin</developerConnection>
         
<url>https://gerrit.wikimedia.org/r/#/admin/projects/wikidata/gremlin</url>
     </scm>
+    <repositories>
+        <!-- Temporary repository in case we depend on things not released to 
central. -->
+        <repository>
+            <id>wmf</id>
+            <url>http://archiva.wikimedia.org/repository/mirrored/</url>
+        </repository>
+    </repositories>
     <dependencies>
         <dependency>
             <groupId>com.tinkerpop</groupId>
@@ -68,7 +76,7 @@
         <dependency>
             <groupId>com.thinkaurelius.titan</groupId>
             <artifactId>titan-core</artifactId>
-            <version>0.9.0-M1</version>
+            <version>${titan.version}</version>
         </dependency>
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
@@ -117,6 +125,43 @@
             <artifactId>gremlin-server</artifactId>
             <version>${gremlin.version}</version>
             <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.tinkerpop</groupId>
+            <artifactId>tinkergraph-gremlin</artifactId>
+            <version>${gremlin.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-cassandra</artifactId>
+            <version>${titan.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- This _shouldn't_ be required but the Titan plugin lives here, probably temporarily -->
+        <dependency>
+            <groupId>com.thinkaurelius.titan</groupId>
+            <artifactId>titan-all</artifactId>
+            <version>${titan.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.thinkaurelius.titan</groupId>
+                    <artifactId>titan-berkeleyje</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.thinkaurelius.titan</groupId>
+                    <artifactId>titan-hbase</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.thinkaurelius.titan</groupId>
+                    <artifactId>titan-hadoop</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.thinkaurelius.titan</groupId>
+                    <artifactId>titan-lucene</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
     <build>
@@ -174,13 +219,13 @@
                 <version>2.3</version>
                 <executions>
                     <execution>
-                        <id>make-assembly</id>
+                        <id>make-tools</id>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
                         <configuration>
-                            
<finalName>wikidata-gremlin-poller-${project.version}</finalName>
+                            
<finalName>wikidata-gremlin-tools-${project.version}</finalName>
                             <artifactSet>
                                 <includes>
                                     <include>commons-cli:commons-cli</include>
@@ -209,12 +254,6 @@
                                     
<include>xml-resolver:xml-resolver</include>
                                 </includes>
                             </artifactSet>
-                            <transformers>
-                                <transformer
-                                
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    
<mainClass>org.wikidata.gremlin.RecentChangesPoller</mainClass>
-                                </transformer>
-                            </transformers>
                         </configuration>
                     </execution>
                 </executions>
@@ -354,38 +393,6 @@
                                         <argument>-classpath</argument>
                                         <classpath/>
                                         
<argument>com.tinkerpop.gremlin.console.Console</argument>
-                                    </arguments>
-                                </configuration>
-                            </execution>
-                        </executions>
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-        <profile>
-            <id>poll</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.codehaus.mojo</groupId>
-                        <artifactId>exec-maven-plugin</artifactId>
-                        <version>1.3.2</version>
-                        <executions>
-                            <execution>
-                                <id>console</id>
-                                <phase>test</phase>
-                                <goals>
-                                    <!-- The exec:java goal fail with 
GROOVY-6951.  No idea why.
-                                         This is better anyway because we have 
more control. -->
-                                    <goal>exec</goal>
-                                </goals>
-                                <configuration>
-                                    <executable>java</executable>
-                                    <longClasspath>true</longClasspath>
-                                    <arguments>
-                                        <argument>-classpath</argument>
-                                        <classpath/>
-                                        
<argument>org.wikidata.gremlin.RecentChangesPoller</argument>
                                     </arguments>
                                 </configuration>
                             </execution>
diff --git a/src/build/checkstyle.xml b/src/build/checkstyle.xml
index 58a6c68..465a8e1 100644
--- a/src/build/checkstyle.xml
+++ b/src/build/checkstyle.xml
@@ -99,7 +99,6 @@
     <!-- Common coding problems/opinionates stuff -->
     <module name="CovariantEquals"/>
     <module name="EqualsHashCode"/>
-    <module name="HiddenField"/> <!-- Rarely the right thing to do -->
     <module name="InnerAssignment"/> <!-- Too suprising -->
     <module name="MissingSwitchDefault"/> <!-- Just comment that its a noop if 
its a noop -->
     <module name="ModifiedControlVariable"/>
@@ -133,6 +132,7 @@
     <!--
     <module name="ParameterAssignment"/> Usually used when munging parameters. 
 Fine.
     <module name="TrailingComment"/>
+    <module name="HiddenField"/> It turns out that we do this all the time in 
setters and constructors.
     -->
   </module>
 
diff --git a/src/main/groovy/org/wikidata/gremlin/ConsoleInit.groovy 
b/src/main/groovy/org/wikidata/gremlin/ConsoleInit.groovy
index 3ef0071..49d8a66 100644
--- a/src/main/groovy/org/wikidata/gremlin/ConsoleInit.groovy
+++ b/src/main/groovy/org/wikidata/gremlin/ConsoleInit.groovy
@@ -17,8 +17,6 @@
 import groovy.lang.Closure
 import groovy.util.logging.Slf4j
 
-import com.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
-
 @Slf4j
 class ConsoleInit extends RexsterInit {
   // TODO Do we need this in Tinkerpop 3 at all?
@@ -67,7 +65,8 @@
     if(!script.g) {
       try {
         log.info "Initializing using in memory graph"
-        script.g = new TinkerGraph('wikidata', TinkerGraph.FileType.GRAPHSON)
+        // Use reflection so we don't need to depend on TinkerGraph which is 
no longer bundled with tinkerpop
+        script.g = 
Class.forName("com.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph").newInstance()
       } catch (Exception e) {
         log.error "Something is wrong loading or creating the graph.  Here is 
the exception, good luck:"
         throw e
diff --git a/src/main/groovy/org/wikidata/gremlin/DataLoader.groovy 
b/src/main/groovy/org/wikidata/gremlin/DataLoader.groovy
deleted file mode 100644
index ea89b76..0000000
--- a/src/main/groovy/org/wikidata/gremlin/DataLoader.groovy
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Copyright (C) 2014 Wikimedia Foundation
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wikidata.gremlin
-
-import java.io.Reader
-import com.tinkerpop.gremlin.structure.Graph
-import groovy.json.*
-import com.thinkaurelius.titan.core.TitanException
-import groovy.util.logging.Slf4j
-
-@Slf4j
-class DataLoader {
-  final Loader loader
-  final Graph g
-  private Reader stream
-  private File rejects = null
-  private File processed = null
-  private int linesPerCommit = 500
-  private String fileName
-  private int numReaders = 1
-  private int myNum = 0
-  private boolean gzipped
-  private int skipLines = 0
-  private boolean failOnError = false
-  private int processedNum = 0
-  private boolean batch = false
-
-  DataLoader(Graph g, boolean ignore_props = false) {
-    this.g = g
-    this.loader = new Loader(g, ignore_props)
-  }
-
-  public DataLoader setReaders(int r) {
-    numReaders = r
-    return this
-  }
-
-  public DataLoader setNum(int r) {
-    myNum = r
-    return this
-  }
-
-  public DataLoader file(String f) {
-    fileName = f
-    gzipped = false
-    return this
-  }
-
-  public DataLoader gzipFile(String f) {
-    fileName = f
-    gzipped = true
-    return this
-  }
-
-  public DataLoader setLines(int l) {
-    linesPerCommit = l
-    return this
-  }
-
-  public DataLoader failOnError(boolean f=true) {
-    failOnError = f
-    return this
-  }
-
-  protected void initStream() {
-    def input
-    if(gzipped) {
-      input = new java.util.zip.GZIPInputStream(new FileInputStream(fileName))
-    } else {
-      input = new FileInputStream(fileName)
-    }
-    stream = new LineNumberReader(new InputStreamReader(input, "UTF-8"))
-  }
-
-  protected void initFiles() {
-    if(processed) {
-      return
-    }
-    String basename = (fileName =~ /[^\/]+$/)[0]
-    rejects = new File("rejects.${basename}.${numReaders}.${myNum}.json")
-    processed = new File("processed.${basename}.${numReaders}.${myNum}")
-  }
-
-  public DataLoader recover() {
-    initFiles()
-    if(!processed.size()) {
-      // nothing to recover
-      return this
-    }
-    processedNum = processed.text as int;
-    log.info "Recovery: advancing line number by $processedNum"
-    return this
-  }
-
-  public DataLoader batch(val = true) {
-    batch = val
-    this.loader.setBatch(val)
-    this
-  }
-
-  public void preload()
-  {
-    loader.loadLanguages()
-    loader.loadSites()
-  }
-
-  public void load(max) {
-    initStream()
-    initFiles()
-    loader.resetClaims()
-    def json = new JsonSlurper() //.setType(JsonParserType.INDEX_OVERLAY )
-    String line = stream.readLine()
-    if(line[0] == '[') {
-      line = stream.readLine()
-    }
-    if(processedNum) {
-      // we need to advance the num by 1 since last line processed is 1000, 
not 999
-      processedNum++
-    }
-    if(myNum > 0 || processedNum > 0) {
-      for(i in 0..<(myNum+processedNum)) {
-        line = stream.readLine()
-      }
-    }
-    def realLines = 0
-    def failedLines = 0
-    for(i in processedNum..<max) {
-        if(!line || line[0] == ']') {
-          break
-        }
-      if(line[-1] == ',') {
-          line = line[0..-2]
-        }
-
-      try {
-        def item = json.parseText(line)
-          if(!item) {
-          log.warn "Bad line $i, skipping\n"
-            break
-          }
-          loader.loadFromItem(item)
-      } catch(TitanException e) {
-        // it's bad and for once it's not our fault. Let's stop for now
-        log.error "Titan exception: $e. Bailing out..."
-        e.printStackTrace()
-        break;
-      } catch(e) {
-        log.warn "Importing line ${stream.getLineNumber()} failed: $e"
-        rejects << line
-        rejects << "\n"
-        failedLines++;
-        if(failOnError) { throw e; }
-      }
-      realLines++;
-      (0..numReaders-1).each() { line = stream.readLine() }
-      if(i != 0 && i % linesPerCommit == 0) {
-        if(!batch) {
-          g.tx().commit()
-        }
-        log.info "Committed on row $i"
-        def fw = new FileWriter(processed)
-        fw.write(i as String)
-        fw.close()
-      }
-    }
-    g.tx().commit()
-    log.info "Processed $realLines lines, failed $failedLines, processed 
${loader.getClaims()} claims"
-  }
-
-  public void processClaims(max, Closure c) {
-    initStream()
-    initFiles()
-    def json = new JsonSlurper() //.setType(JsonParserType.INDEX_OVERLAY )
-    String line = stream.readLine()
-    if(line[0] == '[') {
-      line = stream.readLine()
-    }
-    if(myNum > 0) {
-      for(i in 0..myNum-1) {
-        line = stream.readLine()
-      }
-    }
-    for(i in 0..max-1) {
-        if(!line || line[0] == ']') {
-          break
-        }
-      if(line[-1] == ',') {
-          line = line[0..-2]
-        }
-
-      def item = json.parseText(line)
-        if(!item) {
-           break
-        }
-      c(item, g, i)
-/*          for (claimsOnProperty in item.claims) {
-          if(!claimsOnProperty.value.size()) {
-            // empty claim, ignore
-            continue
-          }
-              for (claim in claimsOnProperty.value) {
-                if (claim.mainsnak == null) {
-              continue;
-            }
-            c(claim)
-          }
-        }
-*/
-      (0..numReaders-1).each() { line = stream.readLine() }
-      if(i != 0 && i % linesPerCommit == 0) {
-        log.info "Processed row $i"
-        def fw = new FileWriter(processed)
-        fw.write(i as String)
-        fw.close()
-      }
-    }
-  }
-}
diff --git a/src/main/groovy/org/wikidata/gremlin/Loader.groovy 
b/src/main/groovy/org/wikidata/gremlin/Loader.groovy
index 468fa50..93a729d 100644
--- a/src/main/groovy/org/wikidata/gremlin/Loader.groovy
+++ b/src/main/groovy/org/wikidata/gremlin/Loader.groovy
@@ -34,7 +34,6 @@
 
 //import com.tinkerpop.blueprints.util.wrappers.batch.VertexIDType;
 
-import com.tinkerpop.gremlin.tinkergraph.process.graph.TinkerElementTraversal
 /**
  * Loading data from external format (e.g. JSON) into the database
  */
@@ -112,7 +111,7 @@
    * Load into the DB from parsed wikidata result.
    */
   def loadFromItem(item) {
-    def id = item['id']
+    def id = item.id
     def isProperty = (id[0] == 'P')
     if(isProperty) {
       // if we loading property via update, we may want to initalize the data 
first
@@ -124,8 +123,9 @@
     }
     def v = getOrCreateVertex(id)
     def isNew = false
-    if(v['stub']) {
-      log.info "Creating $id at revision ${item.lastrevid}"
+    if (v['stub']) {
+      def revision = item.lastrevid == null ? 'at an unknown revision' : "at 
revision ${item.lastrevid}"
+      log.info "Creating $id ${revision}"
       isNew = true
     } else {
       if(item.lastrevid && v.lastrevid && v.lastrevid >= item.lastrevid) {
@@ -153,8 +153,8 @@
     } else {
       v.property('modified').remove()
     }
+    // Not committing here to allow users to manage their own transactions
     return v
-    // Not committing here to allow DataLoader to group updates
   }
 
   void setEntitySource(String src) {
@@ -357,7 +357,7 @@
       try {
         v.singleProperty(l, label.value.value)
       } catch(java.lang.IllegalArgumentException e) {
-      // We can't know all languages in advance, so have to be dynamic here
+        // We can't know all languages in advance, so have to be dynamic here
         initProperty(l, String.class, null, false) // do not create index for 
labels
         v.singleProperty(l, label.value.value)
       }
@@ -375,6 +375,7 @@
       try {
         v.singleProperty(l, true)
       } catch(java.lang.IllegalArgumentException e) {
+        // We can't know all languages in advance so we have to be dynamic here
         initProperty(l, Boolean.class, null, false) // no index since indexing 
by boolean is useless
         v.singleProperty(l, true)
       }
@@ -443,18 +444,6 @@
     }
   }
 
-  /* This is necessary because TinkerGraph which we use for unit tests
-   * hates modification within iteration.
-   * FIXME: find a sane solution for this
-   */
-  private getEdges(traversal) {
-    if(traversal instanceof TinkerElementTraversal) {
-      // collect all elements from iterator
-      return traversal.toSet()
-    }
-    return traversal
-  }
-
   private linkCache = [:]
   private refClaims = [] as Set
   /**
@@ -491,7 +480,7 @@
         return
       }
       def removed = [] as Set
-      for(cl in getEdges(v.outE('claim'))) {
+      for(cl in v.outE('claim')) {
         def clHash = cl.value('contentHash')
         if(!(clHash in claimsById)) {
           // This one is not there anymore, remove it
@@ -509,9 +498,9 @@
             targetId = v.outE(prop).has('contentHash', 
clHash).values(getValueName(prop)).next()
           }
           // Drop related claims
-          getEdges(v.outE(prop).has('contentHash', clHash)).each { it.remove() 
}
+          v.outE(prop).has('contentHash', clHash).each { it.remove() }
           def refVerts = [] as Set
-          getEdges(v.outE('reference').has('contentHash', clHash)).each { 
refVerts << it; it.remove() }
+          v.outE('reference').has('contentHash', clHash).each { refVerts << 
it; it.remove() }
           refVerts.each {
             if(!it.inV('reference').hasNext()) {
               // remove orphan reference verts
@@ -1058,31 +1047,23 @@
   }
 
   /**
-   * Load pre-existing set of languages
+   * Preinitialize a language.
    */
-  public void loadLanguages() {
+  public void initLanguage(language) {
     if(!hasMetadata) {
       return
     }
-     def items = new 
JsonSlurper().parse(getClass().getClassLoader().getResource("languages.json").toURI().toURL(),
-        'UTF-8')
-    for(row in items?.rows) {
-      initProperty('label'+row[0].capitalize(), String.class, null, false)
-      initProperty('desc'+row[0].capitalize(), Boolean.class, null, false)
-    }
+    initProperty('label' + language.capitalize(), String.class, null, false)
+    initProperty('desc' + language.capitalize(), Boolean.class, null, false)
   }
 
   /**
-   * Load pre-existing set of sites
+   * Preinitialize a site.
    */
-  public void loadSites() {
+  public void initSite(site) {
     if(!hasMetadata) {
       return
     }
-    def items = new 
JsonSlurper().parse(getClass().getClassLoader().getResource("sites.json").toURI().toURL(),
-        'UTF-8')
-    for(row in items?.rows) {
-      initSitelink(getSitelinkName(row[0]))
-    }
+    initSitelink(getSitelinkName(site))
   }
 }
diff --git a/src/main/groovy/org/wikidata/gremlin/tool/ClientScript.groovy 
b/src/main/groovy/org/wikidata/gremlin/tool/ClientScript.groovy
new file mode 100644
index 0000000..94e9fad
--- /dev/null
+++ b/src/main/groovy/org/wikidata/gremlin/tool/ClientScript.groovy
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014 Wikimedia Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wikidata.gremlin.tool
+
+import com.tinkerpop.gremlin.driver.Cluster
+import com.tinkerpop.gremlin.driver.exception.ResponseException
+import groovy.util.CliBuilder
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeUnit
+
+/**
+ * Trait helper to build and communicate with the Client.
+ */
+trait ClientScript {
+  private static final long MILLIS_TO_WAIT = TimeUnit.SECONDS.toMillis(5)
+
+  List<String> contactPoints = ['localhost']
+
+  /**
+   * Used to log errors during retryMostErrors.
+   */
+  abstract void logError(String message, Throwable t)
+
+  /**
+   * Setup a cli builder with cluster options.
+   */
+  void setupCli(CliBuilder cli) {
+    cli.g(longOpt: 'gremlin', args: 1, argName: 'urls',
+      'Send requests to the Gremlin servers at these urls.  Defaults to localhost.  Multiple servers should be separated ' +
+      'by comma like foo,bar,baz.')
+  }
+
+  /**
+   * Create a cluster builder with configuration from options which should have
+   * been built by parsing command line arguments setup with setupCli.
+   */
+  Cluster.Builder clusterBuilder(options) {
+    def clusterBuilder = Cluster.build()
+    if (options.g) {
+      contactPoints = options.g.tokenize(',')
+    }
+    contactPoints.each{clusterBuilder.addContactPoint(it)}
+    // Tinkerpop uses lots of these little worker threads and can deadlock 
itself.
+    clusterBuilder.workerPoolSize(contactPoints.size() * 25)
+    clusterBuilder
+  }
+
+  def retryMostErrors(operation, closure) {
+    while (true) {
+      try {
+        def start = System.currentTimeMillis()
+        def resultSet = awaitOrCancel(closure(), MILLIS_TO_WAIT)
+        return awaitOrCancel(resultSet.all(), MILLIS_TO_WAIT + start - 
System.currentTimeMillis())
+      } catch (ExecutionException e) {
+        Exception c = e
+        while (c.getCause() != null) {
+          if (c.getCause() instanceof ResponseException && 
!c.getMessage().contains('Could not commit transaction due to exception during 
persistence')) {
+            logError("Error ${operation} that looks like a bug or dirty data.  
See gremlin server logs for more.", c)
+            return
+          } else {
+            c = c.getCause()
+          }
+        }
+        logError("Error ${operation}.  Waiting and retrying.", c)
+      } catch (Throwable t) {
+        logError("Error ${operation}.  Waiting and retrying.", t)
+      }
+      sleep(5000)
+    }
+  }
+
+  def awaitOrCancel(future, millis) {
+    try {
+      return future.get(millis, TimeUnit.MILLISECONDS)
+    } catch (Exception e) {
+      future.cancel(true)
+      throw e
+    }
+  }
+}
\ No newline at end of file
diff --git a/src/main/groovy/org/wikidata/gremlin/tool/DumpLoader.groovy 
b/src/main/groovy/org/wikidata/gremlin/tool/DumpLoader.groovy
new file mode 100644
index 0000000..baaa853
--- /dev/null
+++ b/src/main/groovy/org/wikidata/gremlin/tool/DumpLoader.groovy
@@ -0,0 +1,399 @@
+/**
+ * Copyright (C) 2014 Wikimedia Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wikidata.gremlin.tool
+
+import com.tinkerpop.gremlin.driver.Client
+import com.tinkerpop.gremlin.driver.Cluster
+import groovy.json.JsonSlurper
+import groovy.util.CliBuilder
+import groovy.util.logging.Slf4j
+import java.io.InputStream
+import java.io.ByteArrayOutputStream
+import java.net.URL
+import java.nio.file.Files
+import java.util.concurrent.Executors
+import java.util.concurrent.ExecutionException
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+import org.joda.time.DateTimeZone
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.DateTimeFormatter
+import org.slf4j.LoggerFactory
+
+@Slf4j
+class DumpLoader implements ClientScript {
+  private static final DateTimeFormatter FILE_NAME_FORMAT = 
DateTimeFormat.forPattern("yyyyMMdd")
+    .withZone(DateTimeZone.UTC)
+
+  static void main(String[] args) {
+    def cli = new CliBuilder(usage:'java -cp 
wikidata-gremlin-tools-0.0.9-SNAPSHOT.jar org.wikidata.gremlin.tool.DumpLoader 
<options>')
+    cli.header = "Downloads and loads a dump from wikidata into the database."
+    cli.s(longOpt: 'skip', args: 1, argName: 'line number',
+      'Skip to the item at this line in the dump.  Use when resuming a partially completed import.  Note that resume is not possible ' +
+      'for importing languages, sites, or properties.')
+    cli.w(longOpt: 'workDir', args: 1, argName: 'directory',
+      'Work directory for downloading the dump.  Defaults to the "work" subdirectory of the current directory.  If the directory does ' +
+      'not contain a file named dumpName it will be downloaded from "http://dumps.wikimedia.org/other/wikidata/".' )
+    cli.n(longOpt: 'dumpName', args: 1, argName: 'name',
+      'Name of the latest dump.  Use this when resuming a partially completed import or when you already have a dump downloaded.  ' +
+      'Format like "20150112.json.gz".  Defaults to detecting based on the contents of "http://dumps.wikimedia.org/other/wikidata/".  ' +
+      'If you want to reuse a downloaded dump then move it to the work directory and use its name.')
+    cli.d(longOpt: 'dumpDate', args: 1, argName: 'date',
+      'Date of the last dump.  Used to properly set up runs of RecentChangesPoller.  Format like 20150131.  Defaults to parsing the ' +
+      'dump name by taking the first 8 characters and interpreting them as a date in yyyyMMdd format at the UTC timezone.')
+    cli.help('Show this message')
+
+    def loader = new DumpLoader()
+    loader.setupCli(cli)
+    def options = cli.parse(args)
+    if (options.help) {
+      cli.usage()
+      System.exit(0)
+    }
+
+    if (options.s) {
+      loader.skipToLine(options.s.toInteger())
+    }
+    if (options.w) {
+      loader.workDirectory(new File(options.w))
+    }
+    if (options.n) {
+      loader.latestDumpName(options.n)
+    }
+    if (options.d) {
+      loader.dumpDate(options.d)
+    }
+    loader.sync(new GremlinSync(loader, 
loader.clusterBuilder(options)).ensureSchema()).load()
+    System.exit(0)
+  }
+
+  /**
+   * Name of the latest dump file.
+   */
+  private String latestDumpName
+
+  /**
+   * Work directory for storing downloaded and extracted files.
+   */
+  private File workDirectory = new File('work')
+
+  /**
+   * Date of the dump being imported.  Defaults to detecting it from the dump's name.
+   */
+  private String dumpDate
+
+  /**
+   * Sync for results.
+   */
+  private Sync sync
+
+  /**
+   * Set by latestDump method on first invocation.
+   */
+  private File latestDump
+
+  /**
+   * Set by latestProperties method on first invocation.
+   */
+  private File latestProperties
+
+  /**
+   * Skip to this line when loading items.  -1 is don't.
+   */
+  private int skipToLine = -1
+
+  def latestDumpName(String latestDumpName) {
+    this.latestDumpName = latestDumpName
+    this
+  }
+
+  def workDirectory(File workDirectory) {
+    this.workDirectory = workDirectory
+    this
+  }
+
+  def sync(Sync sync) {
+    this.sync = sync
+    this
+  }
+
+  def skipToLine(int skipToLine) {
+    this.skipToLine = skipToLine
+    this
+  }
+
+  def dumpDate(String dumpDate) {
+    this.dumpDate = dumpDate
+    this
+  }
+
+  def load() {
+    if (skipToLine < 0) {
+      loadLanguages()
+      loadSites()
+      loadProperties()
+    }
+    loadItems()
+  }
+
+  /**
+   * Preloads a list of languages so they don't have to be created on the fly.
+   */
+  def loadLanguages() {
+    log.info "Preloading languages"
+    // Loading the languages in a big call causes timeouts.  This doesn't.
+    def loaded = new 
JsonSlurper().parse(getClass().getResource("/languages.json"), 'UTF-8')
+    loaded?.rows.each({sync.language(it[0])})
+  }
+
+  /**
+   * Preloads a list of sites so they don't have to be created on the fly.
+   */
+  def loadSites() {
+    log.info "Preloading sites"
+    def loaded = new 
JsonSlurper().parse(getClass().getResource("/sites.json"), 'UTF-8')
+    loaded?.rows.each({sync.site(it[0])})
+  }
+  /**
+   * Loads all properties.
+   */
+  private def loadProperties() {
+    final String operation = 'load properties'
+    log.info "Loading properties"
+    // Only lines that open a JSON object are entities; the rest is array punctuation.
+    eachEntity(latestProperties(), 0, operation, { it.startsWith('{') }) { line, lineNumber ->
+      sync.entity(operation, line, lineNumber)
+    }
+    // Wait for everything sent so far before announcing that resuming is possible.
+    sync.sync(operation, null)
+    log.info "Resume now possible with -s <line number>.  `-s 0` will resume from this point.  See line number logs for future points."
+  }
+
+  /**
+   * Loads all items.
+   */
+  private def loadItems() {
+    final String operation = 'load items'
+    log.info "Loading items"
+    // Entities whose id starts with P are filtered out here; loadProperties handles them.
+    def isItem = { it.startsWith('{') && !it.startsWith('{"id":"P') }
+    eachEntity(latestDump(), skipToLine, operation, isItem) { line, lineNumber ->
+      sync.entity(operation, line, lineNumber)
+    }
+    sync.sync(operation, null)
+    log.info "Recording last update time"
+    // With no explicit dump date, use the first eight characters of the dump's file name.
+    if (dumpDate == null) {
+      dumpDate = latestDumpName.substring(0, 8)
+    }
+    def parsedDumpDate = FILE_NAME_FORMAT.parseDateTime(dumpDate)
+    sync.lastUpdate(parsedDumpDate)
+  }
+
+  /**
+   * Opens an input stream over the properties extracted from the dump.  On
+   * first invocation the properties file is created next to the dump if it
+   * doesn't already exist.
+   */
+  private def latestProperties() {
+    if (latestProperties == null) {
+      // latestDump() also initializes the latestDump field that the path below is built from.
+      def latestDumpStream = latestDump()
+      File properties = new File(latestDump.getAbsolutePath() + ".properties.json")
+      if (properties.exists()) {
+        // Nothing to extract, so close the dump stream instead of leaking it.
+        latestDumpStream.close()
+      } else {
+        log.info "${properties} doesn't exist, creating it"
+        // Write to a .part file and rename so a half written file is never mistaken for a complete one.
+        File propertiesPart = new File(properties.getAbsolutePath() + ".part")
+        if (propertiesPart.exists()) {
+          propertiesPart.delete()
+        }
+        propertiesPart.withWriter{ out ->
+          out.write("[\n")
+          eachLine(latestDumpStream, 0, 'extract properties',
+            {it.startsWith('{"id":"P')},
+            {line, lineNumber -> out.write(line);out.write("\n")})
+          out.write("]\n")
+        }
+        if (!propertiesPart.renameTo(properties)) {
+          throw new RuntimeException("Couldn't rename ${propertiesPart} to ${properties}")
+        }
+      }
+      // Only remember the file once it is known to exist and be complete.
+      latestProperties = properties
+    }
+    latestProperties.newInputStream()
+  }
+
+  /**
+   * Call c for each entity that passes strFilter.
+   */
+  private void eachEntity(InputStream input, int skipToLine, String operationName, Closure strFilter, Closure c) {
+    eachLine(input, skipToLine, operationName, strFilter) { rawLine, lineNumber ->
+      // Drop the trailing comma that separates entries in the dump's JSON array.
+      def entityJson = rawLine.endsWith(',') ? rawLine.substring(0, rawLine.length() - 1) : rawLine
+      c(entityJson, lineNumber)
+    }
+  }
+
+  private void eachLine(InputStream input, int skipToLine, String operationName, Closure strFilter, Closure c) {
+    input.eachLine('UTF-8') { text, number ->
+      if (number < skipToLine) {
+        // Still before the resume point: only report progress now and then.
+        if (number % 100000 == 0) {
+          log.info "${operationName} - skipped ${number} lines..."
+        }
+      } else {
+        if (number % 10000 == 0) {
+          log.info "${operationName} - scanned ${number} lines..."
+        }
+        // Only lines accepted by the filter are handed to the callback.
+        if (strFilter(text)) {
+          c(text, number)
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens an input stream that reads the dump as text, unzipping it on the 
fly.
+   * If the dump isn't downloaded then it downloads it completely and then
+   * reopens it as a stream.
+   */
+  private def latestDump() {
+    if (latestDump == null) {
+      def dumpRoot = 'http://dumps.wikimedia.org/other/wikidata/'
+      if (latestDumpName == null) {
+        // Scrape the directory listing.  Every matching link overwrites the
+        // previous one so the last link in the listing wins.
+        new URL(dumpRoot).eachLine('UTF-8'){ line ->
+          def matcher = line =~ /^<a href="([^"]+)">.*<\/a>.*(\d+).*$/
+          if (matcher.matches()) {
+            latestDumpName = matcher.group(1)
+          }
+        }
+        if (latestDumpName == null) {
+          throw new RuntimeException("Couldn't find latest dump at ${dumpRoot}")
+        }
+      }
+      log.info "Latest wikidata dump is ${latestDumpName}"
+      if (!workDirectory.exists()) {
+        // mkdirs rather than mkdir so a nested work directory can be created.
+        workDirectory.mkdirs()
+      }
+      def dumpFile = new File(workDirectory, latestDumpName)
+      if (dumpFile.exists()) {
+        log.info "${dumpFile} exists - skipping download."
+      } else {
+        log.info "${dumpFile} doesn't exist - downloading."
+        log.info "This really deserves a progress bar...."
+        // Download to a .part file and rename so a partial download is never mistaken for the dump.
+        def dumpFilePart = new File(dumpFile.getAbsolutePath() + ".part")
+        if (dumpFilePart.exists()) {
+          dumpFilePart.delete()
+        }
+        // dumpRoot already ends with a slash so don't add another one.
+        def stream = new URL(dumpRoot + latestDumpName).openStream()
+        try {
+          dumpFilePart.append(stream)
+        } finally {
+          stream.close()
+        }
+        if (!dumpFilePart.renameTo(dumpFile)) {
+          throw new RuntimeException("Couldn't rename ${dumpFilePart} to ${dumpFile}")
+        }
+      }
+      // Only remember the file once it is known to exist so a failed download is retried.
+      latestDump = dumpFile
+    }
+    new GZIPInputStream(latestDump.newInputStream())
+  }
+
+  void logError(String message, Throwable t) {
+    // Just the message goes to this class's logger ...
+    log.warn message
+    // ... while the throwable goes to the logger named "verbose." plus this logger's name.
+    def verboseLog = LoggerFactory.getLogger("verbose." + log.getName())
+    verboseLog.warn(message, t)
+  }
+
+  // Destination for everything the loader reads.
+  interface Sync {
+    // Called once per preloaded language.
+    void language(language)
+    // Called once per preloaded site.
+    void site(site)
+    // Called once per entity line that passes the filter.  entity is the raw
+    // json text of the line with any trailing comma removed.
+    void entity(operationName, entity, lineNumber)
+    // Called after an operation has handed over all its entities.  The loader
+    // passes null for lineNumber when the whole operation is done.
+    void sync(operationName, lineNumber)
+    // Called last with the parsed date of the dump.
+    void lastUpdate(lastUpdate)
+  }
+
+  /**
+   * Sync that prints what it is given to stdout instead of sending it anywhere.
+   */
+  static class PrintSync implements Sync {
+    def json = new JsonSlurper()
+    void language(language) {
+      // println so each language lands on its own line like entities do.
+      println "Language ${language}"
+    }
+    void site(site) {
+      println "Site ${site}"
+    }
+    void entity(operationName, blob, lineNumber) {
+      // Only the id is printed; the rest of the entity is parsed and dropped.
+      println json.parseText(blob).id
+    }
+    void sync(operationName, lineNumber) {}
+    void lastUpdate(lastUpdate) {}
+  }
+
+  /**
+   * Sync that sends everything to Gremlin through the driver client.
+   * Properties and the first few items are sent from the calling thread;
+   * the remaining items are sent from a thread pool.
+   */
+  static class GremlinSync implements Sync {
+    private final ClientScript loader
+    private final def executor
+    // Item loads handed to the executor that haven't been waited on yet.
+    private final def futures = []
+    private final Cluster cluster
+    private final Client client
+
+    public GremlinSync(loader, clusterBuilder) {
+      this.loader = loader
+      // Eight megabytes should be enough
+      clusterBuilder.maxContentLength(8388608)
+      // Kryo had some trouble with serializing large strings - JSON it is
+      clusterBuilder.serializer("JSON")
+      cluster = clusterBuilder.create()
+      client = cluster.connect()
+      executor = Executors.newFixedThreadPool(loader.contactPoints.size() * 4)
+    }
+
+    GremlinSync ensureSchema() {
+      client.submit("g.wt().ensureSchema()").all().get()
+      this
+    }
+
+    void language(language) {
+      loader.retryMostErrors("preloading ${language}"){
+        client.submitAsync("g.wt().loader().initLanguage(language)", [language: language])
+      }
+    }
+
+    void site(site) {
+      loader.retryMostErrors("preloading ${site}"){
+        client.submitAsync("g.wt().loader().initSite(site)", [site: site])
+      }
+    }
+
+    void entity(operationName, blob, lineNumber) {
+      // Sending a big String over Kryo wasn't working so we parse it and send it as json
+      // which should work better.  Its not as efficient but it works.
+      def entity = new JsonSlurper().parseText(blob)
+      def task = {loader.retryMostErrors("updating ${entity.id}"){
+        client.submitAsync("g.wt().loadFromEntity(entity); g.commitIfSupported()",
+          [entity: entity])
+      }}
+      // The first few items and all properties will have lock contention issues if we
+      // use too many threads.  So we single thread them.
+      if (entity.type != 'item' || lineNumber < 500) {
+        task()
+        log.info "${operationName} - Finished up to line ${lineNumber}."
+      } else {
+        futures << executor.submit(task)
+        // Wait every 1000 submissions so the list of outstanding futures stays bounded.
+        if (futures.size() % 1000 == 0) {
+          sync(operationName, lineNumber)
+        }
+      }
+    }
+
+    void sync(operationName, lineNumber) {
+      futures.each{it.get()}
+      futures.clear()
+      if (lineNumber == null) {
+        log.info "${operationName} - Finished."
+      } else {
+        log.info "${operationName} - Finished up to line ${lineNumber}."
+      }
+    }
+
+    void lastUpdate(lastUpdate) {
+      loader.retryMostErrors('updating last update time'){
+        client.submitAsync("g.variables().set('lastUpdate', lastUpdate)", [lastUpdate: lastUpdate.getMillis()])
+      }
+    }
+
+    void close() {
+      try {
+        // Outstanding loads still need the client so wait for them before closing it.
+        futures.each{it.get()}
+        futures.clear()
+      } finally {
+        // The pool's threads are never released unless it is shut down.
+        executor.shutdown()
+        client.close()
+        cluster.close()
+      }
+    }
+  }
+}
diff --git a/src/main/groovy/org/wikidata/gremlin/RecentChangesPoller.groovy 
b/src/main/groovy/org/wikidata/gremlin/tool/RecentChangesPoller.groovy
similarity index 76%
rename from src/main/groovy/org/wikidata/gremlin/RecentChangesPoller.groovy
rename to src/main/groovy/org/wikidata/gremlin/tool/RecentChangesPoller.groovy
index b9e8e3b..e4486a4 100644
--- a/src/main/groovy/org/wikidata/gremlin/RecentChangesPoller.groovy
+++ b/src/main/groovy/org/wikidata/gremlin/tool/RecentChangesPoller.groovy
@@ -12,11 +12,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.wikidata.gremlin
+package org.wikidata.gremlin.tool
 
 import com.tinkerpop.gremlin.driver.Client
 import com.tinkerpop.gremlin.driver.Cluster
-import com.tinkerpop.gremlin.driver.exception.ResponseException
 import groovy.util.CliBuilder
 import groovy.util.logging.Slf4j
 import groovyx.net.http.HTTPBuilder
@@ -28,7 +27,6 @@
 import org.joda.time.format.ISODateTimeFormat
 import org.slf4j.LoggerFactory
 import java.util.concurrent.Executors
-import java.util.concurrent.ExecutionException
 
 import static groovyx.net.http.ContentType.JSON
 import static groovyx.net.http.Method.GET
@@ -39,14 +37,14 @@
  * database.
  */
 @Slf4j
-class RecentChangesPoller {
+class RecentChangesPoller implements ClientScript {
   private static final DateTimeFormatter API_INPUT_FORMAT = 
DateTimeFormat.forPattern("yyyyMMddHHmmss")
     .withZone(DateTimeZone.UTC)
   private static final DateTimeFormatter API_OUTPUT_FORMAT = 
ISODateTimeFormat.dateTimeNoMillis()
     .withZone(DateTimeZone.UTC)
 
-  public static void main(String[] args) {
-    def cli = new CliBuilder(usage:'pollRecentChanges')
+  static void main(String[] args) {
+    def cli = new CliBuilder(usage:'java -cp 
wikidata-gremlin-tools-0.0.9-SNAPSHOT.jar 
org.wikidata.gremlin.tool.RecentChangesPoller <options>')
     cli.header = "Polls the recent changes API at Wikidata and sends requests 
off to gremlin servers to " +
       "update the database."
     cli.s(longOpt: 'start', args: 1, argName: 'start time',
@@ -54,36 +52,33 @@
       'Accepts yyyyMMddkkmmss or yyyy-MM-ddTkk:mm:ssZ.')
     cli.p(longOpt: 'print',
       'Print the updates instead of sending them to Gremlin.')
-    cli.g(longOpt: 'gremlin', args: 1, argName: 'gremlin urls',
-      'Send requests the Gremlin servers at this url.  Defaults to localhost.  
Multiple servers should be separated ' +
-      'by comma like foo,bar,baz.')
     cli.w(longOpt: 'wait', args: 1, argName: 'wait time',
       'Seconds to wait after wikidata returns no data left.  Defaults to 30.  
Decimals ok.')
     cli.u(longOpt: 'url', args: 1, argName: 'wikidata url',
       'Base of the url for wikidata.  Defaults to www.wikidata.org.')
     cli.c(longOpt: 'count', args: 1, argName: 'count',
-      'Number of recent changes to pull at a time.  Defaults to 10.')
+      'Number of recent changes to pull at a time.  Defaults to 100.')
     cli.oneshot('Stop once all data has been collected rather than sleeping 
checking again.')
     cli.nologfile("Don't log to a file - just spew all logs to stdout no 
matter how verbose.")
     cli.help('Show this message')
+    def poller = new RecentChangesPoller()
+    poller.setupCli(cli)
     def options = cli.parse(args)
     if (options.help) {
       cli.usage()
       System.exit(0)
     }
-    def poller = new RecentChangesPoller()
     def gremlinSync = null
+    poller.verboseLogsGoToFile(!options.nologfile)
     if (options.p) {
       poller.sync(new PrintSync())
     } else {
-      def contactPoints = ['localhost']
-      if (options.g) {
-        contactPoints = options.g.tokenize(',')
-      }
-      gremlinSync = new GremlinSync(contactPoints, !options.nologfile)
+      def clusterBuilder = poller.clusterBuilder(options)
+      gremlinSync = new GremlinSync(poller, clusterBuilder.create())
       try {
         gremlinSync.ensureSchema()
       } catch (RuntimeException e) {
+        e.printStackTrace()
         println "Error validating schema.  Check that gremlin server is 
running.  If it is running, check its logs."
         System.exit(1)
       }
@@ -98,7 +93,7 @@
       base = options.u
     }
     poller.url("http://${base}/w/api.php")
-    def count = 10
+    def count = 100
     if (options.c) {
       count = options.c.toInteger()
     }
@@ -143,6 +138,7 @@
   private Sync sync
   private String url
   private int count
+  private boolean verboseLogsGoToFile = true
 
   def extra(extra) {
     this.extra = extra
@@ -169,10 +165,9 @@
     this
   }
 
-  def close() {
-    if (sync != null) {
-      sync.close()
-    }
+  def verboseLogsGoToFile(boolean verboseLogsGoToFile) {
+    this.verboseLogsGoToFile = verboseLogsGoToFile
+    this
   }
 
   /**
@@ -219,6 +214,15 @@
     return lastContinue != ['continue': '']
   }
 
+  void logError(String message, Throwable t) {
+    if (verboseLogsGoToFile) {
+      log.warn message
+      LoggerFactory.getLogger("verbose." + log.getName()).warn(message, t)
+    } else {
+      log.warn(message, t)
+    }
+  }
+
   /**
    * Build the update command for the change from the recent changes feed.
    * @return [the command string, a map of command parameters]
@@ -230,13 +234,13 @@
      following cases:
      1.  New entities are created as part of the wd step.  Then we skip
      updating the entity a second time by filtering on last revision in the
-     has step.
+     filter step.
      2.  Updated entities are found in the graph with the wd step, the in the
-     has step doesn't remove them, and the refresh step updates them.
+     filter step doesn't remove them, and the refresh step updates them.
      3.  Unchanced entities are found in the graph with the wd step and 
filtered
-     out with the has step.
+     out with the filter step.
      4.  Deleted entities that exist are handled by finding them in the wd 
step,
-     skipping the has step entirely (for all deletes), and the refresh step
+     skipping the filter step entirely (for all deletes), and the refresh step
      loops back to wikidata and when it finds that the entities indeed no 
longer
      exist it deletes them.
      5.  Deleted entities that don't exist aren't found or loaded by the wd
@@ -247,15 +251,14 @@
     if (change.type == 'log' && change.logtype == 'delete' && change.logaction 
== 'delete') {
       filter = ""
     } else {
-      filter = ".has('lastrevid', lt, revid)"
+      filter = ".filter{it.get().property('lastrevid').orElse(0) < revid}"
       parameters['revid'] = change.revid
     }
-    ["g.of(WikidataTraversal.class).wd(title)${filter}.refresh()", parameters]
+    
["g.of(WikidataTraversal.class).wd(title)${filter}.refresh().properties('wikibaseId').value()",
 parameters]
   }
 
   interface Sync {
     void recentChanges(lastMaxTime, recentChanges)
-    void close()
   }
 
   static class PrintSync implements Sync {
@@ -267,7 +270,6 @@
         println setParams + command
       }
     }
-    void close() {}
   }
 
   @Slf4j
@@ -283,15 +285,14 @@
       }
     }
 
-    private final def executor = Executors.newCachedThreadPool()
+    private final def executor
+    private final ClientScript poller
     private final Cluster cluster
-    private final boolean verboseLogsGoToFile
 
-    public GremlinSync(contactPoints, verboseLogsGoToFile) {
-      def clusterBuilder = Cluster.build()
-      contactPoints.each{clusterBuilder.addContactPoint(it)}
-      cluster = clusterBuilder.create()
-      this.verboseLogsGoToFile = verboseLogsGoToFile
+    GremlinSync(poller, cluster) {
+      this.poller = poller
+      this.cluster = cluster
+      executor = Executors.newFixedThreadPool(poller.contactPoints.size() * 10)
     }
 
     GremlinSync ensureSchema() {
@@ -313,54 +314,18 @@
       cluster.connect().withClient{ client ->
         def futures = recentChanges.collect{ change ->
           executor.submit({
-            blindlyRetry{
-              // We try to catch exceptions that might be bugs and just 
continue rather than retry forever.
-              try {
-                def (command, parameters) = updateCommand(change)
-                client.submit(command, parameters).all().get()
-              } catch (ExecutionException e) {
-                Exception c = e
-                while (c.getCause() != null) {
-                  if (c.getCause() instanceof ResponseException) {
-                    logError("Error updating ${change.title} that looks like a 
bug or dirty data.  See gremlin server logs for more.", t)
-                    return
-                  } else {
-                    c = c.getCause()
-                  }
-                }
-                throw e
-              }
+            poller.retryMostErrors("updating ${change.title}"){
+              def (command, parameters) = updateCommand(change)
+              client.submitAsync(command, parameters)
             }
           })
         }
         futures.each{it.get()}
         // Not setting during load process for each vertex to reduce contention
-        blindlyRetry{client.submit("g.variables().set('lastUpdate', 
lastMaxTime)",
-          [lastMaxTime: lastMaxTime.getMillis()]).all().get()}
-      }
-    }
-
-    void close() {
-      log.debug "Closing Gremlin driver"
-      cluster.close()
-    }
-
-    def blindlyRetry(closure) {
-      while (true) {
-        try {
-          return closure()
-        } catch (Throwable t) {
-          logError("Error updating.  Waiting and retrying.", t)
+        poller.retryMostErrors('updating last update time'){
+          client.submitAsync("g.variables().set('lastUpdate', lastMaxTime)",
+            [lastMaxTime: lastMaxTime.getMillis()])
         }
-      }
-    }
-
-    def logError(String message, Throwable t) {
-      if (verboseLogsGoToFile) {
-        log.warn message
-        LoggerFactory.getLogger("verbose." + log.getName()).warn(message, t)
-      } else {
-        log.warn(message, t)
       }
     }
   }
diff --git 
a/src/main/java/org/wikidata/gremlin/DomainSpecificLanguageTraversal.java 
b/src/main/java/org/wikidata/gremlin/DomainSpecificLanguageTraversal.java
index b563275..3c5e567 100644
--- a/src/main/java/org/wikidata/gremlin/DomainSpecificLanguageTraversal.java
+++ b/src/main/java/org/wikidata/gremlin/DomainSpecificLanguageTraversal.java
@@ -15,9 +15,8 @@
 package org.wikidata.gremlin;
 
 import com.tinkerpop.gremlin.process.graph.GraphTraversal;
-import com.tinkerpop.gremlin.process.graph.step.sideEffect.StartStep;
+import com.tinkerpop.gremlin.process.graph.step.sideEffect.GraphStep;
 import com.tinkerpop.gremlin.structure.Vertex;
-
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -27,12 +26,12 @@
  * @param <E> element type being traversed - .get returns this
  * @param <Self> self type for casting
  */
-public interface DomainSpecificLanguageTraversal<S, E, Self> extends 
GraphTraversal<S, E>, SelfAware<Self> {
+public interface DomainSpecificLanguageTraversal<S, E, Self> extends 
GraphTraversal<S, E>, SelfAware<Self>, HasGraph {
   /**
    * Seed the graph traversal with all things.
    */
   default Self all() {
-    return cast(addStep(new StartStep<>(this, sideEffects().getGraph().V())));
+    return cast(asAdmin().addStep(new GraphStep<>(this, graph(), 
Vertex.class)));
   }
 
   /**
diff --git a/src/main/java/org/wikidata/gremlin/HasGraph.java 
b/src/main/java/org/wikidata/gremlin/HasGraph.java
new file mode 100644
index 0000000..3604433
--- /dev/null
+++ b/src/main/java/org/wikidata/gremlin/HasGraph.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (C) 2014 Wikimedia Foundation
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.wikidata.gremlin;
+
+import com.tinkerpop.gremlin.structure.Graph;
+
+/**
+ * Objects of this type exist in the context of a single graph.
+ */
+public interface HasGraph {
+  /**
+   * Gets the graph that this object is in the context of.
+   */
+  Graph graph();
+}
diff --git a/src/main/java/org/wikidata/gremlin/IteratorUtils.java 
b/src/main/java/org/wikidata/gremlin/IteratorUtils.java
index 5bcfc8f..4625e23 100644
--- a/src/main/java/org/wikidata/gremlin/IteratorUtils.java
+++ b/src/main/java/org/wikidata/gremlin/IteratorUtils.java
@@ -14,21 +14,13 @@
  */
 package org.wikidata.gremlin;
 
-import com.tinkerpop.gremlin.process.util.SingleIterator;
 import java.util.Collections;
 import java.util.Iterator;
 
 /**
  * Utilities for building iterators.
  */
-public abstract class IteratorUtils {
-  /**
-   * Build an iterator of a single element.
-   */
-  public static final <S> Iterator<S> of(S s) {
-    return new SingleIterator(s);
-  }
-
+public abstract class IteratorUtils extends 
com.tinkerpop.gremlin.util.iterator.IteratorUtils {
   /**
    * If element isn't null build an iterator of it otherwise build and empty 
iterator.
    */
diff --git a/src/main/java/org/wikidata/gremlin/LoadingTraversal.java 
b/src/main/java/org/wikidata/gremlin/LoadingTraversal.java
index 50e95ed..d73ee01 100644
--- a/src/main/java/org/wikidata/gremlin/LoadingTraversal.java
+++ b/src/main/java/org/wikidata/gremlin/LoadingTraversal.java
@@ -18,7 +18,6 @@
 import com.tinkerpop.gremlin.process.graph.step.sideEffect.IdentityStep;
 import com.tinkerpop.gremlin.process.graph.step.sideEffect.StartStep;
 import com.tinkerpop.gremlin.structure.Vertex;
-import groovy.json.JsonSlurper;
 
 /**
  * Implementations of methods to load items from wikidata and ensure the schema
@@ -27,22 +26,22 @@
  * @param <E> element type being traversed - .get returns this
  * @param <Self> self type for casting
  */
-public interface LoadingTraversal<S, E, Self> extends GraphTraversal<S, E>, 
SelfAware<Self> {
+public interface LoadingTraversal<S, E, Self> extends GraphTraversal<S, E>, 
SelfAware<Self>, HasGraph {
   /**
    * Ensure schema is setup.
    */
   default Self ensureSchema() {
-    Schema schema = new Schema(sideEffects().getGraph());
+    Schema schema = new Schema(graph());
     schema.setupSchema();
     schema.setupConstantData();
-    return cast(addStep(new IdentityStep<>(this)));
+    return cast(asAdmin().addStep(new IdentityStep<>(this)));
   }
 
   /**
    * Load id from wikidata (if it isn't already loaded) and start the 
traversal there.
    */
   default Self wd(String id) {
-    return cast(addStep(new StartStep<>(this, loader().byId(id))));
+    return cast(asAdmin().addStep(new StartStep<>(this, loader().byId(id))));
   }
 
   /**
@@ -69,17 +68,16 @@
   }
 
   /**
-   * Load from a json blob.
+   * Load from a pre-fetched object.
    */
-  default Self loadFromBlob(String text) {
-    Object item = new JsonSlurper().parseText(text);
-    return cast(addStep(new StartStep<>(this, loader().loadFromItem(item))));
+  default Self loadFromEntity(Object item) {
+    return cast(asAdmin().addStep(new StartStep<>(this, 
loader().loadFromItem(item))));
   }
 
   /**
    * Fetch the loader to use for this traversal.
    */
   default Loader loader() {
-    return sideEffects().getOrCreate("loader", () -> new 
Loader(sideEffects().getGraph()));
+    return asAdmin().getSideEffects().getOrCreate("loader", () -> new 
Loader(graph(), false));
   }
 }
diff --git a/src/main/java/org/wikidata/gremlin/WikidataTraversal.java 
b/src/main/java/org/wikidata/gremlin/WikidataTraversal.java
index 6527883..da4d2d1 100644
--- a/src/main/java/org/wikidata/gremlin/WikidataTraversal.java
+++ b/src/main/java/org/wikidata/gremlin/WikidataTraversal.java
@@ -14,11 +14,8 @@
  */
 package org.wikidata.gremlin;
 
-import com.tinkerpop.gremlin.process.TraversalStrategies;
 import com.tinkerpop.gremlin.process.graph.GraphTraversal;
-import com.tinkerpop.gremlin.process.graph.strategy.TraverserSourceStrategy;
-import com.tinkerpop.gremlin.process.util.DefaultTraversal;
-import com.tinkerpop.gremlin.process.util.DefaultTraversalStrategies;
+import com.tinkerpop.gremlin.process.graph.util.DefaultGraphTraversal;
 import com.tinkerpop.gremlin.structure.Graph;
 
 /**
@@ -44,20 +41,25 @@
   }
 
   /**
-   * Default traversal implementation.
+   * Default implementation of WikidataTraversal.
    */
-  class DefaultWikidataTraversal extends DefaultTraversal implements 
WikidataTraversal {
-    static {
-      DefaultTraversalStrategies traversalStrategies = new 
DefaultTraversalStrategies();
-      traversalStrategies.addStrategy(TraverserSourceStrategy.instance());
-      
TraversalStrategies.GlobalCache.registerStrategies(DefaultWikidataTraversal.class,
 traversalStrategies);
-    }
+  class DefaultWikidataTraversal<S, E> extends DefaultGraphTraversal<S, E> 
implements WikidataTraversal<S, E> {
+    /**
+     * Graph for this traversal.
+     */
+    private final Graph graph;
 
     /**
      * Build witha graph reference.
      */
     public DefaultWikidataTraversal(Graph graph) {
-      super(graph);
+      super(Graph.class);
+      this.graph = graph;
+    }
+
+    @Override
+    public Graph graph() {
+      return graph;
     }
   }
 }
diff --git a/src/test/resources/gremlin-server.yaml 
b/src/test/resources/gremlin-server.yaml
index 04c7fad..e198f23 100644
--- a/src/test/resources/gremlin-server.yaml
+++ b/src/test/resources/gremlin-server.yaml
@@ -15,14 +15,15 @@
 
 host: localhost
 port: 8182
-threadPoolWorker: 1
-gremlinPool: 1
+threadPoolWorker: 2
+gremlinPool: 8
 scriptEvaluationTimeout: 30000
 serializedResponseTimeout: 30000
 channelizer: com.tinkerpop.gremlin.server.channel.WebSocketChannelizer
 graphs: {
-  g: src/test/resources/tinkergraph-empty.properties}
+  g: src/test/resources/titan-cassandra-localhost.properties}
 plugins:
+  - aurelius.titan
   - tinkerpop.tinkergraph
   - wikidata
 scriptEngines: {
@@ -50,9 +51,9 @@
 maxInitialLineLength: 4096
 maxHeaderSize: 8192
 maxChunkSize: 8192
-maxContentLength: 65536
+maxContentLength: 8388608
 maxAccumulationBufferComponents: 1024
-resultIterationBatchSize: 64
+resultIterationBatchSize: 16
 writeBufferHighWaterMark: 32768
 writeBufferHighWaterMark: 65536
 ssl: {
diff --git a/src/test/resources/remote.yaml b/src/test/resources/remote.yaml
index 19f2c03..1aced8d 100644
--- a/src/test/resources/remote.yaml
+++ b/src/test/resources/remote.yaml
@@ -16,4 +16,4 @@
 
 hosts: [localhost]
 port: 8182
-serializer: { className: 
com.tinkerpop.gremlin.driver.ser.KryoMessageSerializerV1d0, config: { 
serializeResultToString: true }}
+serializer: { className: 
com.tinkerpop.gremlin.driver.ser.JsonMessageSerializerGremlinV1d0, config: { 
serializeResultToString: true }}
diff --git a/src/test/resources/titan-cassandra-localhost.properties 
b/src/test/resources/titan-cassandra-localhost.properties
new file mode 100644
index 0000000..2165a3f
--- /dev/null
+++ b/src/test/resources/titan-cassandra-localhost.properties
@@ -0,0 +1,19 @@
+# Basic Titan configuration backed by Cassandra on localhost
+#
+# Copyright (C) 2014 Wikimedia Foundation
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
+storage.backend=cassandra
+storage.hostname=localhost

-- 
To view, visit https://gerrit.wikimedia.org/r/187279
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I4c784bb4110d002c85f482ed6bc3b144b247ef4d
Gerrit-PatchSet: 7
Gerrit-Project: wikidata/gremlin
Gerrit-Branch: master
Gerrit-Owner: Manybubbles <never...@wikimedia.org>
Gerrit-Reviewer: Aude <aude.w...@gmail.com>
Gerrit-Reviewer: Manybubbles <never...@wikimedia.org>
Gerrit-Reviewer: Smalyshev <smalys...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to