[ https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494320#comment-16494320 ]
ASF GitHub Bot commented on METRON-1584: ---------------------------------------- Github user cestella commented on a diff in the pull request: https://github.com/apache/metron/pull/1036#discussion_r191584626 --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java --- @@ -115,15 +116,37 @@ public void commit(BulkWriterResponse response) { } public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) { + + if(!Iterables.isEmpty(tuples)) { + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); + } tuples.forEach(t -> collector.ack(t)); MetronError error = new MetronError() .withSensorType(sensorType) .withErrorType(Constants.ErrorType.INDEXING_ERROR) .withThrowable(e); + tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + ErrorUtils.handleError(collector, error); + } + + /** + * Error a set of tuples that may not contain a valid message. + * + * <p>Without a valid message, the source type is unknown. + * <p>Without a valid message, the JSON message cannot be added to the error. + * + * @param e The exception that occurred. + * @param tuples The tuples to error that may not contain valid messages. + */ + public void error(Throwable e, Iterable<Tuple> tuples) { + + if(!Iterables.isEmpty(tuples)) { - LOG.error("Failing {} tuples", Iterables.size(tuples), e); + LOG.error("Failing tuples; count={}, error={}", Iterables.size(tuples), ExceptionUtils.getRootCauseMessage(e)); } - tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t))); + tuples.forEach(t -> collector.ack(t)); --- End diff -- Wow, did we really not ack here before? That's a bug. Good catch. 
> Indexing Topology Crashes with Invalid Message > ---------------------------------------------- > > Key: METRON-1584 > URL: https://issues.apache.org/jira/browse/METRON-1584 > Project: Metron > Issue Type: Bug > Reporter: Nick Allen > Assignee: Nick Allen > Priority: Major > > Per Mohan Venkateshaiah: > I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see > that the random access indexing topology worker thread died and couldn't > recover until the kafka topic was deleted and recreated. > {code:java} > Caused by: java.lang.IllegalStateException: Unable to parse > adkadknalkda;LK;ad;Da;dD; due to null > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25) > ~[stormjar.jar:?] > at > org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > ... 6 more > Caused by: org.json.simple.parser.ParseException > at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) > ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) > ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) > ~[stormjar.jar:?] 
> at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25) > ~[stormjar.jar:?] > at > org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > ... 6 more > 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] > [ERROR] Halting process: ("Worker died") > java.lang.RuntimeException: ("Worker died") > at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?] > at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] 
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161] > 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker > random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700 > 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging > context > 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down > executors > 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down > executor > __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2] > 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 > 2]-send-queue [INFO] Async loop interrupted! > 2018-05-24 09:21:22.239 o.a.s.util > Thread-7-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink-executor[2 > 2] [INFO] Async loop interrupted! > 2018-05-24 09:21:22.240 o.a.h.m.s.s.StormTimelineMetricsSink Thread-20 [INFO] > Stopping Storm Metrics Sink > 2018-05-24 09:21:22.240 o.a.s.d.executor Thread-20 [INFO] Shut down executor > __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2] > 2018-05-24 09:21:22.241 o.a.s.d.executor Thread-20 [INFO] Shutting down > executor indexingBolt:[3 3] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)