METRON-883 Capture Bro Plugin Enhancements from bro/bro-plugins (nickwallen) closes apache/incubator-metron#545
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/19e0e715
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/19e0e715
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/19e0e715

Branch: refs/heads/Metron_0.4.0
Commit: 19e0e715d30670612b0ad826ed133ba138bdc20f
Parents: fbce3b5
Author: nickwallen <n...@nickallen.org>
Authored: Tue Apr 25 19:42:28 2017 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Tue Apr 25 19:42:28 2017 -0400

----------------------------------------------------------------------
 metron-sensors/bro-plugin-kafka/README.md       | 106 +++++++++++++------
 .../bro-plugin-kafka/cmake/FindLibRDKafka.cmake |  30 +++---
 .../bro-plugin-kafka/cmake/FindOpenSSL.cmake    |   2 +
 .../scripts/Bro/Kafka/__load__.bro              |   2 +
 .../scripts/Bro/Kafka/logs-to-kafka.bro         |   3 +-
 .../bro-plugin-kafka/src/KafkaWriter.cc         |  85 +++++++++------
 .../bro-plugin-kafka/src/KafkaWriter.h          |  15 +++
 7 files changed, 163 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 6d0582e..31b1f54 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -1,60 +1,98 @@
-Bro Logging Output to Kafka
+Logging Bro Output to Kafka
 ===========================
 
-A Bro log writer that sends logging output to Kafka. This provides a convenient
-means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
-process the data generated by Bro.
+A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
 
 Installation
 ------------
 
-Install librdkafka (https://github.com/edenhill/librdkafka), a native client
-library for Kafka. This plugin has been tested against the latest release of
-librdkafka, which at the time of this writing is v0.9.4. In order to support interacting
-with a kerberized kafka, you will need libsasl2 installed
+1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
 
-```
-# curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
-# cd librdkafka-0.9.4/
-# ./configure --enable-sasl
-# make
-# sudo make install
-```
+    In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script.
 
-Then compile this Bro plugin using the following commands.
+    ```
+    curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
+    cd librdkafka-0.9.4/
+    ./configure --enable-sasl
+    make
+    sudo make install
+    ```
 
-```
-# ./configure --bro-dist=$BRO_SRC
-# make
-# sudo make install
-```
+1. Build the plugin using the following commands.
 
-Run the following command to ensure that the plugin was installed successfully.
+    ```
+    ./configure --bro-dist=$BRO_SRC
+    make
+    sudo make install
+    ```
 
-```
-# bro -N Bro::Kafka
-Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
-```
+1. Run the following command to ensure that the plugin was installed successfully.
+
+    ```
+    $ bro -N Bro::Kafka
+    Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+    ```
 
 Activation
 ----------
 
-The easiest way to enable Kafka output is to load the plugin's
-`logs-to-kafka.bro` script. If you are using BroControl, the following lines
-added to local.bro will activate it.
+The following examples highlight different ways that the plugin can be used. Simply add Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+
+### Example 1
+
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
+ * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
+ * By defining `topic_name` all records will be sent to the same Kafka topic.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
 redef Kafka::topic_name = "bro";
 redef Kafka::kafka_conf = table(
     ["metadata.broker.list"] = "localhost:9092"
 );
 ```
 
-This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on
-the localhost to a topic called `bro`. Any configuration value accepted by
-librdkafka can be added to the `kafka_conf` configuration table.
+### Example 2
+
+It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
+ * The `topic_name` value must be set to an empty string.
+ * The `$path` value of Bro's Log Writer mechanism is used to define the topic name.
+ * Any configuration value accepted by librdkafka can be added to the `$config` configuration table.
+ * Each log writer accepts a separate configuration table.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "";
+redef Kafka::tag_json = T;
+
+event bro_init()
+{
+    # handles HTTP
+    local http_filter: Log::Filter = [
+        $name = "kafka-http",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+            ["stream_id"] = "HTTP::LOG",
+            ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "http"
+    ];
+    Log::add_filter(HTTP::LOG, http_filter);
+
+    # handles DNS
+    local dns_filter: Log::Filter = [
+        $name = "kafka-dns",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+            ["stream_id"] = "DNS::LOG",
+            ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "dns"
+    ];
+    Log::add_filter(DNS::LOG, dns_filter);
+}
+```
 
 Settings
 --------
@@ -147,7 +185,7 @@ For an environment where the following is true:
 
 The kafka topic `bro` has been given permission for the `metron` user to write:
 ```
-# login using the metron user 
+# login using the metron user
 kinit -kt /etc/security/keytabs/metron.headless.keytab met...@example.com
 ${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
 ```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
index c64d8f9..904bfff 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
@@ -16,34 +16,36 @@
 #
 
 find_path(LibRDKafka_ROOT_DIR
-    NAMES include/librdkafka/rdkafkacpp.h
+  NAMES include/librdkafka/rdkafkacpp.h
 )
 
 find_library(LibRDKafka_LIBRARIES
-    NAMES rdkafka++
-    HINTS ${LibRDKafka_ROOT_DIR}/lib
+  NAMES rdkafka++
+  HINTS ${LibRDKafka_ROOT_DIR}/lib
+  PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(LibRDKafka_C_LIBRARIES
-    NAMES rdkafka
-    HINTS ${LibRDKafka_ROT_DIR}/lib
+  NAMES rdkafka
+  HINTS ${LibRDKafka_ROT_DIR}/lib
+  PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_path(LibRDKafka_INCLUDE_DIR
-    NAMES librdkafka/rdkafkacpp.h
-    HINTS ${LibRDKafka_ROOT_DIR}/include
+  NAMES librdkafka/rdkafkacpp.h
+  HINTS ${LibRDKafka_ROOT_DIR}/include
 )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(LibRDKafka DEFAULT_MSG
-    LibRDKafka_LIBRARIES
-    LibRDKafka_C_LIBRARIES
-    LibRDKafka_INCLUDE_DIR
+  LibRDKafka_LIBRARIES
+  LibRDKafka_C_LIBRARIES
+  LibRDKafka_INCLUDE_DIR
 )
 
 mark_as_advanced(
-    LibRDKafka_ROOT_DIR
-    LibRDKafka_LIBRARIES
-    LibRDKafka_C_LIBRARIES
-    LibRDKafka_INCLUDE_DIR
+  LibRDKafka_ROOT_DIR
+  LibRDKafka_LIBRARIES
+  LibRDKafka_C_LIBRARIES
+  LibRDKafka_INCLUDE_DIR
 )

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
index 5ed955c..58af5c7 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -47,11 +47,13 @@ find_path(OpenSSL_INCLUDE_DIR
 find_library(OpenSSL_SSL_LIBRARY
   NAMES ssl ssleay32 ssleay32MD
   HINTS ${OpenSSL_ROOT_DIR}/lib
+  PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(OpenSSL_CRYPTO_LIBRARY
   NAMES crypto
   HINTS ${OpenSSL_ROOT_DIR}/lib
+  PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
index 12295a9..1df1136 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -17,3 +17,5 @@
 # This is loaded when a user activates the plugin. Include scripts here that should be
 # loaded automatically at that point.
 #
+
+@load ./init.bro

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
index 84e390c..d62e03f 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -35,7 +35,8 @@ event bro_init() &priority=-5
 {
     local filter: Log::Filter = [
         $name = fmt("kafka-%s", stream_id),
-        $writer = Log::WRITER_KAFKAWRITER
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(["stream_id"] = fmt("%s", stream_id))
     ];
     Log::add_filter(stream_id, filter);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 79a85ed..951a60c 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -22,9 +22,35 @@ using namespace writer;
 
 KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
 {
-    // TODO do we need this??
-    topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
-        BifConst::Kafka::topic_name->Len());
+    // need thread-local copies of all user-defined settings coming from
+    // bro scripting land. accessing these is not thread-safe and 'DoInit'
+    // is potentially accessed from multiple threads.
+
+    // tag_json - thread local copy
+    tag_json = BifConst::Kafka::tag_json;
+
+    // topic name - thread local copy
+    topic_name.assign(
+        (const char*)BifConst::Kafka::topic_name->Bytes(),
+        BifConst::Kafka::topic_name->Len());
+
+    // kafka_conf - thread local copy
+    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+    IterCookie* c = val->AsTable()->InitForIteration();
+    HashKey* k;
+    TableEntryVal* v;
+    while ((v = val->AsTable()->NextEntry(k, c))) {
+
+        // fetch the key and value
+        ListVal* index = val->AsTableVal()->RecoverIndex(k);
+        string key = index->Index(0)->AsString()->CheckString();
+        string val = v->Value()->AsString()->CheckString();
+        kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
+
+        // cleanup
+        Unref(index);
+        delete k;
+    }
 }
 
 KafkaWriter::~KafkaWriter()
@@ -32,6 +58,11 @@ KafkaWriter::~KafkaWriter()
 
 bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
+    // if no global 'topic_name' is defined, use the log stream's 'path'
+    if(topic_name.empty()) {
+        topic_name = info.path;
+    }
+
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
         formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
@@ -39,8 +70,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
         formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
     }
 
-    // kafka global configuration
-    string err;
+    // is debug enabled
     string debug;
     debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
     bool is_debug(!debug.empty());
@@ -53,41 +83,31 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     else {
        reporter->Info( "Debug is turned off.");
     }
+
+    // kafka global configuration
+    string err;
     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // apply the user-defined settings to kafka
-    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
-    IterCookie* c = val->AsTable()->InitForIteration();
-    HashKey* k;
-    TableEntryVal* v;
-    while ((v = val->AsTable()->NextEntry(k, c))) {
-
-        // fetch the key and value
-        ListVal* index = val->AsTableVal()->RecoverIndex(k);
-        string key = index->Index(0)->AsString()->CheckString();
-        string val = v->Value()->AsString()->CheckString();
-
-        if(is_debug) {
-            reporter->Info("Setting '%s'='%s'", key.c_str(), val.c_str());
-        }
-        // apply setting to kafka
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
-            return false;
-        }
-
-        // cleanup
-        Unref(index);
-        delete k;
+    map<string,string>::iterator i;
+    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+        string key = i->first;
+        string val = i->second;
+
+        // apply setting to kafka
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+            return false;
+        }
     }
 
     if(is_debug) {
         string key("debug");
         string val(debug);
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) { 
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
             reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
             return false;
-        } 
+        }
     }
 
     // create kafka producer
@@ -104,9 +124,11 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
         reporter->Error("Failed to create topic handle: %s", err.c_str());
         return false;
     }
+
     if(is_debug) {
         reporter->Info("Successfully created producer.");
     }
+
     return true;
 }
 
@@ -130,8 +152,9 @@ bool KafkaWriter::DoFinish(double network_time)
 
     // successful only if all messages delivered
     if (producer->outq_len() == 0) {
-        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
         success = true;
+    } else {
+        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
     }
 
     delete topic;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
index 7e77bc0..ad3e03f 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
@@ -28,6 +28,17 @@
 #include "kafka.bif.h"
 #include "TaggedJSON.h"
 
+namespace RdKafka {
+    class Conf;
+    class Producer;
+    class Topic;
+}
+
+namespace threading {
+    namespace formatter {
+        class Formatter;
+}}
+
 namespace logging { namespace writer {
 
 /**
@@ -54,6 +65,10 @@ protected:
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
+    static const string default_topic_key;
+    string stream_id;
+    bool tag_json;
+    map<string, string> kafka_conf;
     string topic_name;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;