GitHub user dcode commented on a diff in the pull request:
https://github.com/apache/metron-bro-plugin-kafka/pull/6#discussion_r172036578
--- Diff: src/KafkaWriter.cc ---
@@ -54,20 +66,51 @@ KafkaWriter::KafkaWriter(WriterFrontend* frontend):
WriterBackend(frontend), for
}
KafkaWriter::~KafkaWriter()
-{}
+{
+ // Cleanup Kafka resources
+ while (producer->outq_len() > 0) {
+ producer->poll(1000);
+ }
+ producer->poll(1000);
+
+ // Cleanup all the things
+ delete topic;
+ delete producer;
+ delete formatter;
+ delete conf;
+ delete topic_conf;
+
+}
bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const
threading::Field* const* fields)
{
+ // Timeformat object, default to TS_EPOCH
+ threading::formatter::JSON::TimeFormat tf =
threading::formatter::JSON::TS_EPOCH;
+
// if no global 'topic_name' is defined, use the log stream's 'path'
if(topic_name.empty()) {
topic_name = info.path;
}
+ // format timestamps
+ if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 )
--- End diff --
Fixed.
---