Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1213#discussion_r222736631 --- Diff: metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java --- @@ -242,169 +226,81 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll } } - protected void initializeStellar() { - Context.Builder builder = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig()) - .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig()) - ; - if(cache != null) { - builder = builder.with(Context.Capabilities.CACHE, () -> cache); - } - this.stellarContext = builder.build(); - StellarFunctions.initialize(stellarContext); - } - @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { if (TupleUtils.isTick(tuple)) { - try { - for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) { - entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy); - } - } catch (Exception e) { - throw new RuntimeException( - "This should have been caught in the writerHandler. 
If you see this, file a JIRA", e); - } finally { - collector.ack(tuple); - } + handleTickTuple(tuple); return; } - + numWritten = 0; byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple); + String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); + String sensorType = topicToSensorMap.get(topic); try { - SensorParserConfig sensorParserConfig; - MessageParser<JSONObject> parser; - String sensor; - Map<String, Object> metadata; - if (sensorToComponentMap.size() == 1) { - // There's only one parser, so grab info directly - Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator() - .next(); - sensor = sensorParser.getKey(); - parser = sensorParser.getValue().getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } else { - // There's multiple parsers, so pull the topic from the Tuple and look up the sensor - String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName()); - sensor = topicToSensorMap.get(topic); - parser = sensorToComponentMap.get(sensor).getMessageParser(); - sensorParserConfig = getSensorParserConfig(sensor); - } + ParserConfigurations parserConfigurations = getConfigurations(); + SensorParserConfig sensorParserConfig = parserConfigurations.getSensorParserConfig(sensorType); + RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy() + , tuple + , originalMessage + , sensorParserConfig.getReadMetadata() + , sensorParserConfig.getRawMessageStrategyConfig() + ); + parserRunner.setOnSuccess(parserResult -> onSuccess(parserResult, tuple)); --- End diff -- Ah, that's right. This answers my earlier comment about the setOnSuccess/error callbacks.
---