[ 
https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494322#comment-16494322
 ] 

ASF GitHub Bot commented on METRON-1584:
----------------------------------------

Github user cestella commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1036#discussion_r191585015
  
    --- Diff: 
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
 ---
    @@ -213,60 +214,144 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
       @SuppressWarnings("unchecked")
       @Override
       public void execute(Tuple tuple) {
    +
         if (isTick(tuple)) {
    -      try {
    -        if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
    -          //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
    -          LOG.debug("Flushing message queues older than their 
batchTimeouts");
    -          getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
    -                  new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    -                  , messageGetStrategy);
    -        }
    -      }
    -      catch(Exception e) {
    -        throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
    -      }
    -      finally {
    -        collector.ack(tuple);
    -      }
    -      return;
    +      handleTick(tuple);
    +
    +    } else {
    +      handleMessage(tuple);
         }
    +  }
    +
    +  /**
    +   * Handle a tuple containing a message; anything other than a tick tuple.
    +   *
    +   * @param tuple The tuple containing a message.
    +   */
    +  private void handleMessage(Tuple tuple) {
    +    try {
    +
    +      JSONObject message = getMessage(tuple);
    +      if(message == null) {
    +        handleMissingMessage(tuple);
    +        return;
    +      }
     
    -    try
    -    {
    -      JSONObject message = (JSONObject) messageGetStrategy.get(tuple);
           String sensorType = MessageUtils.getSensorType(message);
    -      LOG.trace("Writing enrichment message: {}", message);
    -      WriterConfiguration writerConfiguration = 
configurationTransformation.apply(
    -              new IndexingWriterConfiguration(bulkMessageWriter.getName(), 
getConfigurations()));
           if(sensorType == null) {
    -        //sensor type somehow ended up being null.  We want to error this 
message directly.
    -        getWriterComponent().error("null"
    -                             , new Exception("Sensor type is not specified 
for message "
    -                                            + message.toJSONString()
    -                                            )
    -                             , ImmutableList.of(tuple)
    -                             , messageGetStrategy
    -                             );
    -      }
    -      else {
    -        if (writerConfiguration.isDefault(sensorType)) {
    -          //want to warn, but not fail the tuple
    -          collector.reportError(new Exception("WARNING: Default and 
(likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " 
writer and sensor " + sensorType));
    -        }
    -
    -        getWriterComponent().write(sensorType
    -                , tuple
    -                , message
    -                , bulkMessageWriter
    -                , writerConfiguration
    -                , messageGetStrategy
    -        );
    +        handleMissingSensorType(tuple, message);
    +        return;
           }
    +
    +      writeMessage(tuple, message, sensorType);
    +
    +    } catch (Exception e) {
    +      throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
         }
    -    catch(Exception e) {
    +  }
    +
    +  /**
    +   * Handles a tick tuple.
    +   *
    +   * @param tickTuple The tick tuple.
    +   */
    +  private void handleTick(Tuple tickTuple) {
    +
    +    try {
    +      if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
    +        //WriterToBulkWriter doesn't allow batching, so no need to flush 
on Tick.
    +        LOG.debug("Flushing message queues older than their 
batchTimeouts");
    +        getWriterComponent().flushTimeouts(bulkMessageWriter, 
configurationTransformation.apply(
    +                new 
IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
    +                , messageGetStrategy);
    +      }
    +
    +    } catch(Exception e) {
           throw new RuntimeException("This should have been caught in the 
writerComponent.  If you see this, file a JIRA", e);
    +
    +    } finally {
    +      collector.ack(tickTuple);
    +    }
    +  }
    +
    +  /**
    +   * Retrieves the JSON message contained in a tuple.
    +   *
    +   * @param tuple The tuple containing a JSON message.
    +   * @return The JSON message contained in the tuple. If none, returns 
null.
    +   */
    +  private JSONObject getMessage(Tuple tuple) {
    +
    --- End diff --
    
    Can we clean up some of the newlines in this method while we're here?


> Indexing Topology Crashes with Invalid Message
> ----------------------------------------------
>
>                 Key: METRON-1584
>                 URL: https://issues.apache.org/jira/browse/METRON-1584
>             Project: Metron
>          Issue Type: Bug
>            Reporter: Nick Allen
>            Assignee: Nick Allen
>            Priority: Major
>
> Per Mohan Venkateshaiah:
> I published the message "adkadknalkda;LK;ad;Da;dD;" to the indexing topic. I
> saw that the random access indexing topology worker thread died and couldn't
> recover until the Kafka topic was deleted and recreated.
> {code:java}
> Caused by: java.lang.IllegalStateException: Unable to parse 
> adkadknalkda;LK;ad;Da;dD; due to null
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> Caused by: org.json.simple.parser.ParseException
>  at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) 
> ~[stormjar.jar:?]
>  at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) 
> ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25)
>  ~[stormjar.jar:?]
>  at 
> org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237)
>  ~[stormjar.jar:?]
>  at 
> org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472)
>  ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  ... 6 more
> 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] 
> [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>  at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>  at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276)
>  [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) 
> [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker 
> random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700
> 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging 
> context
> 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down 
> executors
> 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 
> 2]-send-queue [INFO] Async loop interrupted!
> 2018-05-24 09:21:22.239 o.a.s.util 
> Thread-7-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink-executor[2
>  2] [INFO] Async loop interrupted!
> 2018-05-24 09:21:22.240 o.a.h.m.s.s.StormTimelineMetricsSink Thread-20 [INFO] 
> Stopping Storm Metrics Sink
> 2018-05-24 09:21:22.240 o.a.s.d.executor Thread-20 [INFO] Shut down executor 
> __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2]
> 2018-05-24 09:21:22.241 o.a.s.d.executor Thread-20 [INFO] Shutting down 
> executor indexingBolt:[3 3] 
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to