Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/940#discussion_r172711543 --- Diff: metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java --- @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.enrichment.parallel; + +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.enrichment.handler.ConfigHandler; +import org.apache.metron.common.performance.PerformanceLogger; +import org.apache.metron.common.utils.MessageUtils; +import org.apache.metron.enrichment.bolt.CacheKey; +import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.apache.metron.enrichment.utils.EnrichmentUtils; +import org.json.simple.JSONObject; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; + +/** + * This is an independent component which will accept a message and a set of enrichment adapters as well as a config which defines + * how those enrichments should be performed and fully enrich the message. The result will be the enriched message + * unified together and a list of errors which happened. + */ +public class ParallelEnricher { + + private Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType = new HashMap<>(); + private EnumMap<EnrichmentStrategies, CacheStats> cacheStats = new EnumMap<>(EnrichmentStrategies.class); + + /** + * The result of an enrichment. 
+ */ + public static class EnrichmentResult { + private JSONObject result; + private List<Map.Entry<Object, Throwable>> enrichmentErrors; + + public EnrichmentResult(JSONObject result, List<Map.Entry<Object, Throwable>> enrichmentErrors) { + this.result = result; + this.enrichmentErrors = enrichmentErrors; + } + + /** + * The unified fully enriched result. + * @return + */ + public JSONObject getResult() { + return result; + } + + /** + * The errors that happened in the course of enriching. + * @return + */ + public List<Map.Entry<Object, Throwable>> getEnrichmentErrors() { + return enrichmentErrors; + } + } + + private ConcurrencyContext concurrencyContext; + + /** + * Construct a parallel enricher with a set of enrichment adapters associated with their enrichment types. + * @param enrichmentsByType + */ + public ParallelEnricher( Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType + , ConcurrencyContext concurrencyContext + , boolean logStats + ) + { + this.enrichmentsByType = enrichmentsByType; + this.concurrencyContext = concurrencyContext; + if(logStats) { + for(EnrichmentStrategies s : EnrichmentStrategies.values()) { + cacheStats.put(s, null); + } + } + } + + /** + * Fully enriches a message. Each enrichment is done in parallel via a threadpool. + * Each enrichment is fronted with a LRU cache. + * + * @param message the message to enrich + * @param strategy The enrichment strategy to use (e.g. enrichment or threat intel) + * @param config The sensor enrichment config + * @param perfLog The performance logger. We log the performance for this call, the split portion and the enrichment portion. 
+ * @return the enrichment result + */ + public EnrichmentResult apply( JSONObject message + , EnrichmentStrategies strategy + , SensorEnrichmentConfig config + , PerformanceLogger perfLog + ) throws ExecutionException, InterruptedException { + if(message == null) { + return null; + } + if(perfLog != null) { + perfLog.mark("execute"); + if(perfLog.isDebugEnabled() && !cacheStats.isEmpty()) { + CacheStats before = cacheStats.get(strategy); + CacheStats after = concurrencyContext.getCache().stats(); + if(before != null && after != null) { + CacheStats delta = after.minus(before); + perfLog.log("cache", delta.toString()); + } + cacheStats.put(strategy, after); + } + } + String sensorType = MessageUtils.getSensorType(message); + message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis()); --- End diff -- Well, split time and enrichment time (as enrichment + join) are really bound up together. I know that some people are using these statistics, and I wanted to match the old topology as much as made sense. If you think they aren't useful at all, though, I can certainly take them out or just replace them with an overall time.
---