Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/505#discussion_r109441372

    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---
    @@ -74,17 +91,43 @@ public BulkWriterResponse write(String sourceType
                        ) throws Exception {
         BulkWriterResponse response = new BulkWriterResponse();
    -    SourceHandler handler = getSourceHandler(configurations.getIndex(sourceType));
    +    // Currently treating all the messages in a group for pass/failure.
         try {
    -      handler.handle(messages);
    -    } catch(Exception e) {
    +      // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through
    +      for(JSONObject message : messages) {
    +        Map<String, Object> val = configurations.getSensorConfig(sourceType);
    +        String path = getHdfsPathExtension(
    +                sourceType,
    +                (String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
    +                message
    +        );
    +        SourceHandler handler = getSourceHandler(sourceType, path);
    +        handler.handle(message);
    +      }
    +    } catch (Exception e) {
           response.addAllErrors(e, tuples);
         }
         response.addAllSuccesses(tuples);
         return response;
       }
    +  public String getHdfsPathExtension(String sourceType, String stellarFunction, JSONObject message) {
    +    // If no function is provided, just use the sourceType directly
    +    if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
    +      return sourceType;
    +    }
    +
    +    StellarCompiler.Expression expression = sourceTypeExpressionMap.computeIfAbsent(stellarFunction, s -> stellarProcessor.compile(stellarFunction));
    +    VariableResolver resolver = new MapVariableResolver(message);
    --- End diff --

    Yeah, it's a good concern. We *do* actually have a cache in the `StellarProcessor` so that compilations happen once and are cached afterwards. As long as `StellarProcessor` is a transient member variable, I think you're good to do what I suggested.
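
    To illustrate the point about the cache and the transient member, here is a minimal, self-contained sketch. It does not use Metron's classes: `CachingProcessor`, `Writer`, `pathFor` and `roundTrip` are hypothetical stand-ins invented for this example, and the toy "compilation" simply treats the expression as a field name. It only shows the general pattern of a processor that compiles each expression once, held in a `transient` field that is rebuilt lazily after the owning object is deserialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class TransientCacheSketch {

  // Hypothetical stand-in for a compiling processor (NOT Metron's StellarProcessor).
  // Each distinct expression string is "compiled" once and then served from the cache.
  static class CachingProcessor {
    private final Map<String, Function<Map<String, Object>, String>> cache =
        new ConcurrentHashMap<>();
    final AtomicInteger compilations = new AtomicInteger();

    Function<Map<String, Object>, String> compile(String expression) {
      return cache.computeIfAbsent(expression, e -> {
        compilations.incrementAndGet();
        // Toy compilation: the expression is just a field name to look up in the message.
        return message -> String.valueOf(message.get(e));
      });
    }
  }

  // Hypothetical writer. The processor is not Serializable, so it is kept transient
  // and recreated on first use, for example after the writer is shipped to a worker.
  static class Writer implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient CachingProcessor processor;

    CachingProcessor processor() {
      if (processor == null) {
        processor = new CachingProcessor();
      }
      return processor;
    }

    String pathFor(String sourceType, String expression, Map<String, Object> message) {
      // No expression configured: fall back to the source type, as in the diff above.
      if (expression == null || expression.trim().isEmpty()) {
        return sourceType;
      }
      return processor().compile(expression).apply(message);
    }
  }

  // Serialize and deserialize a writer to mimic what a distributed runtime would do.
  static Writer roundTrip(Writer writer) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(writer);
    }
    try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Writer) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, Object> message = new HashMap<>();
    message.put("host", "web01");

    Writer writer = new Writer();
    writer.pathFor("bro", "host", message);
    writer.pathFor("bro", "host", message);
    System.out.println("compilations: " + writer.processor().compilations.get());

    Writer copy = roundTrip(writer);
    System.out.println("path after round trip: " + copy.pathFor("bro", "host", message));
  }
}
```

    With a cache inside the processor, a separate per-writer map of compiled expressions becomes redundant: calling `compile` for every message costs one map lookup after the first call. The deserialized copy starts with an empty cache and pays for one compilation per distinct expression again.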