This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da1637  METRON-2343 Bro Kafka plugin - ability to dynamically modify 
JSON (ottobackwards) closes apache/metron-bro-plugin-kafka#46
8da1637 is described below

commit 8da1637a50815d6093e482bdb7a1a0882e02df3a
Author: ottobackwards <ottobackwa...@gmail.com>
AuthorDate: Tue May 19 20:54:23 2020 -0400

    METRON-2343 Bro Kafka plugin - ability to dynamically modify JSON 
(ottobackwards) closes apache/metron-bro-plugin-kafka#46
---
 README.md                                    | 28 ++++++++++++++++++++++++++++
 docker/in_docker_scripts/configure_plugin.sh |  2 ++
 scripts/init.zeek                            |  7 +++++++
 src/KafkaWriter.cc                           | 28 +++++++++++++++++++++++-----
 src/KafkaWriter.h                            |  1 +
 src/TaggedJSON.cc                            | 22 +++++++++++++++++++---
 src/TaggedJSON.h                             |  2 +-
 src/kafka.bif                                |  1 +
 tests/Baseline/kafka.show-plugin/output      |  1 +
 9 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index b4aa98d..b7e7e58 100644
--- a/README.md
+++ b/README.md
@@ -266,6 +266,22 @@ event zeek_init() &priority=-10
 
 _Note_:  Because `Kafka::tag_json` is set to True in this example, the value 
of `$path` is used as the tag for each `Log::Filter`. If you were to add a log 
filter with the same `$path` as an existing filter, Zeek will append "-N", 
where N is an integer starting at 2, to the end of the log path so that each 
filter has its own unique log path. For instance, the second instance of `conn` 
would become `conn-2`.
 
+### Example 7 - Add static values to each outgoing Kafka message
+It is possible to define name-value pairs and have them added to each outgoing 
Kafka JSON message when tag_json is set to true.  Each will be added to the 
root JSON object.
+    * the Kafka::additional_message_values table can be configured with each 
name and value
+    * based on the following configuration, each outgoing message will have 
"FIRST_STATIC_NAME": "FIRST_STATIC_VALUE", "SECOND_STATIC_NAME": 
"SECOND_STATIC_VALUE" added.
+```
+@load packages
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, 
FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, 
Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, 
RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG);
+redef Kafka::topic_name = "zeek";
+redef Kafka::tag_json = T;
+redef Kafka::kafka_conf = table(["metadata.broker.list"] = 
"kafka-1:9092,kafka-2:9092");
+redef Kafka::additional_message_values = table(["FIRST_STATIC_NAME"] = 
"FIRST_STATIC_VALUE", ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE");
+redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);
+redef Known::cert_tracking = ALL_HOSTS;
+redef Software::asset_tracking = ALL_HOSTS;
+```
+
 ## Settings
 
 ### `logs_to_send`
@@ -315,6 +331,18 @@ redef Kafka::kafka_conf = table(
 );
 ```
 
+### `additional_message_values`
+
+A table of name-value pairs.  Each item in this table will be added to each 
outgoing message
+at the root level if tag_json is set to T.
+
+```
+redef Kafka::additional_message_values = table(
+    ["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE",
+    ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE"
+);
+```
+
 ### `tag_json`
 
 If true, a log stream identifier is appended to each JSON-formatted message. 
For
diff --git a/docker/in_docker_scripts/configure_plugin.sh 
b/docker/in_docker_scripts/configure_plugin.sh
index c4479db..8d2f3da 100755
--- a/docker/in_docker_scripts/configure_plugin.sh
+++ b/docker/in_docker_scripts/configure_plugin.sh
@@ -23,6 +23,7 @@ shopt -s nocasematch
 # Configures the zeek kafka plugin
 # Configures the kafka broker
 # Configures the plugin for all the traffic types
+# Configures the plugin to add some additional json values
 #
 
 function help {
@@ -74,6 +75,7 @@ echo "Configuring kafka plugin"
   echo "redef Kafka::topic_name = \"${KAFKA_TOPIC}\";"
   echo "redef Kafka::tag_json = T;"
   echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = 
\"kafka-1:9092,kafka-2:9092\");"
+  echo "redef Kafka::additional_message_values = table([\"FIRST_STATIC_NAME\"] 
= \"FIRST_STATIC_VALUE\", [\"SECOND_STATIC_NAME\"] = \"SECOND_STATIC_VALUE\");"
   echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);"
   echo "redef Known::cert_tracking = ALL_HOSTS;"
   echo "redef Software::asset_tracking = ALL_HOSTS;"
diff --git a/scripts/init.zeek b/scripts/init.zeek
index 6f5a7ae..5636a13 100644
--- a/scripts/init.zeek
+++ b/scripts/init.zeek
@@ -53,6 +53,13 @@ export {
                 ["metadata.broker.list"] = "localhost:9092"
         ) &redef;
 
+        ##  Key value pairs that will be added to outgoing messages at the 
root level
+        ##  for example:          ["zeek_server"] = "this_server_name"
+        ##  will result in a "zeek_server":"this_server_name" field added to 
the outgoing
+        ##  json
+        ##  note this depends on tag_json being T
+        const additional_message_values: table[string] of string = table() 
&redef;
+
         ## A comma separated list of librdkafka debug contexts
         const debug: string = "" &redef;
 
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 7f26092..deeea95 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -68,6 +68,22 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend):
       Unref(index);
       delete k;
   }
+
+  Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
+  c = val->AsTable()->InitForIteration();
+  while ((v = mvals->AsTable()->NextEntry(k, c))) {
+
+    // fetch the key and value
+    ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
+    string key = index->Index(0)->AsString()->CheckString();
+    string val = v->Value()->AsString()->CheckString();
+    additional_message_values.insert (additional_message_values.begin(), 
pair<string, string> (key, val));
+
+    // cleanup
+    Unref(index);
+    delete k;
+  }
+
 }
 
 KafkaWriter::~KafkaWriter()
@@ -126,7 +142,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     }
     else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
       tf = threading::formatter::JSON::TS_ISO8601;
-    } 
+    }
     else {
       Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
         json_timestamps.c_str()));
@@ -136,7 +152,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
       formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
-    } 
+    }
     else {
       formatter = new threading::formatter::JSON(this, tf);
     }
@@ -148,9 +164,6 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int 
num_fields, const threading
     if(is_debug) {
       MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug 
context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
     }
-    else {
-      MsgThread::Info(Fmt("Debug is turned off."));
-    }
 
     // kafka global configuration
     string err;
@@ -249,7 +262,12 @@ bool KafkaWriter::DoWrite(int num_fields, const 
threading::Field* const* fields,
         buff.Clear();
 
         // format the log entry
+      if(BifConst::Kafka::tag_json) {
+          
dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, 
num_fields, fields, vals,
+                              additional_message_values);
+       } else {
         formatter->Describe(&buff, num_fields, fields, vals);
+       }
 
         // send the formatted log entry to kafka
         const char *raw = (const char *) buff.Bytes();
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 5ebf4ef..ad29ac7 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -73,6 +73,7 @@ private:
     bool mocking;
     string json_timestamps;
     map<string, string> kafka_conf;
+    map<string, string> additional_message_values;
     string topic_name;
     string topic_name_override;
     threading::formatter::Formatter *formatter;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index db3f305..f182d95 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -25,7 +25,7 @@ TaggedJSON::TaggedJSON(string sn, MsgThread* t, 
JSON::TimeFormat tf): JSON(t, tf
 TaggedJSON::~TaggedJSON()
 {}
 
-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* 
fields, Value** vals) const
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* 
fields, Value** vals, map<string,string> &const_vals) const
 {
     desc->AddRaw("{");
 
@@ -34,10 +34,26 @@ bool TaggedJSON::Describe(ODesc* desc, int num_fields, 
const Field* const* field
     desc->AddRaw(stream_name);
     desc->AddRaw("\": ");
 
+
+
     // append the JSON formatted log record itself
     JSON::Describe(desc, num_fields, fields, vals);
-
-    desc->AddRaw("}");
+    if (const_vals.size() > 0) {
+
+      map<string, string>::iterator it = const_vals.begin();
+      while (it != const_vals.end()) {
+        desc->AddRaw(",");
+        desc->AddRaw("\"");
+        desc->AddRaw(it->first);
+        desc->AddRaw("\": ");
+        desc->AddRaw("\"");
+        desc->AddRaw(it->second);
+        desc->AddRaw("\"");
+        it++;
+      }
+    }
+
+  desc->AddRaw("}");
     return true;
 }
 }}
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 51b1bf3..9135bf2 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -41,7 +41,7 @@ class TaggedJSON : public JSON {
 public:
     TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
     virtual ~TaggedJSON();
-    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* 
fields, Value** vals) const;
+    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* 
fields, Value** vals, map<string,string> &const_vals) const;
 
 private:
     string stream_name;
diff --git a/src/kafka.bif b/src/kafka.bif
index 53e3bfe..47983b1 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -18,6 +18,7 @@
 module Kafka;
 
 const kafka_conf: config;
+const additional_message_values : config;
 const topic_name: string;
 const max_wait_on_shutdown: count;
 const tag_json: bool;
diff --git a/tests/Baseline/kafka.show-plugin/output 
b/tests/Baseline/kafka.show-plugin/output
index 978febc..6e82dd3 100644
--- a/tests/Baseline/kafka.show-plugin/output
+++ b/tests/Baseline/kafka.show-plugin/output
@@ -1,6 +1,7 @@
 Apache::Kafka - Writes logs to Kafka (dynamic)
     [Writer] KafkaWriter (Log::WRITER_KAFKAWRITER)
     [Constant] Kafka::kafka_conf
+    [Constant] Kafka::additional_message_values
     [Constant] Kafka::topic_name
     [Constant] Kafka::max_wait_on_shutdown
     [Constant] Kafka::tag_json

Reply via email to