This is an automated email from the ASF dual-hosted git repository. otto pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git
The following commit(s) were added to refs/heads/master by this push: new 8da1637 METRON-2343 Bro Kafka plugin - ability to dynamically modify JSON (ottobackwards) closes apache/metron-bro-plugin-kafka#46 8da1637 is described below commit 8da1637a50815d6093e482bdb7a1a0882e02df3a Author: ottobackwards <ottobackwa...@gmail.com> AuthorDate: Tue May 19 20:54:23 2020 -0400 METRON-2343 Bro Kafka plugin - ability to dynamically modify JSON (ottobackwards) closes apache/metron-bro-plugin-kafka#46 --- README.md | 28 ++++++++++++++++++++++++++++ docker/in_docker_scripts/configure_plugin.sh | 2 ++ scripts/init.zeek | 7 +++++++ src/KafkaWriter.cc | 28 +++++++++++++++++++++++----- src/KafkaWriter.h | 1 + src/TaggedJSON.cc | 22 +++++++++++++++++++--- src/TaggedJSON.h | 2 +- src/kafka.bif | 1 + tests/Baseline/kafka.show-plugin/output | 1 + 9 files changed, 83 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b4aa98d..b7e7e58 100644 --- a/README.md +++ b/README.md @@ -266,6 +266,22 @@ event zeek_init() &priority=-10 _Note_: Because `Kafka::tag_json` is set to True in this example, the value of `$path` is used as the tag for each `Log::Filter`. If you were to add a log filter with the same `$path` as an existing filter, Zeek will append "-N", where N is an integer starting at 2, to the end of the log path so that each filter has its own unique log path. For instance, the second instance of `conn` would become `conn-2`. +### Example 7 - Add static values to each outgoing Kafka message +It is possible to define name value pairs and have them added to each outgoing Kafka json message when `Kafka::tag_json` is set to true. Each will be added to the root json object. + * the Kafka::additional_message_values table can be configured with each name and value + * based on the following configuration, each outgoing message will have "FIRST_STATIC_NAME": "FIRST_STATIC_VALUE", "SECOND_STATIC_NAME": "SECOND_STATIC_VALUE" added. 
+``` +@load packages +redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG); +redef Kafka::topic_name = "zeek"; +redef Kafka::tag_json = T; +redef Kafka::kafka_conf = table(["metadata.broker.list"] = "kafka-1:9092,kafka-2:9092"); +redef Kafka::additional_message_values = table(["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE", ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE"); +redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG); +redef Known::cert_tracking = ALL_HOSTS; +redef Software::asset_tracking = ALL_HOSTS; +``` + ## Settings ### `logs_to_send` @@ -315,6 +331,18 @@ redef Kafka::kafka_conf = table( ); +### `additional_message_values` + +A table of name value pairs. Each item in this table will be added to each outgoing message +at the root level if tag_json is set to T. + +``` +redef Kafka::additional_message_values = table( + ["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE", + ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE" +); +``` + ### `tag_json` If true, a log stream identifier is appended to each JSON-formatted message. 
For diff --git a/docker/in_docker_scripts/configure_plugin.sh b/docker/in_docker_scripts/configure_plugin.sh index c4479db..8d2f3da 100755 --- a/docker/in_docker_scripts/configure_plugin.sh +++ b/docker/in_docker_scripts/configure_plugin.sh @@ -23,6 +23,7 @@ shopt -s nocasematch # Configures the zeek kafka plugin # Configures the kafka broker # Configures the plugin for all the traffic types +# Configures the plugin to add some additional json values # function help { @@ -74,6 +75,7 @@ echo "Configuring kafka plugin" echo "redef Kafka::topic_name = \"${KAFKA_TOPIC}\";" echo "redef Kafka::tag_json = T;" echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"kafka-1:9092,kafka-2:9092\");" + echo "redef Kafka::additional_message_values = table([\"FIRST_STATIC_NAME\"] = \"FIRST_STATIC_VALUE\", [\"SECOND_STATIC_NAME\"] = \"SECOND_STATIC_VALUE\");" echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);" echo "redef Known::cert_tracking = ALL_HOSTS;" echo "redef Software::asset_tracking = ALL_HOSTS;" diff --git a/scripts/init.zeek b/scripts/init.zeek index 6f5a7ae..5636a13 100644 --- a/scripts/init.zeek +++ b/scripts/init.zeek @@ -53,6 +53,13 @@ export { ["metadata.broker.list"] = "localhost:9092" ) &redef; + ## Key value pairs that will be added to outgoing messages at the root level + ## for example: ["zeek_server"] = "this_server_name" + ## will result in a "zeek_server":"this_server_name" field added to the outgoing + ## json + ## note this depends on tag_json being T + const additional_message_values: table[string] of string = table() &redef; + ## A comma separated list of librdkafka debug contexts const debug: string = "" &redef; diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc index 7f26092..deeea95 100644 --- a/src/KafkaWriter.cc +++ b/src/KafkaWriter.cc @@ -68,6 +68,22 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend): Unref(index); delete k; } + + Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal(); + c = mvals->AsTable()->InitForIteration(); + while ((v = mvals->AsTable()->NextEntry(k, c))) { + + // fetch the key and value + ListVal* index = mvals->AsTableVal()->RecoverIndex(k); + string key = index->Index(0)->AsString()->CheckString(); + string val = v->Value()->AsString()->CheckString(); + additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, val)); + + // cleanup + Unref(index); + delete k; + } + } KafkaWriter::~KafkaWriter() @@ -126,7 +142,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading } else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) { tf = threading::formatter::JSON::TS_ISO8601; - } + } else { Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s", json_timestamps.c_str())); @@ -136,7 +152,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading // initialize the formatter if(BifConst::Kafka::tag_json) { formatter = new threading::formatter::TaggedJSON(info.path, this, tf); - } + } else { formatter = new threading::formatter::JSON(this, tf); } @@ -148,9 +164,6 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading if(is_debug) { MsgThread::Info(Fmt("Debug is turned on and set to: %s. 
Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str())); } - else { - MsgThread::Info(Fmt("Debug is turned off.")); - } // kafka global configuration string err; @@ -249,7 +262,12 @@ bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, buff.Clear(); // format the log entry + if(BifConst::Kafka::tag_json) { + dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals, + additional_message_values); + } else { formatter->Describe(&buff, num_fields, fields, vals); + } // send the formatted log entry to kafka const char *raw = (const char *) buff.Bytes(); diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h index 5ebf4ef..ad29ac7 100644 --- a/src/KafkaWriter.h +++ b/src/KafkaWriter.h @@ -73,6 +73,7 @@ private: bool mocking; string json_timestamps; map<string, string> kafka_conf; + map<string, string> additional_message_values; string topic_name; string topic_name_override; threading::formatter::Formatter *formatter; diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc index db3f305..f182d95 100644 --- a/src/TaggedJSON.cc +++ b/src/TaggedJSON.cc @@ -25,7 +25,7 @@ TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf TaggedJSON::~TaggedJSON() {} -bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const +bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const { desc->AddRaw("{"); @@ -34,10 +34,26 @@ bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* field desc->AddRaw(stream_name); desc->AddRaw("\": "); + + // append the JSON formatted log record itself JSON::Describe(desc, num_fields, fields, vals); - - desc->AddRaw("}"); + if (const_vals.size() > 0) { + + map<string, string>::iterator it = const_vals.begin(); + while (it != const_vals.end()) { + desc->AddRaw(","); + desc->AddRaw("\""); + 
desc->AddRaw(it->first); + desc->AddRaw("\": "); + desc->AddRaw("\""); + desc->AddRaw(it->second); + desc->AddRaw("\""); + it++; + } + } + + desc->AddRaw("}"); return true; } }} diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h index 51b1bf3..9135bf2 100644 --- a/src/TaggedJSON.h +++ b/src/TaggedJSON.h @@ -41,7 +41,7 @@ class TaggedJSON : public JSON { public: TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf); virtual ~TaggedJSON(); - virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const; + virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const; private: string stream_name; diff --git a/src/kafka.bif b/src/kafka.bif index 53e3bfe..47983b1 100644 --- a/src/kafka.bif +++ b/src/kafka.bif @@ -18,6 +18,7 @@ module Kafka; const kafka_conf: config; +const additional_message_values : config; const topic_name: string; const max_wait_on_shutdown: count; const tag_json: bool; diff --git a/tests/Baseline/kafka.show-plugin/output b/tests/Baseline/kafka.show-plugin/output index 978febc..6e82dd3 100644 --- a/tests/Baseline/kafka.show-plugin/output +++ b/tests/Baseline/kafka.show-plugin/output @@ -1,6 +1,7 @@ Apache::Kafka - Writes logs to Kafka (dynamic) [Writer] KafkaWriter (Log::WRITER_KAFKAWRITER) [Constant] Kafka::kafka_conf + [Constant] Kafka::additional_message_values [Constant] Kafka::topic_name [Constant] Kafka::max_wait_on_shutdown [Constant] Kafka::tag_json