started working on a separate kiwi-caching module with support for query caching
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/dbb13f65 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/dbb13f65 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/dbb13f65 Branch: refs/heads/develop Commit: dbb13f65783dddd3b5e4c64a68cd471ee76c4be1 Parents: aae66bf Author: Sebastian Schaffert <[email protected]> Authored: Thu Dec 19 11:43:20 2013 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Thu Dec 19 11:43:20 2013 +0100 ---------------------------------------------------------------------- libraries/kiwi/kiwi-caching/pom.xml | 25 +++++++ .../kiwi/caching/sail/KiWiCachingSail.java | 13 +++- .../caching/sail/KiWiCachingSailConnection.java | 62 ++++++++++----- .../GeronimoTransactionManagerLookup.java | 46 ++++++++++++ libraries/kiwi/kiwi-triplestore/pom.xml | 1 - .../marmotta/kiwi/caching/KiWiCacheManager.java | 9 ++- .../kiwi/persistence/KiWiConnection.java | 79 +++++++++++--------- .../src/main/resources/jgroups-kiwi.xml | 4 +- .../core/services/cache/CachingServiceImpl.java | 3 +- .../src/main/resources/jgroups-marmotta.xml | 4 +- 10 files changed, 179 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-caching/pom.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/pom.xml b/libraries/kiwi/kiwi-caching/pom.xml index 2ab4417..df65154 100644 --- a/libraries/kiwi/kiwi-caching/pom.xml +++ b/libraries/kiwi/kiwi-caching/pom.xml @@ -46,6 +46,31 @@ <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.geronimo.components</groupId> + <artifactId>geronimo-transaction</artifactId> + <version>3.1.1</version> + + <exclusions> + <exclusion> + <groupId>org.apache.geronimo.ext.tomcat</groupId> + <artifactId>juli</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.geronimo.ext.tomcat</groupId> + <artifactId>util</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-servlet_3.0_spec</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jta_1.1_spec</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- Logging --> <dependency> http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java index 8e4f34a..302cb43 100644 --- a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSail.java @@ -18,13 +18,13 @@ package org.apache.marmotta.kiwi.caching.sail; import org.apache.marmotta.kiwi.caching.config.KiWiQueryCacheConfiguration; +import org.apache.marmotta.kiwi.caching.transaction.GeronimoTransactionManagerLookup; import org.apache.marmotta.kiwi.sail.KiWiStore; import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.transaction.TransactionMode; -import org.infinispan.transaction.lookup.GenericTransactionManagerLookup; import org.openrdf.sail.NotifyingSail; import org.openrdf.sail.NotifyingSailConnection; import org.openrdf.sail.Sail; @@ -63,12 +63,18 @@ public class KiWiCachingSail extends NotifyingSailWrapper { super(baseSail); this.parent = getRootSail(baseSail); - this.cacheManager = parent.getPersistence().getCacheManager().getCacheManager(); this.configuration = configuration; } @Override + public void initialize() throws SailException { + super.initialize(); + + this.cacheManager = parent.getPersistence().getCacheManager().getCacheManager(); + } + + @Override public NotifyingSailConnection getConnection() throws SailException { return new KiWiCachingSailConnection(super.getConnection(), getQueryCache(), configuration.getMaxEntrySize()); } @@ -86,7 +92,8 @@ public class KiWiCachingSail extends NotifyingSailWrapper { Configuration tripleConfiguration = new ConfigurationBuilder().read(cacheManager.getDefaultCacheConfiguration()) .transaction() .transactionMode(TransactionMode.TRANSACTIONAL) - .transactionManagerLookup(new GenericTransactionManagerLookup()) + .transactionManagerLookup(new GeronimoTransactionManagerLookup()) + .cacheStopTimeout(1, TimeUnit.SECONDS) .eviction() . maxEntries(configuration.getMaxCacheSize()) .expiration() http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java index 25af151..21ac100 100644 --- a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/sail/KiWiCachingSailConnection.java @@ -18,7 +18,7 @@ package org.apache.marmotta.kiwi.caching.sail; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; +import com.google.common.collect.Lists; import info.aduna.iteration.CloseableIteration; import info.aduna.iteration.Iteration; import info.aduna.iteration.UnionIteration; @@ -65,11 +65,6 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { this.queryCache = queryCache; this.limit = limit; - try { - queryCache.getAdvancedCache().getTransactionManager().begin(); - } catch (NotSupportedException | SystemException e) { - log.error("error starting cache transaction: ",e); - } } @@ -104,32 +99,54 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { @Override public void addStatement(Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { - tripleUpdated(subj, pred, obj, contexts); + tripleUpdated(subj, pred, obj, resolveContexts(contexts)); super.addStatement(subj, pred, obj, contexts); } @Override public void removeStatements(Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { - tripleUpdated(subj, pred, obj, contexts); + // TODO: too aggressive, but currently we cannot remove with wildcards + queryCache.clear(); super.removeStatements(subj, pred, obj, contexts); } @Override public void addStatement(UpdateContext modify, Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { - tripleUpdated(subj, pred, obj, contexts); + tripleUpdated(subj, pred, obj, resolveContexts(contexts)); super.addStatement(modify, subj, pred, obj, contexts); } @Override public void removeStatement(UpdateContext modify, Resource subj, URI pred, Value obj, Resource... contexts) throws SailException { - tripleUpdated(subj, pred, obj, contexts); + // TODO: too aggressive, but currently we cannot remove with wildcards + queryCache.clear(); super.removeStatement(modify, subj, pred, obj, contexts); } + + @Override + public void clear(Resource... contexts) throws SailException { + // TODO: too aggressive, but currently we cannot remove with wildcards + queryCache.clear(); + + super.clear(contexts); + } + + @Override + public void begin() throws SailException { + super.begin(); + + try { + queryCache.getAdvancedCache().getTransactionManager().begin(); + } catch (NotSupportedException | SystemException e) { + log.error("error starting cache transaction: ",e); + } + } + @Override public void commit() throws SailException { try { @@ -153,11 +170,22 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { } - private Set<Resource> resolveContexts(Resource... contexts) { + @Override + public void close() throws SailException { + try { + queryCache.getAdvancedCache().getTransactionManager().suspend(); + } catch (SystemException e) { + log.error("error suspending transaction",e); + } + + super.close(); + } + + private List<Resource> resolveContexts(Resource... contexts) { if(contexts.length == 0) { - return Collections.singleton((Resource)defaultContext); + return Collections.singletonList((Resource) defaultContext); } else { - return Sets.newHashSet(contexts); + return Lists.newArrayList(contexts); } } @@ -195,7 +223,7 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { // cache the query result IntArray key = createCacheKey(subject,property,object,context,inferred); - queryCache.put(key,result); + queryCache.putAsync(key, result); // cache the nodes of the triples and the triples themselves Set<Value> nodes = new HashSet<Value>(); @@ -203,7 +231,7 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { if(stmt instanceof KiWiTriple) { KiWiTriple triple = (KiWiTriple)stmt; Collections.addAll(nodes, new Value[]{triple.getSubject(), triple.getObject(), triple.getPredicate(), triple.getContext()}); - queryCache.put(createCacheKey(triple.getSubject(),triple.getPredicate(),triple.getObject(),triple.getContext(),triple.isInferred()), ImmutableList.of(stmt)); + queryCache.putAsync(createCacheKey(triple.getSubject(), triple.getPredicate(), triple.getObject(), triple.getContext(), triple.isInferred()), ImmutableList.of(stmt)); } } @@ -221,7 +249,7 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { } for(Map.Entry<URI,List<Statement>> entry : properties.entrySet()) { IntArray key2 = createCacheKey(subject,entry.getKey(),null,context,inferred); - queryCache.put(key2,entry.getValue()); + queryCache.putAsync(key2, entry.getValue()); } } @@ -244,7 +272,7 @@ public class KiWiCachingSailConnection extends NotifyingSailConnectionWrapper { * the triple update need to be cleared. * */ - private void tripleUpdated(Resource subject, URI predicate, Value object, Resource... contexts) { + private void tripleUpdated(Resource subject, URI predicate, Value object, Iterable<Resource> contexts) { queryCache.remove(createCacheKey(null,null,null,null,false)); queryCache.remove(createCacheKey(null,null,null,null,true)); http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/transaction/GeronimoTransactionManagerLookup.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/transaction/GeronimoTransactionManagerLookup.java b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/transaction/GeronimoTransactionManagerLookup.java new file mode 100644 index 0000000..738b7fa --- /dev/null +++ b/libraries/kiwi/kiwi-caching/src/main/java/org/apache/marmotta/kiwi/caching/transaction/GeronimoTransactionManagerLookup.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.marmotta.kiwi.caching.transaction; + +import org.apache.geronimo.transaction.manager.TransactionManagerImpl; +import org.infinispan.transaction.lookup.TransactionManagerLookup; + +import javax.transaction.TransactionManager; + +/** + * Add file description here! + * + * @author Sebastian Schaffert ([email protected]) + */ +public class GeronimoTransactionManagerLookup implements TransactionManagerLookup { + + private TransactionManager manager; + + /** + * Returns a new TransactionManager. + * + * @throws Exception if lookup failed + */ + @Override + public TransactionManager getTransactionManager() throws Exception { + if(manager == null) { + manager = new TransactionManagerImpl(); + } + return manager; + } +} http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-triplestore/pom.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/pom.xml b/libraries/kiwi/kiwi-triplestore/pom.xml index 58a7b72..decfc63 100644 --- a/libraries/kiwi/kiwi-triplestore/pom.xml +++ b/libraries/kiwi/kiwi-triplestore/pom.xml @@ -65,7 +65,6 @@ <artifactId>infinispan-core</artifactId> </dependency> - <!-- JDBC connection pooling --> <dependency> <groupId>org.apache.tomcat</groupId> http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java index dfe8d60..00cf25e 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java @@ -94,12 +94,11 @@ public class KiWiCacheManager { .cacheMode(CacheMode.DIST_ASYNC) .async() .asyncMarshalling() - .useReplQueue(true) .l1() .lifespan(5, TimeUnit.MINUTES) .hash() .numOwners(2) - .numSegments(100) + .numSegments(40) .consistentHashFactory(new SyncConsistentHashFactory()) .eviction() .strategy(EvictionStrategy.LIRS) @@ -314,8 +313,7 @@ public class KiWiCacheManager { .clustering() .cacheMode(CacheMode.REPL_SYNC) .sync() - .l1() - .lifespan(25, TimeUnit.SECONDS) + .replTimeout(15, TimeUnit.SECONDS) .eviction() .strategy(EvictionStrategy.NONE) .build(); @@ -393,9 +391,12 @@ public class KiWiCacheManager { if(embedded && cacheManager.getStatus() == ComponentStatus.RUNNING) { log.warn("shutting down cache manager ..."); if(cacheManager.getTransport() != null) { + log.info("... shutting down transport ..."); cacheManager.getTransport().stop(); } + log.info("... shutting down main component ..."); cacheManager.stop(); + log.info("... done!"); } } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java index 0534bf0..6d932fb 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/persistence/KiWiConnection.java @@ -116,6 +116,8 @@ public class KiWiConnection { private boolean batchCommit = true; + private boolean closed = false; + private int batchSize = 1000; private ReentrantLock commitLock; @@ -992,12 +994,12 @@ public class KiWiConnection { Double dbl_value = null; Long lng_value = null; if(stringLiteral.getContent().length() < 64 && NumberUtils.isNumber(stringLiteral.getContent())) - try { - dbl_value = Double.parseDouble(stringLiteral.getContent()); - lng_value = Long.parseLong(stringLiteral.getContent()); - } catch (NumberFormatException ex) { - // ignore, keep NaN - } + try { + dbl_value = Double.parseDouble(stringLiteral.getContent()); + lng_value = Long.parseLong(stringLiteral.getContent()); + } catch (NumberFormatException ex) { + // ignore, keep NaN + } PreparedStatement insertNode = getPreparedStatement("store.sliteral"); @@ -2107,6 +2109,8 @@ public class KiWiConnection { * @exception java.sql.SQLException SQLException if a database access error occurs */ public void close() throws SQLException { + closed = true; + if(connection != null) { // close all prepared statements try { @@ -2187,7 +2191,7 @@ public class KiWiConnection { return transactionId; } - protected static interface RetryCommand<T> { + protected interface RetryCommand<T> { public T run() throws SQLException; } @@ -2197,7 +2201,7 @@ public class KiWiConnection { * and should be retried several times before giving up completely. * */ - protected static class RetryExecution<T> { + protected class RetryExecution<T> { // counter for current number of retries private int retries = 0; @@ -2259,44 +2263,47 @@ public class KiWiConnection { } public T execute(Connection connection, RetryCommand<T> command) throws SQLException { - Savepoint savepoint = null; - if(useSavepoint) { - savepoint = connection.setSavepoint(); - } - try { - T result = command.run(); - - if(useSavepoint && savepoint != null) { - connection.releaseSavepoint(savepoint); + if(!closed) { + Savepoint savepoint = null; + if(useSavepoint) { + savepoint = connection.setSavepoint(); } + try { + T result = command.run(); - return result; - } catch (SQLException ex) { - if(retries < maxRetries && (sqlStates.size() == 0 || sqlStates.contains(ex.getSQLState()))) { if(useSavepoint && savepoint != null) { - connection.rollback(savepoint); + connection.releaseSavepoint(savepoint); } - Random rnd = new Random(); - long sleep = retryInterval - 250 + rnd.nextInt(500); - log.warn("{}: temporary conflict, retrying in {} ms ... (thread={}, retry={})", name, sleep, Thread.currentThread().getName(), retries); - try { - Thread.sleep(sleep); - } catch (InterruptedException e) {} - retries++; - T result = execute(connection, command); - retries--; return result; - } else { - log.error("{}: temporary conflict could not be solved! (error: {})", name, ex.getMessage()); + } catch (SQLException ex) { + if(retries < maxRetries && (sqlStates.size() == 0 || sqlStates.contains(ex.getSQLState()))) { + if(useSavepoint && savepoint != null) { + connection.rollback(savepoint); + } + Random rnd = new Random(); + long sleep = retryInterval - 250 + rnd.nextInt(500); + log.warn("{}: temporary conflict, retrying in {} ms ... (thread={}, retry={})", name, sleep, Thread.currentThread().getName(), retries); + try { + Thread.sleep(sleep); + } catch (InterruptedException e) {} + retries++; + T result = execute(connection, command); + retries--; + + return result; + } else { + log.error("{}: temporary conflict could not be solved! (error: {})", name, ex.getMessage()); - log.debug("main exception:",ex); - log.debug("next exception:",ex.getNextException()); - throw ex; + log.debug("main exception:",ex); + log.debug("next exception:",ex.getNextException()); + throw ex; + } } + } else { + return null; } - } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml index fb9b7d3..b0b05c5 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml +++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml @@ -63,8 +63,8 @@ <MERGE2 max_interval="30000" min_interval="10000"/> <FD_SOCK/> - <FD_ALL timeout="15000" interval="3000"/> - <VERIFY_SUSPECT timeout="1500"/> + <FD_ALL timeout="30000" interval="3000"/> + <VERIFY_SUSPECT timeout="1500" num_msgs="5"/> <pbcast.NAKACK2 xmit_interval="1000" http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java index b52b6a5..feceaa0 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java @@ -97,12 +97,11 @@ public class CachingServiceImpl implements CachingService { .cacheMode(CacheMode.DIST_ASYNC) .async() .asyncMarshalling() - .useReplQueue(true) .l1() .lifespan(5, TimeUnit.MINUTES) .hash() .numOwners(2) - .numSegments(100) + .numSegments(40) .consistentHashFactory(new SyncConsistentHashFactory()) .eviction() .strategy(EvictionStrategy.LIRS) http://git-wip-us.apache.org/repos/asf/marmotta/blob/dbb13f65/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml b/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml index fb9b7d3..0c6d184 100644 --- a/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml +++ b/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml @@ -63,8 +63,8 @@ <MERGE2 max_interval="30000" min_interval="10000"/> <FD_SOCK/> - <FD_ALL timeout="15000" interval="3000"/> - <VERIFY_SUSPECT timeout="1500"/> + <FD_ALL timeout="30000" interval="3000"/> + <VERIFY_SUSPECT timeout="1500" num_msgs="5"/> <pbcast.NAKACK2 xmit_interval="1000"
