Updated Branches: refs/heads/develop acba9247d -> 54670f4c9
also integrate existence check to KiWiLoader and add a test Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/54670f4c Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/54670f4c Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/54670f4c Branch: refs/heads/develop Commit: 54670f4c973cf2032f43526ca8a911f44ff1daa5 Parents: acba924 Author: Sebastian Schaffert <[email protected]> Authored: Fri Dec 13 11:45:36 2013 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Fri Dec 13 11:45:36 2013 +0100 ---------------------------------------------------------------------- .../kiwi/loader/generic/KiWiBatchHandler.java | 12 ++-- .../kiwi/loader/generic/KiWiHandler.java | 47 +++++++++++-- .../kiwi/loader/pgsql/create_indexes.sql | 2 +- .../marmotta/kiwi/loader/pgsql/drop_indexes.sql | 2 +- .../marmotta/kiwi/loader/KiWiHandlerTest.java | 74 +++++++++++--------- .../marmotta/kiwi/loader/KiWiLoaderTest.java | 14 ++-- .../marmotta/kiwi/loader/PGCopyUtilTest.java | 23 ++---- .../apache/marmotta/kiwi/sail/KiWiStore.java | 1 + 8 files changed, 104 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java index 01b3dd3..4cbd29d 100644 --- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java +++ b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiBatchHandler.java @@ -19,11 +19,7 @@ package 
org.apache.marmotta.kiwi.loader.generic; import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration; import org.apache.marmotta.kiwi.loader.pgsql.KiWiPostgresHandler; -import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource; -import org.apache.marmotta.kiwi.model.rdf.KiWiLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiNode; -import org.apache.marmotta.kiwi.model.rdf.KiWiTriple; -import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource; +import org.apache.marmotta.kiwi.model.rdf.*; import org.apache.marmotta.kiwi.sail.KiWiStore; import org.openrdf.model.Literal; import org.openrdf.rio.RDFHandler; @@ -201,9 +197,6 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler @Override protected void storeTriple(KiWiTriple result) throws SQLException { - if(result.getId() < 0) { - result.setId(connection.getNextSequence("triples")); - } tripleBacklog.add(result); @@ -212,6 +205,9 @@ public abstract class KiWiBatchHandler extends KiWiHandler implements RDFHandler if(triples % config.getCommitBatchSize() == 0) { try { flushBacklog(); + if(registry != null) { + registry.releaseTransaction(connection.getTransactionId()); + } connection.commit(); } catch (SQLException ex) { log.warn("could not flush out data ({}), retrying with fresh connection", ex.getCause().getMessage()); http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java index 9ce1e84..8eca550 100644 --- a/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java +++ 
b/libraries/kiwi/kiwi-loader/src/main/java/org/apache/marmotta/kiwi/loader/generic/KiWiHandler.java @@ -7,16 +7,14 @@ import net.sf.ehcache.Element; import net.sf.ehcache.constructs.blocking.CacheEntryFactory; import net.sf.ehcache.constructs.blocking.SelfPopulatingCache; import org.apache.marmotta.commons.sesame.model.Namespaces; +import org.apache.marmotta.commons.sesame.tripletable.IntArray; import org.apache.marmotta.commons.util.DateUtils; import org.apache.marmotta.kiwi.loader.KiWiLoaderConfiguration; import org.apache.marmotta.kiwi.model.rdf.*; import org.apache.marmotta.kiwi.persistence.KiWiConnection; +import org.apache.marmotta.kiwi.persistence.KiWiTripleRegistry; import org.apache.marmotta.kiwi.sail.KiWiStore; -import org.openrdf.model.BNode; -import org.openrdf.model.Literal; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.Value; +import org.openrdf.model.*; import org.openrdf.model.impl.URIImpl; import org.openrdf.rio.RDFHandler; import org.openrdf.rio.RDFHandlerException; @@ -65,6 +63,9 @@ public class KiWiHandler implements RDFHandler { private Statistics statistics; + // only used when statement existance check is enabled + protected KiWiTripleRegistry registry; + protected Date importDate; @@ -112,6 +113,9 @@ public class KiWiHandler implements RDFHandler { }); + if(config.isStatementExistanceCheck()) { + registry = new KiWiTripleRegistry(store); + } } @@ -163,6 +167,9 @@ public class KiWiHandler implements RDFHandler { @Override public void endRDF() throws RDFHandlerException { + if(registry != null) { + registry.releaseTransaction(connection.getTransactionId()); + } try { connection.commit(); @@ -246,9 +253,33 @@ public class KiWiHandler implements RDFHandler { } KiWiTriple result = new KiWiTriple(subject,predicate,object,context, importDate); + + // statement existance check; use the triple registry to lookup if there are any concurrent triple creations if(config.isStatementExistanceCheck()) { - 
result.setId(connection.getTripleId(subject, predicate, object, context, true)); + IntArray cacheKey = IntArray.createSPOCKey(subject, predicate, object, context); + long tripleId = registry.lookupKey(cacheKey); + + if(tripleId >= 0) { + // try getting id from registry + result.setId(tripleId); + + registry.registerKey(cacheKey, connection.getTransactionId(), result.getId()); + } else { + // not found in registry, try loading from database + result.setId(connection.getTripleId(subject,predicate,object,context,true)); + } + + // triple has no id from registry or database, so we create one and flag it for reasoning + if(result.getId() < 0) { + result.setId(connection.getNextSequence("seq.triples")); + result.setNewTriple(true); + + registry.registerKey(cacheKey, connection.getTransactionId(), result.getId()); + } + } else { + result.setId(connection.getNextSequence("triples")); } + storeTriple(result); } catch (SQLException | ExecutionException e) { @@ -459,6 +490,10 @@ public class KiWiHandler implements RDFHandler { triples++; if(triples % config.getCommitBatchSize() == 0) { + if(registry != null) { + registry.releaseTransaction(connection.getTransactionId()); + } + connection.commit(); printStatistics(); http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql index 02818ad..30169fe 100644 --- a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql +++ b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/create_indexes.sql @@ -1,4 +1,4 @@ -CREATE INDEX idx_triples_op ON 
triples(object,predicate) WHERE deleted = false; +CREATE INDEX idx_triples_p ON triples(object,predicate) WHERE deleted = false; CREATE INDEX idx_triples_spo ON triples(subject,predicate,object) WHERE deleted = false; CREATE INDEX idx_triples_cspo ON triples(context,subject,predicate,object) WHERE deleted = false; CREATE INDEX idx_node_dcontent ON nodes(dvalue) WHERE dvalue IS NOT NULL; http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql index 40dbafb..f979357 100644 --- a/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql +++ b/libraries/kiwi/kiwi-loader/src/main/resources/org/apache/marmotta/kiwi/loader/pgsql/drop_indexes.sql @@ -1,4 +1,4 @@ -DROP INDEX IF EXISTS idx_triples_op; +DROP INDEX IF EXISTS idx_triples_p; DROP INDEX IF EXISTS idx_triples_spo; DROP INDEX IF EXISTS idx_triples_cspo; DROP INDEX IF EXISTS idx_node_dcontent; http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java index 24d7723..71d8dac 100644 --- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java +++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiHandlerTest.java @@ -8,11 +8,7 @@ import 
org.apache.marmotta.kiwi.persistence.mysql.MySQLDialect; import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect; import org.apache.marmotta.kiwi.sail.KiWiStore; import org.apache.marmotta.kiwi.test.junit.KiWiDatabaseRunner; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.TestWatcher; import org.junit.runner.Description; import org.junit.runner.RunWith; @@ -20,10 +16,7 @@ import org.openrdf.repository.Repository; import org.openrdf.repository.RepositoryConnection; import org.openrdf.repository.RepositoryException; import org.openrdf.repository.sail.SailRepository; -import org.openrdf.rio.RDFFormat; -import org.openrdf.rio.RDFParseException; -import org.openrdf.rio.RDFParser; -import org.openrdf.rio.Rio; +import org.openrdf.rio.*; import org.openrdf.sail.SailException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,13 +47,13 @@ public class KiWiHandlerTest { @Before public void initDatabase() throws RepositoryException, IOException, RDFParseException, SailException { store = new KiWiStore(dbConfig); + store.setDropTablesOnShutdown(true); repository = new SailRepository(store); repository.initialize(); } @After public void dropDatabase() throws RepositoryException, SQLException, SailException { - store.getPersistence().dropDatabase(); repository.shutDown(); } @@ -79,41 +72,56 @@ public class KiWiHandlerTest { }; @Test - public void testImport() throws Exception { + public void testImportNoCheck() throws Exception { + testImport(new KiWiLoaderConfiguration()); + } + + @Test + public void testImportExistanceCheck() throws Exception { + KiWiLoaderConfiguration cfg = new KiWiLoaderConfiguration(); + cfg.setStatementExistanceCheck(true); + testImport(cfg); + } + + private void testImport(KiWiLoaderConfiguration c) throws RDFParseException, IOException, RDFHandlerException { KiWiHandler handler; 
if(store.getPersistence().getDialect() instanceof PostgreSQLDialect) { - handler = new KiWiPostgresHandler(store, new KiWiLoaderConfiguration()); + handler = new KiWiPostgresHandler(store, c); } else if(store.getPersistence().getDialect() instanceof MySQLDialect) { - handler = new KiWiMySQLHandler(store, new KiWiLoaderConfiguration()); + handler = new KiWiMySQLHandler(store, c); } else { - handler = new KiWiHandler(store,new KiWiLoaderConfiguration()); + handler = new KiWiHandler(store, c); } - // bulk import - long start = System.currentTimeMillis(); - RDFParser parser = Rio.createParser(RDFFormat.RDFXML); - parser.setRDFHandler(handler); - parser.parse(this.getClass().getResourceAsStream("demo-data.foaf"),""); - - logger.info("bulk import in {} ms", System.currentTimeMillis() - start); - - // check presence of data try { - RepositoryConnection con = repository.getConnection(); - try { - con.begin(); + // bulk import + long start = System.currentTimeMillis(); + RDFParser parser = Rio.createParser(RDFFormat.RDFXML); + parser.setRDFHandler(handler); + parser.parse(this.getClass().getResourceAsStream("demo-data.foaf"),""); - Assert.assertTrue(con.hasStatement(null,null,null,true)); + logger.info("bulk import in {} ms", System.currentTimeMillis() - start); - con.commit(); + // check presence of data + try { + RepositoryConnection con = repository.getConnection(); + try { + con.begin(); + + Assert.assertTrue(con.hasStatement(null,null,null,true)); + + con.commit(); + } catch(RepositoryException ex) { + con.rollback(); + } finally { + con.close(); + } } catch(RepositoryException ex) { - con.rollback(); - } finally { - con.close(); + ex.printStackTrace(); // TODO: handle error } - } catch(RepositoryException ex) { - ex.printStackTrace(); // TODO: handle error + } finally { + handler.shutdown(); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java 
---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java index 86a913e..39d6fff 100644 --- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java +++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/KiWiLoaderTest.java @@ -20,11 +20,7 @@ import org.openrdf.rio.RDFFormat; import org.openrdf.rio.RDFHandlerException; import org.openrdf.rio.RDFParseException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; import java.util.Properties; import java.util.zip.GZIPOutputStream; @@ -237,7 +233,13 @@ public class KiWiLoaderTest { public Repository getRepository() { return super.repository; } - + + @Override + public synchronized void shutdown() throws RepositoryException, RDFHandlerException { + store.setDropTablesOnShutdown(true); + + super.shutdown(); + } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java index 8dfcf68..ce80ccc 100644 --- a/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java +++ b/libraries/kiwi/kiwi-loader/src/test/java/org/apache/marmotta/kiwi/loader/PGCopyUtilTest.java @@ -4,14 +4,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.marmotta.commons.vocabulary.XSD; import org.apache.marmotta.kiwi.config.KiWiConfiguration; import 
org.apache.marmotta.kiwi.loader.pgsql.PGCopyUtil; -import org.apache.marmotta.kiwi.model.rdf.KiWiAnonResource; -import org.apache.marmotta.kiwi.model.rdf.KiWiBooleanLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiDateLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiDoubleLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiIntLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiNode; -import org.apache.marmotta.kiwi.model.rdf.KiWiStringLiteral; -import org.apache.marmotta.kiwi.model.rdf.KiWiUriResource; +import org.apache.marmotta.kiwi.model.rdf.*; import org.apache.marmotta.kiwi.persistence.KiWiConnection; import org.apache.marmotta.kiwi.persistence.pgsql.PostgreSQLDialect; import org.apache.marmotta.kiwi.sail.KiWiStore; @@ -32,11 +25,7 @@ import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Random; -import java.util.UUID; +import java.util.*; import static org.junit.Assert.assertTrue; @@ -76,6 +65,7 @@ public class PGCopyUtilTest { rnd = new Random(); store = new KiWiStore(psql); + store.setDropTablesOnShutdown(true); repository = new SailRepository(store); repository.initialize(); } @@ -84,10 +74,11 @@ public class PGCopyUtilTest { public void dropDatabase() throws RepositoryException, SQLException, SailException { log.info("cleaning up test setup..."); if (store != null && store.isInitialized()) { + try { assertTrue(store.checkConsistency()); - store.closeValueFactory(); // release all connections before dropping the database - store.getPersistence().dropDatabase(); - repository.shutDown(); + } finally { + repository.shutDown(); + } } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/54670f4c/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java ---------------------------------------------------------------------- diff --git 
a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java index 513e55e..899b941 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/sail/KiWiStore.java @@ -176,6 +176,7 @@ public class KiWiStore extends NotifyingSailBase { if(dropTablesOnShutdown) { try { + logger.info("dropping database tables ..."); persistence.dropDatabase(); } catch (SQLException e) { logger.error("error dropping database: {}", e.getMessage());
