[ https://issues.apache.org/jira/browse/METRON-1584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493980#comment-16493980 ]
ASF GitHub Bot commented on METRON-1584: ---------------------------------------- GitHub user nickwallen opened a pull request: https://github.com/apache/metron/pull/1036 METRON-1584 Indexing Topology Crashes with Invalid Message If the indexing topology receives a message containing invalid JSON, the topology will crash. The topology needs to handle these types of errors and continue processing. ## Changes * Updated the `BulkWriterComponent` to allow tuples that do not contain valid JSON messages to be error'd out. * Updated `BulkMessageWriterBolt` to handle the case where a tuple does not contain a valid JSON message. * Updated the BulkMessageWriterBolt unit tests to validate that error'd messages are being ack'd and emitted on the correct error stream. Previously a mock was being used that hid a bug with the `BulkWriterComponent`. * Added a unit test for the case where an invalid message is sent as input to the indexing topology. ## Testing 1. Spin-up the development environment. 1. Ensure that messages are successfully being indexing by running the Service Check and/or validating the Alerts UI. 1. Write an invalid message to the "indexing" topic. 1. Ensure the invalid message is sent to the "error" topic. 1. Ensure the indexing topology did not crash. ## Pull Request Checklist - [ ] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel). - [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed? - [ ] Have you included steps or a guide to how the change may be verified and tested manually? - [ ] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via: - [ ] Have you written or updated unit tests and or integration tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent? You can merge this pull request into a Git repository by running: $ git pull https://github.com/nickwallen/metron METRON-1584 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/metron/pull/1036.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1036 ---- commit 765e05264b378a0a7608092e5842a31239418ae5 Author: Nick Allen <nick@...> Date: 2018-05-29T17:49:31Z METRON-1584 Indexing Topology Crashes with Invalid Message ---- > Indexing Topology Crashes with Invalid Message > ---------------------------------------------- > > Key: METRON-1584 > URL: https://issues.apache.org/jira/browse/METRON-1584 > Project: Metron > Issue Type: Bug > Reporter: Nick Allen > Assignee: Nick Allen > Priority: Major > > Per Mohan Venkateshaiah: > I published message "adkadknalkda;LK;ad;Da;dD;" to indexing topic , I see > that the random access indexing topology worker thread died and couldn't > recover until the kafka topic was deleted and recreated. > {code:java} > Caused by: java.lang.IllegalStateException: Unable to parse > adkadknalkda;LK;ad;Da;dD; due to null > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:49) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25) > ~[stormjar.jar:?] > at > org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > ... 6 more > Caused by: org.json.simple.parser.ParseException > at org.json.simple.parser.Yylex.yylex(Yylex.java:610) ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.nextToken(JSONParser.java:269) > ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.parse(JSONParser.java:118) > ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.parse(JSONParser.java:81) > ~[stormjar.jar:?] > at org.json.simple.parser.JSONParser.parse(JSONParser.java:75) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:47) > ~[stormjar.jar:?] > at > org.apache.metron.common.message.JSONFromPosition.get(JSONFromPosition.java:25) > ~[stormjar.jar:?] > at > org.apache.metron.writer.bolt.BulkMessageWriterBolt.execute(BulkMessageWriterBolt.java:237) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$fn__10195$tuple_action_fn__10197.invoke(executor.clj:735) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__10114.invoke(executor.clj:466) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.disruptor$clojure_handler$reify__4137.onEvent(disruptor.clj:40) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) > ~[storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > ... 6 more > 2018-05-24 09:21:22.236 o.a.s.util Thread-9-indexingBolt-executor[3 3] > [ERROR] Halting process: ("Worker died") > java.lang.RuntimeException: ("Worker died") > at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?] > at org.apache.storm.daemon.worker$fn__10799$fn__10800.invoke(worker.clj:763) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at > org.apache.storm.daemon.executor$mk_executor_data$fn__10011$fn__10012.invoke(executor.clj:276) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) > [storm-core-1.1.0.2.6.5.0-292.jar:1.1.0.2.6.5.0-292] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161] > 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Shutting down worker > random_access_indexing-24-1527147389 703b5bf7-6c9d-46f3-8136-0c4877a69375 6700 > 2018-05-24 09:21:22.237 o.a.s.d.worker Thread-20 [INFO] Terminating messaging > context > 2018-05-24 09:21:22.238 o.a.s.d.worker Thread-20 [INFO] Shutting down > executors > 2018-05-24 09:21:22.238 o.a.s.d.executor Thread-20 [INFO] Shutting down > executor > __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2] > 2018-05-24 09:21:22.239 o.a.s.util Thread-6-disruptor-executor[2 > 2]-send-queue [INFO] Async loop interrupted! > 2018-05-24 09:21:22.239 o.a.s.util > Thread-7-__metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink-executor[2 > 2] [INFO] Async loop interrupted! > 2018-05-24 09:21:22.240 o.a.h.m.s.s.StormTimelineMetricsSink Thread-20 [INFO] > Stopping Storm Metrics Sink > 2018-05-24 09:21:22.240 o.a.s.d.executor Thread-20 [INFO] Shut down executor > __metricsorg.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink:[2 2] > 2018-05-24 09:21:22.241 o.a.s.d.executor Thread-20 [INFO] Shutting down > executor indexingBolt:[3 3] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)