[ 
https://issues.apache.org/jira/browse/METRON-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638456#comment-16638456
 ] 

ASF GitHub Bot commented on METRON-1681:
----------------------------------------

Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1213#discussion_r222736631
  
    --- Diff: 
metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 ---
    @@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
         }
       }
     
    -  protected void initializeStellar() {
    -    Context.Builder builder = new Context.Builder()
    -                                
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
    -                                .with(Context.Capabilities.GLOBAL_CONFIG, 
() -> getConfigurations().getGlobalConfig())
    -                                .with(Context.Capabilities.STELLAR_CONFIG, 
() -> getConfigurations().getGlobalConfig())
    -                                ;
    -    if(cache != null) {
    -      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
    -    }
    -    this.stellarContext = builder.build();
    -    StellarFunctions.initialize(stellarContext);
    -  }
    -
     
       @SuppressWarnings("unchecked")
       @Override
       public void execute(Tuple tuple) {
         if (TupleUtils.isTick(tuple)) {
    -      try {
    -        for (Entry<String, ParserComponents> entry : 
sensorToComponentMap.entrySet()) {
    -          entry.getValue().getWriter().flush(getConfigurations(), 
messageGetStrategy);
    -        }
    -      } catch (Exception e) {
    -        throw new RuntimeException(
    -            "This should have been caught in the writerHandler.  If you 
see this, file a JIRA", e);
    -      } finally {
    -        collector.ack(tuple);
    -      }
    +      handleTickTuple(tuple);
           return;
         }
    -
    +    numWritten = 0;
         byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
    +    String topic = 
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
    +    String sensorType = topicToSensorMap.get(topic);
         try {
    -      SensorParserConfig sensorParserConfig;
    -      MessageParser<JSONObject> parser;
    -      String sensor;
    -      Map<String, Object> metadata;
    -      if (sensorToComponentMap.size() == 1) {
    -        // There's only one parser, so grab info directly
    -        Entry<String, ParserComponents> sensorParser = 
sensorToComponentMap.entrySet().iterator()
    -            .next();
    -        sensor = sensorParser.getKey();
    -        parser = sensorParser.getValue().getMessageParser();
    -        sensorParserConfig = getSensorParserConfig(sensor);
    -      } else {
    -        // There's multiple parsers, so pull the topic from the Tuple and 
look up the sensor
    -        String topic = 
tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
    -        sensor = topicToSensorMap.get(topic);
    -        parser = sensorToComponentMap.get(sensor).getMessageParser();
    -        sensorParserConfig = getSensorParserConfig(sensor);
    -      }
    +      ParserConfigurations parserConfigurations = getConfigurations();
    +      SensorParserConfig sensorParserConfig = 
parserConfigurations.getSensorParserConfig(sensorType);
    +      RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( 
sensorParserConfig.getRawMessageStrategy()
    +              , tuple
    +              , originalMessage
    +              , sensorParserConfig.getReadMetadata()
    +              , sensorParserConfig.getRawMessageStrategyConfig()
    +      );
    +      parserRunner.setOnSuccess(parserResult -> onSuccess(parserResult, 
tuple));
    --- End diff --
    
    Ah, that's right. This answers my earlier comment about setsuccess/error.


> Decouple the ParserBolt from the Parse execution logic
> ------------------------------------------------------
>
>                 Key: METRON-1681
>                 URL: https://issues.apache.org/jira/browse/METRON-1681
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Justin Leet
>            Priority: Major
>
> Per discussion on https://github.com/apache/metron/pull/1099, there are 
> concerns about the ParserBolt needed some refactoring.  The discussion didn't 
> hold the PR up, but it was generally agreed that we should decouple some of 
> the initialization and execution logic.
> This also aids us in integrating with other systems such as NiFi or Spark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to