METRON-1467: Replace Guava caches in places where the keyspace might be large (closes apache/metron#947)
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/abb152b8 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/abb152b8 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/abb152b8 Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: abb152b83631001ad067849dfaefd2d4e3b0cdb4 Parents: 9fb0d06 Author: cstella <ceste...@gmail.com> Authored: Wed Mar 7 11:20:56 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Wed Mar 7 11:20:56 2018 -0500 ---------------------------------------------------------------------- metron-interface/metron-rest/pom.xml | 5 +++ metron-platform/metron-enrichment/pom.xml | 2 +- .../enrichment/bolt/GenericEnrichmentBolt.java | 19 ++++------- .../apache/metron/enrichment/bolt/JoinBolt.java | 34 +++++++++----------- .../bolt/GenericEnrichmentBoltTest.java | 2 +- .../metron/enrichment/bolt/JoinBoltTest.java | 7 ++-- metron-stellar/stellar-common/pom.xml | 5 +++ .../stellar/common/BaseStellarProcessor.java | 31 +++++++----------- .../stellar/dsl/functions/DateFunctions.java | 8 ++--- pom.xml | 1 + 10 files changed, 56 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-interface/metron-rest/pom.xml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml index 44bad97..dcdea2b 100644 --- a/metron-interface/metron-rest/pom.xml +++ b/metron-interface/metron-rest/pom.xml @@ -38,6 +38,11 @@ <eclipse.link.version>2.6.4</eclipse.link.version> </properties> <dependencies> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${global_caffeine_version}</version> + </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> 
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml index bcfb41b..1dffd8b 100644 --- a/metron-platform/metron-enrichment/pom.xml +++ b/metron-platform/metron-enrichment/pom.xml @@ -70,7 +70,7 @@ <dependency> <groupId>com.github.ben-manes.caffeine</groupId> <artifactId>caffeine</artifactId> - <version>2.6.2</version> + <version>${global_caffeine_version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 7d67d2d..0677453 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -18,13 +18,13 @@ package org.apache.metron.enrichment.bolt; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import java.util.HashSet; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; + +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; import 
org.apache.metron.common.configuration.ConfigurationType; @@ -146,13 +146,8 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { throw new IllegalStateException("MAX_TIME_RETAIN_MINUTES must be specified"); if (this.adapter == null) throw new IllegalStateException("Adapter must be specified"); - loader = new CacheLoader<CacheKey, JSONObject>() { - @Override - public JSONObject load(CacheKey key) throws Exception { - return adapter.enrich(key); - } - }; - cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize) + loader = key -> adapter.enrich(key); + cache = Caffeine.newBuilder().maximumSize(maxCacheSize) .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) .build(loader); boolean success = adapter.initializeAdapter(getConfigurations().getGlobalConfig()); @@ -228,7 +223,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { subGroup = adapter.getStreamSubGroup(enrichmentType, field); perfLog.mark("enrich"); - enrichedField = cache.getUnchecked(cacheKey); + enrichedField = cache.get(cacheKey); perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType); if (enrichedField == null) http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index 61d7c32..a9263fb 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -17,13 +17,12 @@ */ package org.apache.metron.enrichment.bolt; +import com.github.benmanes.caffeine.cache.CacheLoader; +import 
com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; import com.google.common.base.Joiner; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalCause; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; import com.google.common.collect.Sets; import java.lang.invoke.MethodHandles; import java.util.HashMap; @@ -46,6 +45,9 @@ import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { public static class Perf {} // used for performance logging @@ -89,29 +91,25 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { if (this.maxTimeRetain == null) { throw new IllegalStateException("maxTimeRetain must be specified"); } - loader = new CacheLoader<String, Map<String, Tuple>>() { - @Override - public Map<String, Tuple> load(String key) throws Exception { - return new HashMap<>(); - } - }; - cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize) - .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES).removalListener(new JoinRemoveListener()) - .build(loader); + loader = s -> new HashMap<>(); + cache = Caffeine.newBuilder().maximumSize(maxCacheSize) + .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) + .removalListener(new JoinRemoveListener()) + .build(loader); prepare(map, topologyContext); } class JoinRemoveListener implements RemovalListener<String, Map<String, Tuple>> { @Override - public void onRemoval(RemovalNotification<String, Map<String, Tuple>> removalNotification) { - if (removalNotification.getCause() == RemovalCause.SIZE) { + public void 
onRemoval(@Nullable String s, @Nullable Map<String, Tuple> stringTupleMap, @Nonnull RemovalCause removalCause) { + if (removalCause == RemovalCause.SIZE) { String errorMessage = "Join cache reached max size limit. Increase the maxCacheSize setting or add more tasks to enrichment/threatintel join bolt."; Exception exception = new Exception(errorMessage); LOG.error(errorMessage, exception); collector.reportError(exception); } - if (removalNotification.getCause() == RemovalCause.EXPIRED) { + if (removalCause == RemovalCause.EXPIRED) { String errorMessage = "Message was in the join cache too long which may be caused by slow enrichments/threatintels. Increase the maxTimeRetain setting."; Exception exception = new Exception(errorMessage); LOG.error(errorMessage, exception); http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java index d7b54dd..17a53f4 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java @@ -240,7 +240,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { put("field2", "value2"); put("source.type", "test"); }}) - .withThrowable(new CacheLoader.InvalidCacheLoadException("CacheLoader returned null for key CacheKey{field='field1', value='value1'}.")); + .withThrowable(new Exception("[Metron] Could not enrich string: value1")); verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new 
MetronErrorJSONMatcher(error.getJSONObject()))); } } http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java index 1bb1083..0da6eaa 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java @@ -17,7 +17,7 @@ */ package org.apache.metron.enrichment.bolt; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; import org.apache.metron.common.error.MetronError; @@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -176,10 +177,10 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest { when(tuple.getValueByField("key")).thenReturn(key); when(tuple.getValueByField("message")).thenReturn(new JSONObject()); joinBolt.cache = mock(LoadingCache.class); - when(joinBolt.cache.get(key)).thenThrow(new ExecutionException(new Exception("join exception"))); + when(joinBolt.cache.get(any())).thenThrow(new RuntimeException(new Exception("join exception"))); joinBolt.execute(tuple); - ExecutionException expectedExecutionException = new ExecutionException(new Exception("join exception")); + RuntimeException expectedExecutionException = new 
RuntimeException(new Exception("join exception")); MetronError error = new MetronError() .withErrorType(Constants.ErrorType.ENRICHMENT_ERROR) .withMessage("Joining problem: {}") http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml index 6b07e68..dc4eb90 100644 --- a/metron-stellar/stellar-common/pom.xml +++ b/metron-stellar/stellar-common/pom.xml @@ -30,6 +30,11 @@ </properties> <dependencies> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${global_caffeine_version}</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>${global_hadoop_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java index 922feb7..941c66d 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/BaseStellarProcessor.java @@ -18,16 +18,14 @@ package org.apache.metron.stellar.common; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.util.concurrent.UncheckedExecutionException; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import 
com.github.benmanes.caffeine.cache.Caffeine; import org.antlr.v4.runtime.ANTLRInputStream; import org.antlr.v4.runtime.CommonTokenStream; import org.antlr.v4.runtime.TokenStream; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.metron.stellar.dsl.Context; @@ -95,16 +93,11 @@ public class BaseStellarProcessor<T> { , int expiryTime , TimeUnit expiryUnit ) { - CacheLoader<String, StellarCompiler.Expression> loader = new CacheLoader<String, StellarCompiler.Expression>() { - @Override - public StellarCompiler.Expression load(String key) throws Exception { - return compile(key); - } - }; - return CacheBuilder.newBuilder() - .maximumSize(cacheSize) - .expireAfterAccess(expiryTime, expiryUnit) - .build(loader); + CacheLoader<String, StellarCompiler.Expression> loader = key -> compile(key); + return Caffeine.newBuilder() + .maximumSize(cacheSize) + .expireAfterAccess(expiryTime, expiryUnit) + .build(loader); } /** @@ -119,8 +112,8 @@ public class BaseStellarProcessor<T> { } StellarCompiler.Expression expression = null; try { - expression = expressionCache.get(rule, () -> compile(rule)); - } catch (ExecutionException e) { + expression = expressionCache.get(rule, r -> compile(r)); + } catch (Throwable e) { throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e); } return expression.variablesUsed; @@ -143,8 +136,8 @@ public class BaseStellarProcessor<T> { context.setActivityType(ActivityType.PARSE_ACTIVITY); } try { - expression = expressionCache.get(rule, () -> compile(rule)); - } catch (ExecutionException|UncheckedExecutionException e) { + expression = expressionCache.get(rule, r -> compile(r)); + } catch (Throwable e) { throw new ParseException("Unable to parse: " + rule + " due to: " + e.getMessage(), e); } try { 
http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java index 6031b6c..212d6e9 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DateFunctions.java @@ -18,9 +18,9 @@ package org.apache.metron.stellar.dsl.functions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; import org.apache.metron.stellar.dsl.BaseStellarFunction; import org.apache.metron.stellar.dsl.Stellar; import org.apache.metron.stellar.common.utils.ConversionUtils; @@ -77,7 +77,7 @@ public class DateFunctions { } private static LoadingCache<TimezonedFormat, ThreadLocal<SimpleDateFormat>> formatCache = - CacheBuilder.newBuilder().build( + Caffeine.newBuilder().build( new CacheLoader<TimezonedFormat, ThreadLocal<SimpleDateFormat>>() { @Override public ThreadLocal<SimpleDateFormat> load(final TimezonedFormat format) throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/abb152b8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 98c942a..e1049dc 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ <base_flume_version>1.5.2</base_flume_version> <!-- full dependency versions --> <global_accumulo_version>1.8.0</global_accumulo_version> + 
<global_caffeine_version>2.6.2</global_caffeine_version> <global_antlr_version>4.5</global_antlr_version> <global_opencsv_version>3.7</global_opencsv_version> <global_curator_version>2.7.1</global_curator_version>