finished MARMOTTA-388: transition from EHCache to Infinispan
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/61c14a19 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/61c14a19 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/61c14a19 Branch: refs/heads/develop Commit: 61c14a19f2b38b65743e8ef3991b13f7b4ec3cac Parents: af9a800 Author: Sebastian Schaffert <[email protected]> Authored: Tue Dec 17 14:13:17 2013 +0100 Committer: Sebastian Schaffert <[email protected]> Committed: Tue Dec 17 14:13:17 2013 +0100 ---------------------------------------------------------------------- .../marmotta/kiwi/caching/KiWiCacheManager.java | 27 +++- .../src/main/resources/jgroups-kiwi.xml | 74 +++++++++++ .../backend/kiwi/model/KiWiCacheEntry.java | 4 +- .../LDCachingKiWiPersistenceConnection.java | 27 ++-- parent/pom.xml | 5 - .../backend/kiwi/KiWiStoreProvider.java | 2 +- .../main/resources/config-defaults.properties | 3 +- .../resources/config-descriptions.properties | 3 - .../platform/core/api/cache/CachingService.java | 15 ++- .../core/api/modules/ResourceEntry.java | 3 +- .../core/logging/CacheLoggingModule.java | 2 +- .../core/services/cache/CachingServiceImpl.java | 133 ++++++++++++++----- .../services/http/HttpClientServiceImpl.java | 111 +++++++++++++++- .../modules/MarmottaResourceServiceImpl.java | 12 +- .../main/resources/config-defaults.properties | 10 ++ .../resources/config-descriptions.properties | 7 + .../src/main/resources/jgroups-marmotta.xml | 74 +++++++++++ .../user/services/AccountServiceImpl.java | 21 ++- 18 files changed, 438 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java index 8af3ba6..f22a6ad 100644 --- a/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java +++ b/libraries/kiwi/kiwi-triplestore/src/main/java/org/apache/marmotta/kiwi/caching/KiWiCacheManager.java @@ -70,7 +70,7 @@ public class KiWiCacheManager { .defaultTransport() .clusterName(config.getName()) .machineId("instance-" + config.getDatacenterId()) - .addProperty("configurationFile", "jgroups-udp.xml") + .addProperty("configurationFile", "jgroups-kiwi.xml") .globalJmxStatistics() .build(); @@ -154,6 +154,28 @@ public class KiWiCacheManager { /** + * Return the triple id -> triple cache from the cache manager. This cache is used for speeding up the + * construction of query results. + * + * @return + */ + public Cache getQueryCache() { + if(!cacheManager.cacheExists(TRIPLE_CACHE)) { + Configuration tripleConfiguration = new ConfigurationBuilder().read(defaultConfiguration) + .eviction() + .maxEntries(100000) + .expiration() + .lifespan(5, TimeUnit.MINUTES) + .maxIdle(60, TimeUnit.SECONDS) + .build(); + cacheManager.defineConfiguration(TRIPLE_CACHE, tripleConfiguration); + } + return cacheManager.getCache(TRIPLE_CACHE); + } + + + + /** * Return the uri -> KiWiUriResource cache from the cache manager. This cache is used when constructing new * KiWiUriResources to avoid a database lookup. * @@ -318,7 +340,8 @@ public class KiWiCacheManager { String cacheName = iterator.next(); Cache<String,Object> cache = cacheManager.getCache(cacheName); cache.clear(); - } } + } + } /** * Shutdown this cache manager instance. Will shutdown the underlying EHCache cache manager. http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml ---------------------------------------------------------------------- diff --git a/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml new file mode 100644 index 0000000..aa5ce8c --- /dev/null +++ b/libraries/kiwi/kiwi-triplestore/src/main/resources/jgroups-kiwi.xml @@ -0,0 +1,74 @@ +<config xmlns="urn:org:jgroups" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd"> + <UDP + mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}" + mcast_port="${jgroups.udp.mcast_port:46655}" + tos="8" + ucast_recv_buf_size="20m" + ucast_send_buf_size="640k" + mcast_recv_buf_size="25m" + mcast_send_buf_size="640k" + loopback="true" + max_bundle_size="31k" + ip_ttl="${jgroups.udp.ip_ttl:2}" + enable_diagnostics="false" + bundler_type="old" + + thread_naming_pattern="pl" + + thread_pool.enabled="true" + thread_pool.min_threads="2" + thread_pool.max_threads="30" + thread_pool.keep_alive_time="60000" + thread_pool.queue_enabled="true" + thread_pool.queue_max_size="100" + thread_pool.rejection_policy="Discard" + + oob_thread_pool.enabled="true" + oob_thread_pool.min_threads="2" + oob_thread_pool.max_threads="30" + oob_thread_pool.keep_alive_time="60000" + oob_thread_pool.queue_enabled="false" + oob_thread_pool.queue_max_size="100" + oob_thread_pool.rejection_policy="Discard" + + internal_thread_pool.enabled="true" + internal_thread_pool.min_threads="1" + internal_thread_pool.max_threads="10" + internal_thread_pool.keep_alive_time="60000" + internal_thread_pool.queue_enabled="true" + internal_thread_pool.queue_max_size="100" + internal_thread_pool.rejection_policy="Discard" + /> + + <PING timeout="3000" num_initial_members="3"/> + <MERGE2 max_interval="30000" min_interval="10000"/> + + <FD_SOCK/> + <FD_ALL timeout="15000" interval="3000"/> + <VERIFY_SUSPECT timeout="1500"/> + + <pbcast.NAKACK2 + xmit_interval="1000" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="10000" + xmit_table_max_compaction_time="10000" + max_msg_batch_size="100"/> + <UNICAST3 + xmit_interval="500" + xmit_table_num_rows="20" + xmit_table_msgs_per_row="10000" + xmit_table_max_compaction_time="10000" + max_msg_batch_size="100" + conn_expiry_timeout="0"/> + + <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1m"/> + <pbcast.GMS print_local_addr="false" join_timeout="3000" view_bundling="true"/> + <tom.TOA/> <!-- the TOA is only needed for total order transactions--> + + <UFC max_credits="2m" min_threshold="0.40"/> + <MFC max_credits="2m" min_threshold="0.40"/> + <FRAG2 frag_size="30k" /> + <RSVP timeout="60000" resend_interval="500" ack_on_delivery="false" /> +</config> http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/model/KiWiCacheEntry.java ---------------------------------------------------------------------- diff --git a/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/model/KiWiCacheEntry.java b/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/model/KiWiCacheEntry.java index 265970d..fb57cbd 100644 --- a/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/model/KiWiCacheEntry.java +++ b/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/model/KiWiCacheEntry.java @@ -19,12 +19,14 @@ package org.apache.marmotta.ldcache.backend.kiwi.model; import org.apache.marmotta.ldcache.model.CacheEntry; +import java.io.Serializable; + /** * Add file description here! * <p/> * Author: Sebastian Schaffert ([email protected]) */ -public class KiWiCacheEntry extends CacheEntry { +public class KiWiCacheEntry extends CacheEntry implements Serializable { Long id; http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/persistence/LDCachingKiWiPersistenceConnection.java ---------------------------------------------------------------------- diff --git a/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/persistence/LDCachingKiWiPersistenceConnection.java b/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/persistence/LDCachingKiWiPersistenceConnection.java index bd79539..a0691d2 100644 --- a/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/persistence/LDCachingKiWiPersistenceConnection.java +++ b/libraries/ldcache/ldcache-backend-kiwi/src/main/java/org/apache/marmotta/ldcache/backend/kiwi/persistence/LDCachingKiWiPersistenceConnection.java @@ -18,8 +18,6 @@ package org.apache.marmotta.ldcache.backend.kiwi.persistence; import info.aduna.iteration.CloseableIteration; -import net.sf.ehcache.Cache; -import net.sf.ehcache.Element; import org.apache.marmotta.kiwi.model.rdf.KiWiNode; import org.apache.marmotta.kiwi.model.rdf.KiWiResource; import org.apache.marmotta.kiwi.persistence.KiWiConnection; @@ -27,6 +25,7 @@ import org.apache.marmotta.kiwi.persistence.util.ResultSetIteration; import org.apache.marmotta.kiwi.persistence.util.ResultTransformerFunction; import org.apache.marmotta.ldcache.backend.kiwi.model.KiWiCacheEntry; import org.apache.marmotta.ldcache.model.CacheEntry; +import org.infinispan.Cache; import org.openrdf.model.URI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,13 +52,13 @@ public class LDCachingKiWiPersistenceConnection { /** * Cache entries by resource */ - private Cache entryResourceCache; + private Cache<String,KiWiCacheEntry> entryResourceCache; /** * Cache entries by ID */ - private Cache entryIdCache; + private Cache<Long,KiWiCacheEntry> entryIdCache; public LDCachingKiWiPersistenceConnection(KiWiConnection connection) throws SQLException { @@ -72,11 +71,11 @@ public class LDCachingKiWiPersistenceConnection { public KiWiCacheEntry constructCacheEntry(ResultSet row) throws SQLException { Long id = row.getLong("id"); - Element cached = entryIdCache.get(id); + KiWiCacheEntry cached = entryIdCache.get(id); // lookup element in cache first, so we can avoid reconstructing it if it is already there if(cached != null) { - return (KiWiCacheEntry)cached.getObjectValue(); + return cached; } KiWiCacheEntry entry = new KiWiCacheEntry(); @@ -87,8 +86,8 @@ public class LDCachingKiWiPersistenceConnection { entry.setResource((URI) connection.loadNodeById(row.getLong("resource_id"))); entry.setTripleCount(row.getInt("triple_count")); - entryIdCache.put(new Element(id,entry)); - entryResourceCache.put(new Element(entry.getResource().stringValue(),entry)); + entryIdCache.put(id,entry); + entryResourceCache.put(entry.getResource().stringValue(),entry); return entry; } @@ -103,11 +102,11 @@ public class LDCachingKiWiPersistenceConnection { */ public KiWiCacheEntry getCacheEntry(String uri) throws SQLException { - Element cached = entryResourceCache.get(uri); + KiWiCacheEntry cached = entryResourceCache.get(uri); // lookup element in cache first, so we can avoid reconstructing it if it is already there if(cached != null) { - return (KiWiCacheEntry)cached.getObjectValue(); + return cached; } PreparedStatement query = connection.getPreparedStatement("load.entry_by_uri"); @@ -169,8 +168,8 @@ public class LDCachingKiWiPersistenceConnection { log.debug("persisted ld-cache entry with id {}", kEntry.getId()); - entryIdCache.put(new Element(kEntry.getId(),kEntry)); - entryResourceCache.put(new Element(kEntry.getResource().stringValue(),kEntry)); + entryIdCache.put(kEntry.getId(),kEntry); + entryResourceCache.put(kEntry.getResource().stringValue(),kEntry); } @@ -205,11 +204,11 @@ public class LDCachingKiWiPersistenceConnection { deleteEntry.setString(1,uri); deleteEntry.executeUpdate(); - Element cached = entryResourceCache.get(uri); + KiWiCacheEntry cached = entryResourceCache.get(uri); if(cached != null) { entryResourceCache.remove(uri); - entryIdCache.remove(((KiWiCacheEntry) cached.getObjectValue()).getId()); + entryIdCache.remove(cached.getId()); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 62cf99f..8be8f54 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -1449,11 +1449,6 @@ </dependency> <dependency> <groupId>org.apache.marmotta</groupId> - <artifactId>ldcache-backend-ehcache</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.marmotta</groupId> <artifactId>ldcache-sail-kiwi</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/backends/marmotta-backend-kiwi/src/main/java/org/apache/marmotta/platform/backend/kiwi/KiWiStoreProvider.java ---------------------------------------------------------------------- diff --git a/platform/backends/marmotta-backend-kiwi/src/main/java/org/apache/marmotta/platform/backend/kiwi/KiWiStoreProvider.java b/platform/backends/marmotta-backend-kiwi/src/main/java/org/apache/marmotta/platform/backend/kiwi/KiWiStoreProvider.java index 979c806..b7d63e2 100644 --- a/platform/backends/marmotta-backend-kiwi/src/main/java/org/apache/marmotta/platform/backend/kiwi/KiWiStoreProvider.java +++ b/platform/backends/marmotta-backend-kiwi/src/main/java/org/apache/marmotta/platform/backend/kiwi/KiWiStoreProvider.java @@ -112,7 +112,7 @@ public class KiWiStoreProvider implements StoreProvider { configuration.setDatacenterId(configurationService.getIntConfiguration(DATACENTER_ID,0)); configuration.setFulltextEnabled(configurationService.getBooleanConfiguration(FULLTEXT_ENABLED, true)); configuration.setFulltextLanguages(configurationService.getListConfiguration(FULLTEXT_LANGUAGES, ImmutableList.of("en"))); - configuration.setClustered(configurationService.getBooleanConfiguration("database.clustered", false)); + configuration.setClustered(configurationService.getBooleanConfiguration("clustering.enabled", false)); if("native".equalsIgnoreCase(configurationService.getStringConfiguration(SPARQL_STRATEGY))) { return new KiWiSparqlSail(new KiWiStore(configuration)); http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/backends/marmotta-backend-kiwi/src/main/resources/config-defaults.properties ---------------------------------------------------------------------- diff --git a/platform/backends/marmotta-backend-kiwi/src/main/resources/config-defaults.properties b/platform/backends/marmotta-backend-kiwi/src/main/resources/config-defaults.properties index 65c013d..6ee6eb6 100644 --- a/platform/backends/marmotta-backend-kiwi/src/main/resources/config-defaults.properties +++ b/platform/backends/marmotta-backend-kiwi/src/main/resources/config-defaults.properties @@ -69,6 +69,5 @@ database.mysql.driver = com.mysql.jdbc.Driver database.mysql.url = jdbc:mysql://localhost:3306/lmf?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true -# Turn on cluster-specific configuration options (e.g. replicated and distributed caching, synchronization, ...) -database.clustered = false + http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/backends/marmotta-backend-kiwi/src/main/resources/config-descriptions.properties ---------------------------------------------------------------------- diff --git a/platform/backends/marmotta-backend-kiwi/src/main/resources/config-descriptions.properties b/platform/backends/marmotta-backend-kiwi/src/main/resources/config-descriptions.properties index c214f3f..49c53a2 100644 --- a/platform/backends/marmotta-backend-kiwi/src/main/resources/config-descriptions.properties +++ b/platform/backends/marmotta-backend-kiwi/src/main/resources/config-descriptions.properties @@ -53,6 +53,3 @@ database.fulltext.enabled.type = java.lang.Boolean database.fulltext.languages.description = list of languages supported by fulltext search; a fulltext index will be created for each language (PostgreSQL only) database.fulltext.languages.type = java.util.List - -database.clustered.description = Turn on cluster-specific configuration options (e.g. replicated and distributed caching, synchronization, ...) -database.clustered.type = java.lang.Boolean http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/cache/CachingService.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/cache/CachingService.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/cache/CachingService.java index 4329a3e..00cb18f 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/cache/CachingService.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/cache/CachingService.java @@ -17,10 +17,11 @@ */ package org.apache.marmotta.platform.core.api.cache; -import net.sf.ehcache.CacheManager; -import net.sf.ehcache.Ehcache; +import org.infinispan.Cache; +import org.infinispan.manager.EmbeddedCacheManager; import javax.enterprise.inject.spi.InjectionPoint; +import java.util.Set; /** * Add file description here! @@ -29,13 +30,15 @@ import javax.enterprise.inject.spi.InjectionPoint; */ public interface CachingService { - public Ehcache getCache(InjectionPoint injectionPoint); - public String[] getCacheNames(); + + public Cache getCache(InjectionPoint injectionPoint); + + public Set<String> getCacheNames(); public void clearAll(); - Ehcache getCacheByName(String cacheName); + public Cache getCacheByName(String cacheName); - CacheManager getCacheManager(); + public EmbeddedCacheManager getCacheManager(); } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/modules/ResourceEntry.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/modules/ResourceEntry.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/modules/ResourceEntry.java index 192e115..d3d4b39 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/modules/ResourceEntry.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/api/modules/ResourceEntry.java @@ -17,6 +17,7 @@ */ package org.apache.marmotta.platform.core.api.modules; +import java.io.Serializable; import java.net.URL; /** @@ -24,7 +25,7 @@ import java.net.URL; * <p/> * User: sschaffe */ -public class ResourceEntry { +public class ResourceEntry implements Serializable { /** * The path relative to the web application root under which this resource is accessed. http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/logging/CacheLoggingModule.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/logging/CacheLoggingModule.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/logging/CacheLoggingModule.java index 38eb5f7..af6709c 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/logging/CacheLoggingModule.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/logging/CacheLoggingModule.java @@ -58,6 +58,6 @@ public class CacheLoggingModule extends BaseLoggingModule { */ @Override public Collection<String> getPackages() { - return ImmutableSet.of("net.sf.ehcache", "org.apache.marmotta.platform.core.services.cache"); + return ImmutableSet.of("org.infinispan", "org.apache.marmotta.platform.core.services.cache"); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java index f1e9d80..729a563 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/cache/CachingServiceImpl.java @@ -18,10 +18,19 @@ package org.apache.marmotta.platform.core.services.cache; import org.apache.marmotta.platform.core.api.cache.CachingService; +import org.apache.marmotta.platform.core.api.config.ConfigurationService; import org.apache.marmotta.platform.core.events.SystemRestartingEvent; import org.apache.marmotta.platform.core.qualifiers.cache.MarmottaCache; -import net.sf.ehcache.CacheManager; -import net.sf.ehcache.Ehcache; +import org.infinispan.Cache; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.Configuration; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfiguration; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.distribution.ch.SyncConsistentHashFactory; +import org.infinispan.eviction.EvictionStrategy; +import org.infinispan.manager.DefaultCacheManager; +import org.infinispan.manager.EmbeddedCacheManager; import org.slf4j.Logger; import javax.annotation.PostConstruct; @@ -31,7 +40,9 @@ import javax.enterprise.event.Observes; import javax.enterprise.inject.Produces; import javax.enterprise.inject.spi.InjectionPoint; import javax.inject.Inject; -import java.net.URL; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; /** * A service that offers a EHCache system cache implementation for use by other components @@ -47,8 +58,16 @@ public class CachingServiceImpl implements CachingService { @Inject private Logger log; + @Inject + private ConfigurationService configurationService; + + private EmbeddedCacheManager cacheManager; + + private GlobalConfiguration globalConfiguration; + + // default configuration: distributed cache, 100000 entries, 300 seconds expiration, 60 seconds idle + private Configuration defaultConfiguration; - private CacheManager manager; public CachingServiceImpl() { } @@ -56,15 +75,57 @@ public class CachingServiceImpl implements CachingService { @PostConstruct public void initialize() { - URL url = this.getClass().getClassLoader().getResource("ehcache-marmotta.xml"); + boolean clustered = configurationService.getBooleanConfiguration("clustering.enabled", false); + + log.info("Apache Marmotta Caching Service starting up ({}) ...", clustered ? "clustering" : "single host" ); + if(clustered) { + globalConfiguration = new GlobalConfigurationBuilder() + .transport() + .defaultTransport() + .clusterName(configurationService.getStringConfiguration("clustering.name", "Marmotta")) + .machineId(configurationService.getServerName()) + .addProperty("configurationFile", "jgroups-marmotta.xml") + .globalJmxStatistics() + .build(); + + + defaultConfiguration = new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.DIST_ASYNC) + .async() + .l1() + .lifespan(25, TimeUnit.SECONDS) + .hash() + .numOwners(2) + .numSegments(100) + .consistentHashFactory(new SyncConsistentHashFactory()) + .eviction() + .strategy(EvictionStrategy.LIRS) + .maxEntries(1000) + .expiration() + .lifespan(5, TimeUnit.MINUTES) + .maxIdle(1, TimeUnit.MINUTES) + .build(); + } else { + globalConfiguration = new GlobalConfigurationBuilder() + .globalJmxStatistics() + .build(); + + defaultConfiguration = new ConfigurationBuilder() + .clustering() + .cacheMode(CacheMode.LOCAL) + .eviction() + .strategy(EvictionStrategy.LIRS) + .maxEntries(1000) + .expiration() + .lifespan(5, TimeUnit.MINUTES) + .maxIdle(1, TimeUnit.MINUTES) + .build(); - // backwards compatibility - if(url == null) { - url = this.getClass().getClassLoader().getResource("ehcache-lmf.xml"); } - log.info("Apache Marmotta Caching Service starting up (configuration at {}) ...",url); - manager = CacheManager.create(url); + + cacheManager = new DefaultCacheManager(globalConfiguration, defaultConfiguration, true); } /** @@ -82,35 +143,39 @@ public class CachingServiceImpl implements CachingService { */ @Override @Produces @MarmottaCache("") - public Ehcache getCache(InjectionPoint injectionPoint) { + public Cache getCache(InjectionPoint injectionPoint) { String cacheName = injectionPoint.getAnnotated().getAnnotation(MarmottaCache.class).value(); return getCacheByName(cacheName); } - @Override - public Ehcache getCacheByName(String cacheName) { - if(!manager.cacheExists(cacheName)) { - log.info("added new cache with name {}",cacheName); - manager.addCache(cacheName); - } + /** + * Allow CDI injection of the default cache + * @return + */ + @Produces + public Configuration getDefaultConfiguration() { + return defaultConfiguration; + } - Ehcache cache = manager.getEhcache(cacheName); - cache.setStatisticsEnabled(true); - return cache; + @Override + public Cache getCacheByName(String cacheName) { + return cacheManager.getCache(cacheName, true); } @Override - public String[] getCacheNames() { - return manager.getCacheNames(); + public Set<String> getCacheNames() { + return cacheManager.getCacheNames(); } @Override - public CacheManager getCacheManager() { - return manager; + @Produces + @ApplicationScoped + public EmbeddedCacheManager getCacheManager() { + return cacheManager; } @@ -120,27 +185,27 @@ public class CachingServiceImpl implements CachingService { */ public void systemRestart(@Observes SystemRestartingEvent e) { log.warn("system restarted, flushing caches ..."); - manager.clearAll(); + cacheManager.stop(); + cacheManager.start(); } @Override public void clearAll() { - manager.clearAll(); + Set<String> set = cacheManager.getCacheNames(); + Iterator<String> iterator = set.iterator(); + while(iterator.hasNext()){ + String cacheName = iterator.next(); + Cache<String,Object> cache = cacheManager.getCache(cacheName); + cache.clear(); + } } @PreDestroy public void destroy() { log.info("Apache Marmotta Caching Service shutting down ..."); - /* - for(String cacheName : manager.getCacheNames()) { - log.info("Disposing cache {} ...",cacheName); - Cache cache = manager.getCache(cacheName); - cache.dispose(); - } - */ - manager.shutdown(); + cacheManager.stop(); log.info("Apache Marmotta Caching Service shut down successfully."); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java index afda0df..7d27576 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/http/HttpClientServiceImpl.java @@ -17,14 +17,12 @@ */ package org.apache.marmotta.platform.core.services.http; -import net.sf.ehcache.Ehcache; import org.apache.http.*; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.HttpClient; import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.ResponseHandler; -import org.apache.http.client.cache.CacheResponseStatus; -import org.apache.http.client.cache.HttpCacheStorage; +import org.apache.http.client.cache.*; import org.apache.http.client.methods.*; import org.apache.http.client.params.ClientPNames; import org.apache.http.conn.ClientConnectionManager; @@ -38,7 +36,7 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.cache.CacheConfig; import org.apache.http.impl.client.cache.CachingHttpClient; -import org.apache.http.impl.client.cache.ehcache.EhcacheHttpCacheStorage; +import org.apache.http.impl.client.cache.DefaultHttpCacheEntrySerializer; import org.apache.http.impl.conn.PoolingClientConnectionManager; import org.apache.http.params.*; import org.apache.http.pool.PoolStats; @@ -56,14 +54,16 @@ import org.apache.marmotta.platform.core.qualifiers.cache.MarmottaCache; import org.apache.marmotta.platform.core.services.http.response.LastModifiedResponseHandler; import org.apache.marmotta.platform.core.services.http.response.StatusCodeResponseHandler; import org.apache.marmotta.platform.core.services.http.response.StringBodyResponseHandler; +import org.infinispan.Cache; import org.slf4j.Logger; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; -import javax.enterprise.inject.Instance; import javax.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -101,7 +101,7 @@ public class HttpClientServiceImpl implements HttpClientService { @Inject @MarmottaCache("http-client-cache") - private Instance<Ehcache> ehcache; + private Cache httpCache; private HttpClient httpClient; private IdleConnectionMonitorThread idleConnectionMonitorThread; @@ -308,7 +308,7 @@ public class HttpClientServiceImpl implements HttpClientService { cacheConfig.setMaxCacheEntries(1000); cacheConfig.setMaxObjectSize(81920); - final HttpCacheStorage cacheStore = new EhcacheHttpCacheStorage(ehcache.get(), cacheConfig); + final HttpCacheStorage cacheStore = new InfinispanHttpCacheStorage(httpCache); this.httpClient = new MonitoredHttpClient(new CachingHttpClient(hc, cacheStore, cacheConfig)); } else { @@ -806,4 +806,101 @@ public class HttpClientServiceImpl implements HttpClientService { } } + + + + private static class InfinispanHttpCacheStorage implements HttpCacheStorage { + + Cache<String, byte[]> cache; + + private final HttpCacheEntrySerializer serializer; + + + private InfinispanHttpCacheStorage(Cache<String, byte[]> cache) { + this.cache = cache; + this.serializer = new DefaultHttpCacheEntrySerializer(); + } + + /** + * Store a given cache entry under the given key. + * + * @param key where in the cache to store the entry + * @param entry cached response to store + * @throws java.io.IOException + */ + @Override + public void putEntry(String key, HttpCacheEntry entry) throws IOException { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializer.writeTo(entry, bos); + + cache.put(key,bos.toByteArray()); + } + + /** + * Retrieves the cache entry stored under the given key + * or null if no entry exists under that key. + * + * @param key cache key + * @return an {@link org.apache.http.client.cache.HttpCacheEntry} or {@code null} if no + * entry exists + * @throws java.io.IOException + */ + @Override + public HttpCacheEntry getEntry(String key) throws IOException { + byte[] data = cache.get(key); + if(data == null) { + return null; + } else { + return serializer.readFrom(new ByteArrayInputStream(data)); + } + } + + /** + * Deletes/invalidates/removes any cache entries currently + * stored under the given key. + * + * @param key + * @throws java.io.IOException + */ + @Override + public void removeEntry(String key) throws IOException { + cache.remove(key); + } + + /** + * Atomically applies the given callback to update an existing cache + * entry under a given key. + * + * @param key indicates which entry to modify + * @param callback performs the update; see + * {@link org.apache.http.client.cache.HttpCacheUpdateCallback} for details, but roughly the + * callback expects to be handed the current entry and will return + * the new value for the entry. + * @throws java.io.IOException + * @throws org.apache.http.client.cache.HttpCacheUpdateException + */ + @Override + public void updateEntry(String key, HttpCacheUpdateCallback callback) throws IOException, HttpCacheUpdateException { + final byte[] oldData = cache.get(key); + + HttpCacheEntry existingEntry = null; + if(oldData != null){ + existingEntry = serializer.readFrom(new ByteArrayInputStream(oldData)); + } + + final HttpCacheEntry updatedEntry = callback.update(existingEntry); + + if (existingEntry == null) { + putEntry(key, updatedEntry); + return; + } else { + // Attempt to do a CAS replace, if we fail then retry + // While this operation should work fine within this instance, multiple instances + // could trample each others' data + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + serializer.writeTo(updatedEntry, bos); + cache.replace(key, oldData, bos.toByteArray()); + } + } + } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/modules/MarmottaResourceServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/modules/MarmottaResourceServiceImpl.java b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/modules/MarmottaResourceServiceImpl.java index 1aecba2..0b837e1 100644 --- a/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/modules/MarmottaResourceServiceImpl.java +++ b/platform/marmotta-core/src/main/java/org/apache/marmotta/platform/core/services/modules/MarmottaResourceServiceImpl.java @@ -27,15 +27,13 @@ import org.apache.marmotta.platform.core.api.modules.ResourceEntry; import org.apache.marmotta.platform.core.events.SystemStartupEvent; import org.apache.marmotta.platform.core.model.module.ModuleConfiguration; import org.apache.marmotta.platform.core.qualifiers.cache.MarmottaCache; -import net.sf.ehcache.Ehcache; -import net.sf.ehcache.Element; +import org.infinispan.Cache; import org.slf4j.Logger; import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; import javax.inject.Inject; - import java.io.IOException; import java.net.URL; import java.util.Collection; @@ -66,7 +64,7 @@ public class MarmottaResourceServiceImpl implements MarmottaResourceService { @Inject @MarmottaCache("resource-cache") - private Ehcache resourceCache; + private Cache resourceCache; /** * Used for detecting the mime type of resources contained in KiWi modules @@ -219,12 +217,12 @@ public class MarmottaResourceServiceImpl implements MarmottaResourceService { private boolean isCached(String key) { - return isCacheEnabled() && resourceCache.isKeyInCache(key) && resourceCache.get(key) != null; + return isCacheEnabled() && resourceCache.containsKey(key) && resourceCache.get(key) != null; } private ResourceEntry getFromCache(String key) { if (isCacheEnabled()) - return (ResourceEntry) resourceCache.get(key).getObjectValue(); + return (ResourceEntry) resourceCache.get(key); else return null; } @@ -232,7 +230,7 @@ public class MarmottaResourceServiceImpl implements MarmottaResourceService { // Store in the cache private void putInCache(String key, ResourceEntry data) { if(isCacheEnabled()) { - resourceCache.put(new Element(key,data)); + resourceCache.put(key,data); } } http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/resources/config-defaults.properties ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/resources/config-defaults.properties b/platform/marmotta-core/src/main/resources/config-defaults.properties index 3933aae..278dd9a 100644 --- a/platform/marmotta-core/src/main/resources/config-defaults.properties +++ b/platform/marmotta-core/src/main/resources/config-defaults.properties @@ -192,3 +192,13 @@ prefix.rdf = http://www.w3.org/1999/02/22-rdf-syntax-ns# prefix.skos = http://www.w3.org/2004/02/skos/core# prefix.ldp = http://www.w3.org/ns/ldp# prefix.mao = http://www.w3.org/ns/ma-ont# + + +############################################################################### +# Clustering Configuration +############################################################################### + + +# Turn on cluster-specific configuration options (e.g. replicated and distributed caching, synchronization, ...) +clustering.enabled = false +clustering.name = Marmotta \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/resources/config-descriptions.properties ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/resources/config-descriptions.properties b/platform/marmotta-core/src/main/resources/config-descriptions.properties index 213fb6c..43983c0 100644 --- a/platform/marmotta-core/src/main/resources/config-descriptions.properties +++ b/platform/marmotta-core/src/main/resources/config-descriptions.properties @@ -158,3 +158,10 @@ importer.batchsize.type = java.lang.Integer(10|0|*) statistics.enabled.description = true statistics.enabled.type = java.lang.Boolean + + +clustering.enabled.description = Turn on cluster-specific configuration options (e.g. replicated and distributed caching, synchronization, ...) +clustering.enabled.type = java.lang.Boolean + +clustering.name.description = Cluster name to use in cluster configuration (e.g. cache cluster name) +clustering.name.type = java.lang.String \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml ---------------------------------------------------------------------- diff --git a/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml b/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml new file mode 100644 index 0000000..aa5ce8c --- /dev/null +++ b/platform/marmotta-core/src/main/resources/jgroups-marmotta.xml @@ -0,0 +1,74 @@ +<config xmlns="urn:org:jgroups" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.4.xsd"> + <UDP + mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}" + mcast_port="${jgroups.udp.mcast_port:46655}" + tos="8" + ucast_recv_buf_size="20m" + ucast_send_buf_size="640k" + mcast_recv_buf_size="25m" + mcast_send_buf_size="640k" + loopback="true" + max_bundle_size="31k" + ip_ttl="${jgroups.udp.ip_ttl:2}" + enable_diagnostics="false" + bundler_type="old" + + thread_naming_pattern="pl" + + thread_pool.enabled="true" + thread_pool.min_threads="2" + thread_pool.max_threads="30" + thread_pool.keep_alive_time="60000" + thread_pool.queue_enabled="true" + thread_pool.queue_max_size="100" + thread_pool.rejection_policy="Discard" + + oob_thread_pool.enabled="true" + oob_thread_pool.min_threads="2" + oob_thread_pool.max_threads="30" + oob_thread_pool.keep_alive_time="60000" + oob_thread_pool.queue_enabled="false" + oob_thread_pool.queue_max_size="100" + oob_thread_pool.rejection_policy="Discard" + + internal_thread_pool.enabled="true" + internal_thread_pool.min_threads="1" + internal_thread_pool.max_threads="10" + internal_thread_pool.keep_alive_time="60000" + internal_thread_pool.queue_enabled="true" + internal_thread_pool.queue_max_size="100" + internal_thread_pool.rejection_policy="Discard" + /> + + <PING timeout="3000" num_initial_members="3"/> + <MERGE2 max_interval="30000" min_interval="10000"/> + + <FD_SOCK/> + <FD_ALL timeout="15000" interval="3000"/> + <VERIFY_SUSPECT timeout="1500"/> + + <pbcast.NAKACK2 + xmit_interval="1000" + xmit_table_num_rows="100" + xmit_table_msgs_per_row="10000" + xmit_table_max_compaction_time="10000" + max_msg_batch_size="100"/> + <UNICAST3 + xmit_interval="500" + xmit_table_num_rows="20" + xmit_table_msgs_per_row="10000" + xmit_table_max_compaction_time="10000" + max_msg_batch_size="100" + conn_expiry_timeout="0"/> + + <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1m"/> + <pbcast.GMS print_local_addr="false" join_timeout="3000" view_bundling="true"/> + <tom.TOA/> <!-- the TOA is only needed for total order transactions--> + + <UFC max_credits="2m" min_threshold="0.40"/> + <MFC max_credits="2m" min_threshold="0.40"/> + <FRAG2 frag_size="30k" /> + <RSVP timeout="60000" resend_interval="500" ack_on_delivery="false" /> +</config> http://git-wip-us.apache.org/repos/asf/marmotta/blob/61c14a19/platform/marmotta-user/src/main/java/org/apache/marmotta/platform/user/services/AccountServiceImpl.java ---------------------------------------------------------------------- diff --git a/platform/marmotta-user/src/main/java/org/apache/marmotta/platform/user/services/AccountServiceImpl.java b/platform/marmotta-user/src/main/java/org/apache/marmotta/platform/user/services/AccountServiceImpl.java index f0ab876..beede92 100644 --- a/platform/marmotta-user/src/main/java/org/apache/marmotta/platform/user/services/AccountServiceImpl.java +++ b/platform/marmotta-user/src/main/java/org/apache/marmotta/platform/user/services/AccountServiceImpl.java @@ -18,8 +18,6 @@ package org.apache.marmotta.platform.user.services; import com.google.common.base.Preconditions; -import net.sf.ehcache.Ehcache; -import net.sf.ehcache.Element; import org.apache.commons.lang3.StringUtils; import org.apache.marmotta.commons.sesame.model.Namespaces; import org.apache.marmotta.platform.core.api.config.ConfigurationService; @@ -32,6 +30,7 @@ import org.apache.marmotta.platform.core.qualifiers.cache.MarmottaCache; import org.apache.marmotta.platform.user.api.AccountService; import org.apache.marmotta.platform.user.model.UserAccount; import org.apache.marmotta.platform.user.model.UserAccount.PasswordHash; +import org.infinispan.Cache; import org.openrdf.model.Resource; import org.openrdf.model.URI; import org.slf4j.Logger; @@ -59,7 +58,7 @@ public class AccountServiceImpl implements AccountService { @Inject @MarmottaCache("user-cache") - private Ehcache userCache; + private Cache<String,UserAccount> userCache; private PasswordHash hashAlgo; @@ -120,8 +119,8 @@ public class AccountServiceImpl implements AccountService { for (UserAccount userAccount : list) { - userCache.put(new Element(userAccount.getLogin(), userAccount)); - userCache.put(new Element(userAccount.getWebId(), userAccount)); + userCache.put(userAccount.getLogin(), userAccount); + userCache.put(userAccount.getWebId(), userAccount); } return list; } @@ -185,7 +184,7 @@ public class AccountServiceImpl implements AccountService { if (StringUtils.isBlank(login)) return null; UserAccount account = null; if (userCache != null && userCache.get(login) != null) { - account = (UserAccount) userCache.get(login).getObjectValue(); + account = (UserAccount) userCache.get(login); } else { if (configurationService.isConfigurationSet("user."+login+".webid")) { account = new UserAccount(); @@ -195,8 +194,8 @@ public class AccountServiceImpl implements AccountService { account.setRoles(new HashSet<String>(configurationService.getListConfiguration("user."+login+".roles"))); account.setWebId(configurationService.getStringConfiguration("user."+login+".webid")); - userCache.put(new Element(account.getLogin(), account)); - userCache.put(new Element(account.getWebId(), account)); + userCache.put(account.getLogin(), account); + userCache.put(account.getWebId(), account); } else { log.info("UserAccount {} not found", login); } @@ -210,7 +209,7 @@ public class AccountServiceImpl implements AccountService { UserAccount account = null; if (userCache != null && userCache.get(resource) != null) { - account = (UserAccount) userCache.get(resource).getObjectValue(); + account = userCache.get(resource); } else { for(UserAccount a : listAccounts()) { if(a.getWebId().equals(resource.stringValue())) { @@ -219,8 +218,8 @@ public class AccountServiceImpl implements AccountService { } } if (account != null) { - userCache.put(new Element(account.getLogin(), account)); - userCache.put(new Element(account.getWebId(), account)); + userCache.put(account.getLogin(), account); + userCache.put(account.getWebId(), account); } else { log.warn("UserAccount {} not found", resource); }
