METRON-883 Capture Bro Plugin Enhancements from bro/bro-plugins (nickwallen) closes apache/incubator-metron#545


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/19e0e715
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/19e0e715
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/19e0e715

Branch: refs/heads/Metron_0.4.0
Commit: 19e0e715d30670612b0ad826ed133ba138bdc20f
Parents: fbce3b5
Author: nickwallen <n...@nickallen.org>
Authored: Tue Apr 25 19:42:28 2017 -0400
Committer: nickallen <nickal...@apache.org>
Committed: Tue Apr 25 19:42:28 2017 -0400

----------------------------------------------------------------------
 metron-sensors/bro-plugin-kafka/README.md       | 106 +++++++++++++------
 .../bro-plugin-kafka/cmake/FindLibRDKafka.cmake |  30 +++---
 .../bro-plugin-kafka/cmake/FindOpenSSL.cmake    |   2 +
 .../scripts/Bro/Kafka/__load__.bro              |   2 +
 .../scripts/Bro/Kafka/logs-to-kafka.bro         |   3 +-
 .../bro-plugin-kafka/src/KafkaWriter.cc         |  85 +++++++++------
 .../bro-plugin-kafka/src/KafkaWriter.h          |  15 +++
 7 files changed, 163 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 6d0582e..31b1f54 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -1,60 +1,98 @@
-Bro Logging Output to Kafka
+Logging Bro Output to Kafka
 ===========================
 
-A Bro log writer that sends logging output to Kafka.  This provides a convenient
-means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
-process the data generated by Bro.
+A Bro log writer that sends logging output to Kafka.  This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
 
 Installation
 ------------
 
-Install librdkafka (https://github.com/edenhill/librdkafka), a native client
-library for Kafka.  This plugin has been tested against the latest release of
-librdkafka, which at the time of this writing is v0.9.4.  In order to support interacting
-with a kerberized kafka, you will need libsasl2 installed
+1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka.  This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
 
-```
-# curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
-# cd librdkafka-0.9.4/
-# ./configure --enable-sasl
-# make
-# sudo make install
-```
+    In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script.
 
-Then compile this Bro plugin using the following commands.
+    ```
+    curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
+    cd librdkafka-0.9.4/
+    ./configure --enable-sasl
+    make
+    sudo make install
+    ```
 
-```
-# ./configure --bro-dist=$BRO_SRC
-# make
-# sudo make install
-```
+1. Build the plugin using the following commands.
 
-Run the following command to ensure that the plugin was installed successfully.
+    ```
+    ./configure --bro-dist=$BRO_SRC
+    make
+    sudo make install
+    ```
 
-```
-# bro -N Bro::Kafka
-Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
-```
+1. Run the following command to ensure that the plugin was installed successfully.
+
+    ```
+    $ bro -N Bro::Kafka
+    Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+    ```
 
 Activation
 ----------
 
-The easiest way to enable Kafka output is to load the plugin's
-`logs-to-kafka.bro` script.  If you are using BroControl, the following lines
-added to local.bro will activate it.
+The following examples highlight different ways that the plugin can be used.  Simply add Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+
+### Example 1
+
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
+ * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
+ * By defining `topic_name` all records will be sent to the same Kafka topic.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
 redef Kafka::topic_name = "bro";
 redef Kafka::kafka_conf = table(
     ["metadata.broker.list"] = "localhost:9092"
 );
 ```
 
-This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on
-the localhost to a topic called `bro`. Any configuration value accepted by
-librdkafka can be added to the `kafka_conf` configuration table.
+### Example 2
+
+It is also possible to send each log stream to a uniquely named topic.  The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
+ * The `topic_name` value must be set to an empty string.
+ * The `$path` value of Bro's Log Writer mechanism is used to define the topic name.
+ * Any configuration value accepted by librdkafka can be added to the `$config` configuration table.
+ * Each log writer accepts a separate configuration table.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "";
+redef Kafka::tag_json = T;
+
+event bro_init()
+{
+    # handles HTTP
+    local http_filter: Log::Filter = [
+        $name = "kafka-http",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+                ["stream_id"] = "HTTP::LOG",
+                ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "http"
+    ];
+    Log::add_filter(HTTP::LOG, http_filter);
+
+    # handles DNS
+    local dns_filter: Log::Filter = [
+        $name = "kafka-dns",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $config = table(
+                ["stream_id"] = "DNS::LOG",
+                ["metadata.broker.list"] = "localhost:9092"
+        ),
+        $path = "dns"
+    ];
+    Log::add_filter(DNS::LOG, dns_filter);
+}
+```
 
 Settings
 --------
@@ -147,7 +185,7 @@ For an environment where the following is true:
 The kafka topic `bro` has been given permission for the `metron` user to
 write:
 ```
-# login using the metron user 
+# login using the metron user
 kinit -kt /etc/security/keytabs/metron.headless.keytab met...@example.com
 ${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
 ```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
index c64d8f9..904bfff 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
@@ -16,34 +16,36 @@
 #
 
 find_path(LibRDKafka_ROOT_DIR
-  NAMES include/librdkafka/rdkafkacpp.h
+    NAMES include/librdkafka/rdkafkacpp.h
 )
 
 find_library(LibRDKafka_LIBRARIES
-  NAMES rdkafka++
-  HINTS ${LibRDKafka_ROOT_DIR}/lib
+    NAMES rdkafka++
+    HINTS ${LibRDKafka_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(LibRDKafka_C_LIBRARIES
-       NAMES rdkafka
-       HINTS ${LibRDKafka_ROT_DIR}/lib
+    NAMES rdkafka
+    HINTS ${LibRDKafka_ROT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_path(LibRDKafka_INCLUDE_DIR
-  NAMES librdkafka/rdkafkacpp.h
-  HINTS ${LibRDKafka_ROOT_DIR}/include
+    NAMES librdkafka/rdkafkacpp.h
+    HINTS ${LibRDKafka_ROOT_DIR}/include
 )
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(LibRDKafka DEFAULT_MSG
-  LibRDKafka_LIBRARIES
-  LibRDKafka_C_LIBRARIES
-  LibRDKafka_INCLUDE_DIR
+    LibRDKafka_LIBRARIES
+    LibRDKafka_C_LIBRARIES
+    LibRDKafka_INCLUDE_DIR
 )
 
 mark_as_advanced(
-  LibRDKafka_ROOT_DIR
-  LibRDKafka_LIBRARIES
-  LibRDKafka_C_LIBRARIES
-  LibRDKafka_INCLUDE_DIR
+    LibRDKafka_ROOT_DIR
+    LibRDKafka_LIBRARIES
+    LibRDKafka_C_LIBRARIES
+    LibRDKafka_INCLUDE_DIR
 )

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
index 5ed955c..58af5c7 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -47,11 +47,13 @@ find_path(OpenSSL_INCLUDE_DIR
 find_library(OpenSSL_SSL_LIBRARY
     NAMES ssl ssleay32 ssleay32MD
     HINTS ${OpenSSL_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 find_library(OpenSSL_CRYPTO_LIBRARY
     NAMES crypto
     HINTS ${OpenSSL_ROOT_DIR}/lib
+    PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
 )
 
 set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
index 12295a9..1df1136 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -17,3 +17,5 @@
 # This is loaded when a user activates the plugin. Include scripts here that should be
 # loaded automatically at that point.
 #
+
+@load ./init.bro

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
index 84e390c..d62e03f 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -35,7 +35,8 @@ event bro_init() &priority=-5
                {
                        local filter: Log::Filter = [
                                $name = fmt("kafka-%s", stream_id),
-                               $writer = Log::WRITER_KAFKAWRITER
+                               $writer = Log::WRITER_KAFKAWRITER,
+                               $config = table(["stream_id"] = fmt("%s", stream_id))
                        ];
 
                        Log::add_filter(stream_id, filter);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 79a85ed..951a60c 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -22,9 +22,35 @@ using namespace writer;
 
 KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
 {
-    // TODO do we need this??
-    topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
-        BifConst::Kafka::topic_name->Len());
+  // need thread-local copies of all user-defined settings coming from
+  // bro scripting land.  accessing these is not thread-safe and 'DoInit'
+  // is potentially accessed from multiple threads.
+
+  // tag_json - thread local copy
+  tag_json = BifConst::Kafka::tag_json;
+
+  // topic name - thread local copy
+  topic_name.assign(
+    (const char*)BifConst::Kafka::topic_name->Bytes(),
+    BifConst::Kafka::topic_name->Len());
+
+  // kafka_conf - thread local copy
+  Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+  IterCookie* c = val->AsTable()->InitForIteration();
+  HashKey* k;
+  TableEntryVal* v;
+  while ((v = val->AsTable()->NextEntry(k, c))) {
+
+      // fetch the key and value
+      ListVal* index = val->AsTableVal()->RecoverIndex(k);
+      string key = index->Index(0)->AsString()->CheckString();
+      string val = v->Value()->AsString()->CheckString();
+      kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
+
+      // cleanup
+      Unref(index);
+      delete k;
+  }
 }
 
 KafkaWriter::~KafkaWriter()
@@ -32,6 +58,11 @@ KafkaWriter::~KafkaWriter()
 
 bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
+    // if no global 'topic_name' is defined, use the log stream's 'path'
+    if(topic_name.empty()) {
+        topic_name = info.path;
+    }
+
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
       formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
@@ -39,8 +70,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
       formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
     }
 
-    // kafka global configuration
-    string err;
+    // is debug enabled
     string debug;
     debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
     bool is_debug(!debug.empty());
@@ -53,41 +83,31 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     else {
       reporter->Info( "Debug is turned off.");
     }
+
+    // kafka global configuration
+    string err;
     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // apply the user-defined settings to kafka
-    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
-    IterCookie* c = val->AsTable()->InitForIteration();
-    HashKey* k;
-    TableEntryVal* v;
-    while ((v = val->AsTable()->NextEntry(k, c))) {
-
-        // fetch the key and value
-        ListVal* index = val->AsTableVal()->RecoverIndex(k);
-        string key = index->Index(0)->AsString()->CheckString();
-        string val = v->Value()->AsString()->CheckString();
-
-        if(is_debug) {
-            reporter->Info("Setting '%s'='%s'", key.c_str(), val.c_str()); 
-        }
-        // apply setting to kafka
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
-            return false;
-        }
-
-        // cleanup
-        Unref(index);
-        delete k;
+    map<string,string>::iterator i;
+    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+      string key = i->first;
+      string val = i->second;
+
+      // apply setting to kafka
+      if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+          reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+          return false;
+      }
     }
 
     if(is_debug) {
         string key("debug");
         string val(debug);
-       if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
             reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
             return false;
-       }
+        }
     }
 
     // create kafka producer
@@ -104,9 +124,11 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
         reporter->Error("Failed to create topic handle: %s", err.c_str());
         return false;
     }
+
     if(is_debug) {
         reporter->Info("Successfully created producer.");
     }
+
     return true;
 }
 
@@ -130,8 +152,9 @@ bool KafkaWriter::DoFinish(double network_time)
 
     // successful only if all messages delivered
     if (producer->outq_len() == 0) {
-        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
         success = true;
+    } else {
+        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
     }
 
     delete topic;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
index 7e77bc0..ad3e03f 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
@@ -28,6 +28,17 @@
 #include "kafka.bif.h"
 #include "TaggedJSON.h"
 
+namespace RdKafka {
+    class Conf;
+    class Producer;
+    class Topic;
+}
+
+namespace threading {
+  namespace formatter {
+    class Formatter;
+}}
+
 namespace logging { namespace writer {
 
 /**
@@ -54,6 +65,10 @@ protected:
     virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
+    static const string default_topic_key;
+    string stream_id;
+    bool tag_json;
+    map<string, string> kafka_conf;
     string topic_name;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;
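
The pattern introduced in KafkaWriter.cc can be sketched in isolation: settings are copied into plain members when the writer is constructed, and initialization later reads only those copies, falling back to the log stream's path when no global topic name is set. The block below is a minimal illustration under stated assumptions, not the plugin's code: `WriterSketch`, `FakeConf`, `Init`, and `Topic` are hypothetical stand-ins for `KafkaWriter`, `RdKafka::Conf`, `DoInit`, and the private `topic_name` member, and no Bro or librdkafka headers are involved.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for RdKafka::Conf; it only records what was set.
struct FakeConf {
    std::map<std::string, std::string> applied;

    // Returns false and fills 'err' when the key is empty.
    bool set(const std::string& key, const std::string& val, std::string& err) {
        if (key.empty()) {
            err = "empty configuration key";
            return false;
        }
        applied[key] = val;
        return true;
    }
};

// Hypothetical stand-in for KafkaWriter.
class WriterSketch {
public:
    // Copy the script-level settings once, at construction time, so that
    // later initialization never touches the shared originals.
    WriterSketch(const std::string& global_topic,
                 const std::map<std::string, std::string>& script_conf)
        : topic_name(global_topic), kafka_conf(script_conf) {}

    // Mirrors the shape of DoInit: reads only the private copies.
    bool Init(const std::string& path, FakeConf& conf, std::string& err) {
        // If no global topic name is defined, use the log stream's path.
        if (topic_name.empty()) {
            topic_name = path;
        }

        // Apply each copied setting; stop at the first failure.
        for (const auto& entry : kafka_conf) {
            if (!conf.set(entry.first, entry.second, err)) {
                return false;
            }
        }
        return true;
    }

    const std::string& Topic() const { return topic_name; }

private:
    std::string topic_name;
    std::map<std::string, std::string> kafka_conf;
};
```

Constructed with an empty global topic and initialized with the path `http`, the sketch reports `http` as its topic, which corresponds to Example 2 in the README; constructed with the global topic `bro`, the path is ignored, as in Example 1.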
