This is an automated email from the ASF dual-hosted git repository.

otto pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron-bro-plugin-kafka.git
The following commit(s) were added to refs/heads/master by this push:
     new 92b85e5  METRON-2360 BRO-PLUGIN: does not build with 3.2.1 (ottobackwards) closes apache/metron-bro-plugin-kafka#48
92b85e5 is described below

commit 92b85e5e00cc2fd9023ea7f53466db6592eb6634
Author: ottobackwards <ottobackwa...@gmail.com>
AuthorDate: Tue Sep 22 12:46:41 2020 -0400

    METRON-2360 BRO-PLUGIN: does not build with 3.2.1 (ottobackwards) closes apache/metron-bro-plugin-kafka#48
---
 docker/containers/zeek/Dockerfile |   2 +-
 docker/docker-compose.yml         |   2 +-
 src/KafkaWriter.cc                | 430 ++++++++++++++++++--------------------
 src/KafkaWriter.h                 |  19 +-
 src/TaggedJSON.cc                 |   9 +-
 src/TaggedJSON.h                  |  23 +-
 6 files changed, 238 insertions(+), 247 deletions(-)

diff --git a/docker/containers/zeek/Dockerfile b/docker/containers/zeek/Dockerfile
index dba31d7..d8eda31 100644
--- a/docker/containers/zeek/Dockerfile
+++ b/docker/containers/zeek/Dockerfile
@@ -57,7 +57,7 @@ ENV PATH="${PATH}:/usr/bin"
 # install pip3 and zkg
 WORKDIR /root
 COPY requirements.txt requirements.txt
-RUN dnf -y install python3-pip && \
+RUN dnf -y install python3-pip diffutils && \
     dnf clean all && \
     python3 -m pip install --upgrade pip && \
    python3 -m pip install -r requirements.txt && \
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 0579887..4f8ba10 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -29,7 +29,7 @@ services:
     build:
       context: containers/zeek
       args:
-        ZEEK_VERSION: "3.1.3"
+        ZEEK_VERSION: "3.2.1"
         LIBRDKAFKA_VERSION: "1.4.2"
     image: metron-bro-plugin-kafka_zeek:latest
     depends_on:
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index deeea95..1e19b3b 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -22,12 +22,8 @@ using namespace logging;
 using namespace writer;

 // The Constructor is called once for each log filter that uses this log writer.
-KafkaWriter::KafkaWriter(WriterFrontend* frontend):
-    WriterBackend(frontend),
-    formatter(NULL),
-    producer(NULL),
-    topic(NULL)
-{
+KafkaWriter::KafkaWriter(WriterFrontend *frontend)
+    : WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL) {
   /**
    * We need thread-local copies of all user-defined settings coming from zeek
    * scripting land.  accessing these is not thread-safe and 'DoInit' is
@@ -41,177 +37,170 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend):
   // json_timestamps
   ODesc tsfmt;
   BifConst::Kafka::json_timestamps->Describe(&tsfmt);
-  json_timestamps.assign(
-      (const char*) tsfmt.Bytes(),
-      tsfmt.Len()
-  );
+  json_timestamps.assign((const char *)tsfmt.Bytes(), tsfmt.Len());

   // topic name - thread local copy
-  topic_name.assign(
-      (const char*)BifConst::Kafka::topic_name->Bytes(),
-      BifConst::Kafka::topic_name->Len());
+  topic_name.assign((const char *)BifConst::Kafka::topic_name->Bytes(),
+                    BifConst::Kafka::topic_name->Len());

   // kafka_conf - thread local copy
-  Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
-  IterCookie* c = val->AsTable()->InitForIteration();
-  HashKey* k;
-  TableEntryVal* v;
+  Val *val = BifConst::Kafka::kafka_conf->AsTableVal();
+  IterCookie *c = val->AsTable()->InitForIteration();
+  HashKey *k;
+  TableEntryVal *v;
   while ((v = val->AsTable()->NextEntry(k, c))) {
+    // fetch the key and value
+    ListVal *index = val->AsTableVal()->RecoverIndex(k);
+    std::string key = index->Index(0)->AsString()->CheckString();
+    std::string val = v->Value()->AsString()->CheckString();
+    kafka_conf.insert(kafka_conf.begin(),
+                      std::pair<std::string, std::string>(key, val));

-    // fetch the key and value
-    ListVal* index = val->AsTableVal()->RecoverIndex(k);
-    string key = index->Index(0)->AsString()->CheckString();
-    string val = v->Value()->AsString()->CheckString();
-    kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
-
-    // cleanup
-    Unref(index);
-    delete k;
+    // cleanup
+    Unref(index);
+    delete k;
   }

-  Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
+  Val *mvals = BifConst::Kafka::additional_message_values->AsTableVal();
   c = val->AsTable()->InitForIteration();
   while ((v = mvals->AsTable()->NextEntry(k, c))) {
-
-    // fetch the key and value
-    ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
-    string key = index->Index(0)->AsString()->CheckString();
-    string val = v->Value()->AsString()->CheckString();
-    additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, val));
-
-    // cleanup
+    ListVal *index = mvals->AsTableVal()->RecoverIndex(k);
+    std::string key = index->Index(0)->AsString()->CheckString();
+    std::string val = v->Value()->AsString()->CheckString();
+    additional_message_values.insert(additional_message_values.begin(),
+                                     std::pair<std::string, std::string>(key, val));
     Unref(index);
     delete k;
   }
-
 }

-KafkaWriter::~KafkaWriter()
-{
+KafkaWriter::~KafkaWriter() {
   // Cleanup must happen in DoFinish, not in the destructor
 }

-string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
-{
-    map<const char*, const char*>::const_iterator it = info.config.find(name.c_str());
-    if (it == info.config.end())
-        return string();
-    else
-        return it->second;
+std::string KafkaWriter::GetConfigValue(const WriterInfo &info,
+                                        const std::string name) const {
+  std::map<const char *, const char *>::const_iterator it =
+      info.config.find(name.c_str());
+  if (it == info.config.end())
+    return std::string();
+  else
+    return it->second;
 }

 /**
  * DoInit is called once for each call to the constructor, but in a separate
  * thread
  */
-bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
-{
-
-    // Timeformat object, default to TS_EPOCH
-    threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
-
-    // Allow overriding of the kafka topic via the Zeek script constant
-    // 'topic_name' which can be applied when adding a new Zeek log filter.
-    topic_name_override = GetConfigValue(info, "topic_name");
-
-    if(!topic_name_override.empty()) {
-        // Override the topic name if 'topic_name' is specified in the log
-        // filter's $conf
-        topic_name = topic_name_override;
-    } else if(topic_name.empty()) {
-        // If no global 'topic_name' is defined, use the log stream's 'path'
-        topic_name = info.path;
-    }
+bool KafkaWriter::DoInit(const WriterInfo &info, int num_fields,
+                         const threading::Field *const *fields) {
+  // TimeFormat object, default to TS_EPOCH
+  threading::formatter::JSON::TimeFormat tf =
+      threading::formatter::JSON::TS_EPOCH;
+
+  // Allow overriding of the kafka topic via the Zeek script constant
+  // 'topic_name' which can be applied when adding a new Zeek log filter.
+  topic_name_override = GetConfigValue(info, "topic_name");
+
+  if (!topic_name_override.empty()) {
+    // Override the topic name if 'topic_name' is specified in the log
+    // filter's $conf
+    topic_name = topic_name_override;
+  } else if (topic_name.empty()) {
+    // If no global 'topic_name' is defined, use the log stream's 'path'
+    topic_name = info.path;
+  }

-    if (mocking) {
-        raise_topic_resolved_event(topic_name);
-    }
+  if (mocking) {
+    raise_topic_resolved_event(topic_name);
+  }

-    /**
-     * Format the timestamps
-     * NOTE: This string comparision implementation is currently the necessary
-     * way to do it, as there isn't a way to pass the Zeek enum into C++ enum.
-     * This makes the user interface consistent with the existing Zeek Logging
-     * configuration for the ASCII log output.
-     */
-    if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) {
-        tf = threading::formatter::JSON::TS_EPOCH;
-    }
-    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) {
-        tf = threading::formatter::JSON::TS_MILLIS;
-    }
-    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
-        tf = threading::formatter::JSON::TS_ISO8601;
-    }
-    else {
-        Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
-            json_timestamps.c_str()));
-        return false;
-    }
+  /**
+   * Format the timestamps
+   * NOTE: This string comparision implementation is currently the necessary
+   * way to do it, as there isn't a way to pass the Zeek enum into C++ enum.
+   * This makes the user interface consistent with the existing Zeek Logging
+   * configuration for the ASCII log output.
+   */
+  if (strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0) {
+    tf = threading::formatter::JSON::TS_EPOCH;
+  } else if (strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0) {
+    tf = threading::formatter::JSON::TS_MILLIS;
+  } else if (strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0) {
+    tf = threading::formatter::JSON::TS_ISO8601;
+  } else {
+    Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
+              json_timestamps.c_str()));
+    return false;
+  }

-    // initialize the formatter
-    if(BifConst::Kafka::tag_json) {
-        formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
-    }
-    else {
-        formatter = new threading::formatter::JSON(this, tf);
+  // initialize the formatter
+  if (BifConst::Kafka::tag_json) {
+    formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
+  } else {
+    formatter = new threading::formatter::JSON(this, tf);
+  }
+
+  // is debug enabled
+  std::string debug;
+  debug.assign((const char *)BifConst::Kafka::debug->Bytes(),
+               BifConst::Kafka::debug->Len());
+  bool is_debug(!debug.empty());
+  if (is_debug) {
+    MsgThread::Info(
+        Fmt("Debug is turned on and set to: %s. Available debug context: %s.",
+            debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+  }
+
+  // kafka global configuration
+  std::string err;
+  conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+
+  // apply the user-defined settings to kafka
+  std::map<std::string, std::string>::iterator i;
+  for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+    std::string key = i->first;
+    std::string val = i->second;
+
+    // apply setting to kafka
+    if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+      Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+                err.c_str()));
+      return false;
     }
+  }

-    // is debug enabled
-    string debug;
-    debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
-    bool is_debug(!debug.empty());
-    if(is_debug) {
-        MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+  if (is_debug) {
+    std::string key("debug");
+    std::string val(debug);
+    if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+      Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+                err.c_str()));
+      return false;
     }
+  }

-    // kafka global configuration
-    string err;
-    conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
-    // apply the user-defined settings to kafka
-    map<string,string>::iterator i;
-    for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
-        string key = i->first;
-        string val = i->second;
-
-        // apply setting to kafka
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
-            return false;
-        }
+  if (!mocking) {
+    // create kafka producer
+    producer = RdKafka::Producer::create(conf, err);
+    if (!producer) {
+      Error(Fmt("Failed to create producer: %s", err.c_str()));
+      return false;
     }
-    if(is_debug) {
-        string key("debug");
-        string val(debug);
-        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
-            return false;
-        }
+    // create handle to topic
+    topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+    topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
+    if (!topic) {
+      Error(Fmt("Failed to create topic handle: %s", err.c_str()));
+      return false;
     }
-    if (!mocking) {
-        // create kafka producer
-        producer = RdKafka::Producer::create(conf, err);
-        if (!producer) {
-            Error(Fmt("Failed to create producer: %s", err.c_str()));
-            return false;
-        }
-
-        // create handle to topic
-        topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-        topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
-        if (!topic) {
-            Error(Fmt("Failed to create topic handle: %s", err.c_str()));
-            return false;
-        }
-
-        if (is_debug) {
-            MsgThread::Info(Fmt("Successfully created producer."));
-        }
+    if (is_debug) {
+      MsgThread::Info(Fmt("Successfully created producer."));
     }
-    return true;
+  }
+  return true;
 }

 /**
@@ -220,69 +209,68 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
  * the thread can be safely terminated. As such, all resources created must be
  * removed here.
  */
-bool KafkaWriter::DoFinish(double network_time)
-{
-    bool success = false;
-    int poll_interval = 1000;
-    int waited = 0;
-    int max_wait = BifConst::Kafka::max_wait_on_shutdown;
-
-    if (!mocking) {
-        // wait a bit for queued messages to be delivered
-        while (producer->outq_len() > 0 && waited <= max_wait) {
-            producer->poll(poll_interval);
-            waited += poll_interval;
-        }
-
-        // successful only if all messages delivered
-        if (producer->outq_len() == 0) {
-            success = true;
-        } else {
-            Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
-        }
-
-        delete topic;
-        delete producer;
-        delete topic_conf;
+bool KafkaWriter::DoFinish(double network_time) {
+  bool success = false;
+  int poll_interval = 1000;
+  int waited = 0;
+  int max_wait = BifConst::Kafka::max_wait_on_shutdown;
+
+  if (!mocking) {
+    // wait a bit for queued messages to be delivered
+    while (producer->outq_len() > 0 && waited <= max_wait) {
+      producer->poll(poll_interval);
+      waited += poll_interval;
+    }
+
+    // successful only if all messages delivered
+    if (producer->outq_len() == 0) {
+      success = true;
+    } else {
+      Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
     }
-    delete formatter;
-    delete conf;
-    return success;
+    delete topic;
+    delete producer;
+    delete topic_conf;
+  }
+  delete formatter;
+  delete conf;
+
+  return success;
 }

 /**
  * Writer-specific output method implementing recording of one log
  * entry.
  */
-bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
-{
-    if (!mocking) {
-        ODesc buff;
-        buff.Clear();
-
-        // format the log entry
-        if(BifConst::Kafka::tag_json) {
-            dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals,
-                additional_message_values);
-        } else {
-            formatter->Describe(&buff, num_fields, fields, vals);
-        }
-
-        // send the formatted log entry to kafka
-        const char *raw = (const char *) buff.Bytes();
-        RdKafka::ErrorCode resp = producer->produce(
-            topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
-            const_cast<char *>(raw), strlen(raw), NULL, NULL);
-
-        if (RdKafka::ERR_NO_ERROR == resp) {
-            producer->poll(0);
-        } else {
-            string err = RdKafka::err2str(resp);
-            Error(Fmt("Kafka send failed: %s", err.c_str()));
-        }
+bool KafkaWriter::DoWrite(int num_fields, const threading::Field *const *fields,
+                          threading::Value **vals) {
+  if (!mocking) {
+    ODesc buff;
+    buff.Clear();
+
+    // format the log entry
+    if (BifConst::Kafka::tag_json) {
+      dynamic_cast<threading::formatter::TaggedJSON *>(formatter)->Describe(
+          &buff, num_fields, fields, vals, additional_message_values);
+    } else {
+      formatter->Describe(&buff, num_fields, fields, vals);
     }
-    return true;
+
+    // send the formatted log entry to kafka
+    const char *raw = (const char *)buff.Bytes();
+    RdKafka::ErrorCode resp = producer->produce(
+        topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
+        const_cast<char *>(raw), strlen(raw), NULL, NULL);
+
+    if (RdKafka::ERR_NO_ERROR == resp) {
+      producer->poll(0);
+    } else {
+      std::string err = RdKafka::err2str(resp);
+      Error(Fmt("Kafka send failed: %s", err.c_str()));
+    }
+  }
+  return true;
 }

 /**
@@ -294,10 +282,9 @@ bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields,
  * optimized for performance. The current buffering state can be
  * queried via IsBuf().
  */
-bool KafkaWriter::DoSetBuf(bool enabled)
-{
-    // no change in behavior
-    return true;
+bool KafkaWriter::DoSetBuf(bool enabled) {
+  // no change in behavior
+  return true;
 }

 /**
@@ -305,12 +292,11 @@ bool KafkaWriter::DoSetBuf(bool enabled)
  * implementation must override this method but it can just
  * ignore calls if flushing doesn't align with its semantics.
  */
-bool KafkaWriter::DoFlush(double network_time)
-{
-    if (!mocking) {
-        producer->flush(0);
-    }
-    return true;
+bool KafkaWriter::DoFlush(double network_time) {
+  if (!mocking) {
+    producer->flush(0);
+  }
+  return true;
 }

 /**
@@ -322,31 +308,31 @@ bool KafkaWriter::DoFlush(double network_time)
  * FinishedRotation() to signal the log manager that potential
  * postprocessors can now run.
  */
-bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating)
-{
-    // no need to perform log rotation
-    return FinishedRotation();
+bool KafkaWriter::DoRotate(const char *rotated_path, double open, double close,
+                           bool terminating) {
+  // no need to perform log rotation
+  return FinishedRotation();
 }

 /**
  * Triggered by regular heartbeat messages from the main thread.
  */
-bool KafkaWriter::DoHeartbeat(double network_time, double current_time)
-{
-    if (!mocking) {
-        producer->poll(0);
-    }
-    return true;
+bool KafkaWriter::DoHeartbeat(double network_time, double current_time) {
+  if (!mocking) {
+    producer->poll(0);
+  }
+  return true;
 }

 /**
- * Triggered when the topic is resolved from the configuration, when mocking/testing
+ * Triggered when the topic is resolved from the configuration, when
+ * mocking/testing
  * @param topic
  */
-void KafkaWriter::raise_topic_resolved_event(const string topic) {
-    if (kafka_topic_resolved_event) {
-        val_list *vl = new val_list;
-        vl->append(new StringVal(topic.c_str()));
-        mgr.QueueEvent(kafka_topic_resolved_event, vl);
-    }
+void KafkaWriter::raise_topic_resolved_event(const std::string topic) {
+  if (kafka_topic_resolved_event) {
+    val_list *vl = new val_list;
+    vl->append(new StringVal(topic.c_str()));
+    mgr.QueueEvent(kafka_topic_resolved_event, vl);
+  }
 }
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index ad29ac7..ad5fc09 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -19,6 +19,7 @@ #define ZEEK_PLUGIN_BRO_KAFKA_KAFKAWRITER_H

 #include <librdkafka/rdkafkacpp.h>
+#include <map>
 #include <string>

 #include <Desc.h>
 #include <logging/WriterBackend.h>
@@ -65,17 +66,17 @@ protected:
     virtual bool DoHeartbeat(double network_time, double current_time);

 private:
-    string GetConfigValue(const WriterInfo& info, const string name) const;
-    void raise_topic_resolved_event(const string topic);
-    static const string default_topic_key;
-    string stream_id;
+    std::string GetConfigValue(const WriterInfo& info, const std::string name) const;
+    void raise_topic_resolved_event(const std::string topic);
+    static const std::string default_topic_key;
+    std::string stream_id;
     bool tag_json;
     bool mocking;
-    string json_timestamps;
-    map<string, string> kafka_conf;
-    map<string, string> additional_message_values;
-    string topic_name;
-    string topic_name_override;
+    std::string json_timestamps;
+    std::map<std::string, std::string> kafka_conf;
+    std::map<std::string, std::string> additional_message_values;
+    std::string topic_name;
+    std::string topic_name_override;
     threading::formatter::Formatter *formatter;
     RdKafka::Producer* producer;
     RdKafka::Topic* topic;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index f182d95..071dc30 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -19,13 +19,13 @@ namespace threading { namespace formatter {

-TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
+TaggedJSON::TaggedJSON(std::string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
 {}

 TaggedJSON::~TaggedJSON()
 {}

-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, std::map<std::string,std::string> &const_vals) const
 {
     desc->AddRaw("{");

@@ -40,7 +40,7 @@ bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* field
     JSON::Describe(desc, num_fields, fields, vals);

     if (const_vals.size() > 0) {
-        map<string, string>::iterator it = const_vals.begin();
+        std::map<std::string, std::string>::iterator it = const_vals.begin();
         while (it != const_vals.end()) {
             desc->AddRaw(",");
             desc->AddRaw("\"");
@@ -56,4 +56,5 @@ bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* field
     desc->AddRaw("}");
     return true;
 }
-}}
+} // namespace formatter
+} // namespace threading
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 9135bf2..f8d3005 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -18,17 +18,19 @@
 #ifndef ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
 #define ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H

-#include <string>
 #include <Desc.h>
+#include <map>
+#include <string>
 #include <threading/Formatter.h>
 #include <threading/formatters/JSON.h>

-using threading::formatter::JSON;
+using threading::Field;
 using threading::MsgThread;
 using threading::Value;
-using threading::Field;
+using threading::formatter::JSON;

-namespace threading { namespace formatter {
+namespace threading {
+namespace formatter {

 /*
  * A JSON formatter that prepends or 'tags' the content with a log stream
@@ -37,15 +39,16 @@ namespace threading { namespace formatter {
  * { 'http' : { ... }}
  */
 class TaggedJSON : public JSON {
-
 public:
-    TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
-    virtual ~TaggedJSON();
-    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const;
+  TaggedJSON(std::string stream_name, MsgThread *t, JSON::TimeFormat tf);
+  virtual ~TaggedJSON();
+  virtual bool Describe(ODesc *desc, int num_fields, const Field *const *fields,
+                        Value **vals, std::map<std::string, std::string> &const_vals) const;

 private:
-    string stream_name;
+  std::string stream_name;
 };
-}}
+} // namespace formatter
+} // namespace threading

 #endif