GitHub user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1022#discussion_r190730655
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java ---
    @@ -0,0 +1,155 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.writer;
    +
    +import com.github.benmanes.caffeine.cache.Cache;
    +import com.github.benmanes.caffeine.cache.Caffeine;
    +import org.apache.commons.lang.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.commons.lang3.exception.ExceptionUtils;
    +import org.apache.metron.common.configuration.writer.WriterConfiguration;
    +import org.apache.metron.common.field.DeDotFieldNameConverter;
    +import org.apache.metron.common.field.FieldNameConverter;
    +import org.apache.metron.common.field.FieldNameConverters;
    +import org.apache.metron.common.field.NoopFieldNameConverter;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.lang.invoke.MethodHandles;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A {@link FieldNameConverterFactory} that is backed by a cache.
    + *
    + * <p>Each sensor type can use a different {@link FieldNameConverter} implementation.
    + *
    + * <p>The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter}
    + * that should be used for a given sensor type.
    + *
    + * <p>The {@link FieldNameConverter}s are maintained in a cache for a fixed period of time
    + * after they are created.  Once they expire, the {@link WriterConfiguration} is used to
    + * reload the {@link FieldNameConverter}.
    + *
    + * <p>The user can change the {@link FieldNameConverter} in use at runtime. A change
    + * to this configuration is recognized once the old {@link FieldNameConverter} expires
    + * from the cache.
    + *
    + * <p>Defining a shorter expiration interval allows config changes to be recognized more
    + * quickly, but also can impact performance negatively.
    + */
    +public class CachedFieldNameConverterFactory implements FieldNameConverterFactory {
    +
    +  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +
    +  /**
    +   * A cache that contains a {@link FieldNameConverter} for each sensor type.
    +   *
    +   * A user can alter the {@link FieldNameConverter} for a given sensor at any time
    +   * by altering the Indexing configuration.  The actual {@link FieldNameConverter}
    +   * in use for a given sensor will only change once the original converter has
    +   * expired from the cache.
    +   */
    +  private Cache<String, FieldNameConverter> fieldNameConverters;
    +
    +  /**
    +   * Creates a {@link CachedFieldNameConverterFactory}.
    +   *
    +   * @param expires The duration before {@link FieldNameConverter}s are expired.
    +   * @param expiresUnits The units before {@link FieldNameConverter}s are expired.
    +   */
    +  public CachedFieldNameConverterFactory(int expires, TimeUnit expiresUnits) {
    +
    +    fieldNameConverters = createFieldNameConverterCache(expires, expiresUnits);
    +  }
    +
    +  /**
    +   * Creates a {@link CachedFieldNameConverterFactory} where the cache expires after 5 minutes.
    +   */
    +  public CachedFieldNameConverterFactory() {
    +
    +    this(5, TimeUnit.MINUTES);
    +  }
    +
    +  /**
    +   * Creates a {@link CachedFieldNameConverterFactory} using the given cache.  This should only
    +   * be used for testing.
    +   *
    +   * @param fieldNameConverters A {@link Cache} containing {@link FieldNameConverter}s.
    +   */
    +  public CachedFieldNameConverterFactory(Cache<String, FieldNameConverter> fieldNameConverters) {
    +
    +    this.fieldNameConverters = fieldNameConverters;
    +  }
    +
    +  /**
    +   * Creates a cache of {@link FieldNameConverter}s, one for each source type.
    +   *
    +   * @return A cache of {@link FieldNameConverter}s.
    +   */
    +  private Cache<String, FieldNameConverter> createFieldNameConverterCache(int expire, TimeUnit expireUnits) {
    +
    +    return Caffeine
    +            .newBuilder()
    +            .expireAfterWrite(expire, expireUnits)
    +            .build();
    +  }
    +
    +  /**
    +   * Create a new {@link FieldNameConverter}.
    +   *
    +   * @param sensorType The type of sensor.
    +   * @param config The writer configuration.
    +   * @return
    +   */
    +  @Override
    +  public FieldNameConverter create(String sensorType, WriterConfiguration config) {
    +
    +    return fieldNameConverters.get(sensorType, (s) -> createInstance(sensorType, config));
    +  }
    +
    +  /**
    +   * Create a new {@link FieldNameConverter}.
    +   *
    +   * @param sensorType The type of sensor.
    +   * @param config The writer configuration.
    +   * @return
    +   */
    +  private FieldNameConverter createInstance(String sensorType, WriterConfiguration config) {
    +
    +    // default to the 'DEDOT' field name converter to maintain backwards compatibility
    --- End diff --
    
    This looks interesting, but this PR changes one bit of existing behavior.  Currently, we specify the field name converter at the writer implementation level, so:
    * ES uses dedot
    * HDFS uses noop
    * Solr uses noop
    
    By defaulting to DEDOT here, we're actually not maintaining backwards compatibility; we're changing the behavior of the HDFS and Solr writers to dedot.  What I'd suggest doing is adding a method to the `BulkMessageWriter` interface like so:
    ```
    default FieldNameConverter defaultFieldNameConverter() {
      return FieldNameConverters.NOOP.get();
    }
    ```
    and in ElasticsearchWriter override it to specify DEDOT as the default.  Also, here, you probably want to pass in the default field name converter as a 3rd argument, to be used if none is specified in the configuration.
    
    This would allow us to maintain backwards compatibility while still enabling users to override the converter.
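
    A minimal, self-contained sketch of how that suggestion could fit together. Everything below is a hypothetical stand-in rather than the actual Metron code: the simplified `FieldNameConverter`, `FieldNameConverters`, and `BulkMessageWriter` types, the empty `HdfsWriter`/`SolrWriter` classes, the `ConverterSelection.select` helper, and the assumption that DEDOT replaces dots with colons are all there only to illustrate the per-writer default with a user override.

    ```java
    import java.util.Locale;
    import java.util.Optional;

    // Hypothetical, simplified stand-in for Metron's FieldNameConverter.
    interface FieldNameConverter {
      String convert(String originalField);
    }

    // Hypothetical stand-in for the FieldNameConverters enum.
    // Assumption: DEDOT replaces '.' with ':' in field names.
    enum FieldNameConverters {
      NOOP(field -> field),
      DEDOT(field -> field.replace(".", ":"));

      private final FieldNameConverter converter;

      FieldNameConverters(FieldNameConverter converter) {
        this.converter = converter;
      }

      FieldNameConverter get() {
        return converter;
      }
    }

    // Simplified writer interface carrying only the suggested default method.
    interface BulkMessageWriter {
      default FieldNameConverter defaultFieldNameConverter() {
        return FieldNameConverters.NOOP.get();
      }
    }

    // HDFS and Solr inherit the NOOP default, preserving their current behavior.
    class HdfsWriter implements BulkMessageWriter { }

    class SolrWriter implements BulkMessageWriter { }

    // Elasticsearch overrides the default to keep using DEDOT.
    class ElasticsearchWriter implements BulkMessageWriter {
      @Override
      public FieldNameConverter defaultFieldNameConverter() {
        return FieldNameConverters.DEDOT.get();
      }
    }

    // Hypothetical helper: 'configured' models the converter name taken from the
    // writer configuration, if the user set one; the writer default is the fallback.
    class ConverterSelection {
      static FieldNameConverter select(Optional<String> configured,
                                       FieldNameConverter writerDefault) {
        return configured
            .map(name -> FieldNameConverters.valueOf(name.toUpperCase(Locale.ROOT)).get())
            .orElse(writerDefault);
      }
    }
    ```

    With this shape, a factory method taking the writer's default as a 3rd argument only falls back to it when the configuration names no converter, so each writer keeps its existing behavior unless the user overrides it.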

