Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/1022#discussion_r190730655 --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java --- @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.writer; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.field.DeDotFieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; +import org.apache.metron.common.field.NoopFieldNameConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; + +/** + * A {@link FieldNameConverterFactory} that is backed by a cache. 
+ * + * <p>Each sensor type can use a different {@link FieldNameConverter} implementation. + * + * <p>The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter} + * that should be used for a given sensor type. + * + * <p>The {@link FieldNameConverter}s are maintained in a cache for a fixed period of time + * after they are created. Once they expire, the {@link WriterConfiguration} is used to + * reload the {@link FieldNameConverter}. + * + * <p>The user can change the {@link FieldNameConverter} in use at runtime. A change + * to this configuration is recognized once the old {@link FieldNameConverter} expires + * from the cache. + * + * <p>Defining a shorter expiration interval allows config changes to be recognized more + * quickly, but also can impact performance negatively. + */ +public class CachedFieldNameConverterFactory implements FieldNameConverterFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * A cache that contains a {@link FieldNameConverter} for each sensor type. + * + * A user can alter the {@link FieldNameConverter} for a given sensor at any time + * by altering the Indexing configuration. The actual {@link FieldNameConverter} + * in use for a given sensor will only change once the original converter has + * expired from the cache. + */ + private Cache<String, FieldNameConverter> fieldNameConverters; + + /** + * Creates a {@link CachedFieldNameConverterFactory}. + * + * @param expires The duration before {@link FieldNameConverter}s are expired. + * @param expiresUnits The units before {@link FieldNameConverter}s are expired. + */ + public CachedFieldNameConverterFactory(int expires, TimeUnit expiresUnits) { + + fieldNameConverters = createFieldNameConverterCache(expires, expiresUnits); + } + + /** + * Creates a {@link CachedFieldNameConverterFactory} where the cache expires after 5 minutes. 
+ */ + public CachedFieldNameConverterFactory() { + + this(5, TimeUnit.MINUTES); + } + + /** + * Creates a {@link CachedFieldNameConverterFactory} using the given cache. This should only + * be used for testing. + * + * @param fieldNameConverters A {@link Cache} containing {@link FieldNameConverter}s. + */ + public CachedFieldNameConverterFactory(Cache<String, FieldNameConverter> fieldNameConverters) { + + this.fieldNameConverters = fieldNameConverters; + } + + /** + * Creates a cache of {@link FieldNameConverter}s, one for each source type. + * + * @return A cache of {@link FieldNameConverter}s. + */ + private Cache<String, FieldNameConverter> createFieldNameConverterCache(int expire, TimeUnit expireUnits) { + + return Caffeine + .newBuilder() + .expireAfterWrite(expire, expireUnits) + .build(); + } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + @Override + public FieldNameConverter create(String sensorType, WriterConfiguration config) { + + return fieldNameConverters.get(sensorType, (s) -> createInstance(sensorType, config)); + } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + private FieldNameConverter createInstance(String sensorType, WriterConfiguration config) { + + // default to the 'DEDOT' field name converter to maintain backwards compatibility --- End diff --

This looks interesting, but one piece of existing behavior changes as part of this PR. Currently, we specify the field name converter at the writer-implementation level:

* ES (Elasticsearch) uses DEDOT
* HDFS uses NOOP
* Solr uses NOOP

By defaulting every sensor to DEDOT here, we are not actually maintaining backwards compatibility: we are changing the behavior of the HDFS and Solr writers from NOOP to DEDOT.
What I'd suggest is adding a default method to the `BulkMessageWriter` interface, like so:

```
default FieldNameConverter defaultFieldNameConverter() {
  return FieldNameConverters.NOOP.get();
}
```

and having `ElasticsearchWriter` override it to return DEDOT as its default. Also, here you probably want to pass the writer's default field name converter in as a third argument. It would be used whenever the configuration does not specify a converter for the sensor. This would let us maintain backwards compatibility while still enabling users to override the converter.
---