GitHub user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/505#discussion_r109438625
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---
    @@ -74,17 +91,43 @@ public BulkWriterResponse write(String sourceType
                        ) throws Exception
       {
         BulkWriterResponse response = new BulkWriterResponse();
    -    SourceHandler handler = getSourceHandler(configurations.getIndex(sourceType));
    +    // Currently treating all the messages in a group for pass/failure.
         try {
    -      handler.handle(messages);
    -    } catch(Exception e) {
    +      // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through
    +      for(JSONObject message : messages) {
    +        Map<String, Object> val = configurations.getSensorConfig(sourceType);
    +        String path = getHdfsPathExtension(
    +                sourceType,
    +                (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
    +                message
    +        );
    +        SourceHandler handler = getSourceHandler(sourceType, path);
    +        handler.handle(message);
    +      }
    +    } catch (Exception e) {
           response.addAllErrors(e, tuples);
         }
     
         response.addAllSuccesses(tuples);
         return response;
       }
     
    +  public String getHdfsPathExtension(String sourceType, String stellarFunction, JSONObject message) {
    +    // If no function is provided, just use the sourceType directly
    +    if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
    +      return sourceType;
    +    }
    +
    +    StellarCompiler.Expression expression = sourceTypeExpressionMap.computeIfAbsent(stellarFunction, s -> stellarProcessor.compile(stellarFunction));
    +    VariableResolver resolver = new MapVariableResolver(message);
    --- End diff --
    
    @cestella I'm mostly concerned about the performance cost of compiling the function for every single message that comes through indexing.
    
    If we keep the current approach, I'd be interested in whether there's a way to make things a little cleaner.
    
    In retrospect, I think this should be an LRU cache, so that we don't keep a given parse around forever. Any thoughts on that, assuming performance is enough of a concern that we don't just use your proposal?
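
To make the LRU idea concrete, here is a minimal sketch of a bounded cache built on `java.util.LinkedHashMap` in access order. The `LruCache` class and its `maxEntries` parameter are hypothetical names used for illustration and are not part of Metron; the sketch only assumes that the cache is keyed by the Stellar expression string, as `sourceTypeExpressionMap` is in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not an existing Metron class): a small LRU map.
class LruCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxEntries;

  LruCache(int maxEntries) {
    // accessOrder = true keeps entries ordered from least to most recently accessed.
    super(16, 0.75f, true);
    this.maxEntries = maxEntries;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    // Invoked after each insertion; returning true evicts the least recently used entry.
    return size() > maxEntries;
  }
}
```

With something like this, the existing `computeIfAbsent(stellarFunction, ...)` call could stay as it is, and a compiled expression would drop out once enough other expressions have been used more recently. `LinkedHashMap` is not thread-safe, so this assumes the writer is only called from a single thread; otherwise it would need wrapping (for example with `Collections.synchronizedMap`) or a different cache implementation.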

