This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
     new baac0a0  MINIFICPP-1033 - PublishKafka fixes
baac0a0 is described below

commit baac0a08af4db0f8169e68e4618f021716f41338
Author: Daniel Bakai <baka...@gmail.com>
AuthorDate: Wed Sep 25 04:43:34 2019 +0200

    MINIFICPP-1033 - PublishKafka fixes

    Fixes:
    - We now use delivery completion callbacks to properly determine successful delivery
    - The 'Delivery Guarantee' property had an invalid default value
    - Most of the configuration errors from librdkafka were silently ignored
    - rd_kafka_conf_t and rd_kafka_topic_conf_t objects were improperly destructed
      after their ownership had been taken by rd_kafka_t and rd_kafka_topic_t, respectively

    MINIFICPP-1033 - SSL fixes

    Signed-off-by: Arpad Boda <ab...@apache.org>

    This closes #653
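For context on the first item in the list of fixes: librdkafka produces asynchronously, so rd_kafka_producev() merely enqueues a message, and the actual outcome is only reported later through a delivery-report callback served from rd_kafka_poll() or rd_kafka_flush(). Checking the producev return value alone, as the old code effectively did, cannot detect a failed delivery. A minimal standalone sketch of that pattern (illustration only, not MiNiFi code; the broker address and topic name are placeholders):

// Delivery-report pattern used by this commit (sketch, not MiNiFi code).
#include <cstdio>
#include <librdkafka/rdkafka.h>

// Invoked from rd_kafka_poll()/rd_kafka_flush(): the first point where the
// real per-message delivery outcome is known.
static void dr_msg_cb(rd_kafka_t* /*rk*/, const rd_kafka_message_t* msg, void* /*opaque*/) {
  if (msg->err)
    std::fprintf(stderr, "delivery failed: %s\n", rd_kafka_err2str(msg->err));
  else
    std::fprintf(stderr, "delivered to partition %d\n", static_cast<int>(msg->partition));
}

int main() {
  char errstr[512];
  rd_kafka_conf_t* conf = rd_kafka_conf_new();
  rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
  rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

  rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (rk == nullptr) {
    rd_kafka_conf_destroy(conf);  // rd_kafka_new() only takes ownership on success
    return 1;
  }

  // Enqueue only; no delivery status is available at this point.
  rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
                    RD_KAFKA_V_VALUE(const_cast<char*>("hello"), 5),
                    RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                    RD_KAFKA_V_END);

  rd_kafka_flush(rk, 10 * 1000);  // serves the delivery reports while waiting
  rd_kafka_destroy(rk);
  return 0;
}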
---
 NOTICE                                          |   5 +
 cmake/FindPatch.cmake                           |  69 +++
 extensions/librdkafka/CMakeLists.txt            |   9 +-
 extensions/librdkafka/KafkaConnection.cpp       |  21 +-
 extensions/librdkafka/KafkaConnection.h         |  31 +-
 extensions/librdkafka/KafkaTopic.h              |  14 +-
 extensions/librdkafka/PublishKafka.cpp          | 627 +++++++++++++++++-------
 extensions/librdkafka/PublishKafka.h            | 166 ++++++-
 thirdparty/librdkafka/librdkafka-libressl.patch | 148 ++++++
 9 files changed, 860 insertions(+), 230 deletions(-)

diff --git a/NOTICE b/NOTICE
index 252860b..7dcbc81 100644
--- a/NOTICE
+++ b/NOTICE
@@ -29,3 +29,8 @@ Copyright (c) 1996 - 2019, Daniel Stenberg, <dan...@haxx.se>, and many contribut
 The derived work is adapted from
  CMake/FindLibSSH2.cmake
 and can be found in cmake/libssh2/sys/FindLibSSH2.cmake
+
+This includes derived works from the CMake (BSD 3-Clause licensed) project (https://github.com/Kitware/CMake):
+Copyright 2000-2019 Kitware, Inc. and Contributors
+The derived work is adapted from
+ Modules/FindPatch.cmake

diff --git a/cmake/FindPatch.cmake b/cmake/FindPatch.cmake
new file mode 100644
index 0000000..a81e0eb
--- /dev/null
+++ b/cmake/FindPatch.cmake
@@ -0,0 +1,69 @@
+# Distributed under the OSI-approved BSD 3-Clause License.  See accompanying
+# file Copyright.txt or https://cmake.org/licensing for details.
+
+#[=======================================================================[.rst:
+FindPatch
+---------
+
+The module defines the following variables:
+
+``Patch_EXECUTABLE``
+  Path to patch command-line executable.
+``Patch_FOUND``
+  True if the patch command-line executable was found.
+
+The following :prop_tgt:`IMPORTED` targets are also defined:
+
+``Patch::patch``
+  The command-line executable.
+
+Example usage:
+
+.. code-block:: cmake
+
+   find_package(Patch)
+   if(Patch_FOUND)
+     message("Patch found: ${Patch_EXECUTABLE}")
+   endif()
+#]=======================================================================]
+
+set(_doc "Patch command line executable")
+set(_patch_path )
+
+if(CMAKE_HOST_WIN32)
+  set(_patch_path
+    "$ENV{LOCALAPPDATA}/Programs/Git/bin"
+    "$ENV{LOCALAPPDATA}/Programs/Git/usr/bin"
+    "$ENV{APPDATA}/Programs/Git/bin"
+    "$ENV{APPDATA}/Programs/Git/usr/bin"
+    )
+endif()
+
+# First search the PATH
+find_program(Patch_EXECUTABLE
+  NAME patch
+  PATHS ${_patch_path}
+  DOC ${_doc}
+  )
+
+if(CMAKE_HOST_WIN32)
+  # Now look for installations in Git/ directories under typical installation
+  # prefixes on Windows.
+  find_program(Patch_EXECUTABLE
+    NAMES patch
+    PATH_SUFFIXES Git/usr/bin Git/bin GnuWin32/bin
+    DOC ${_doc}
+    )
+endif()
+
+if(Patch_EXECUTABLE AND NOT TARGET Patch::patch)
+  add_executable(Patch::patch IMPORTED)
+  set_property(TARGET Patch::patch PROPERTY IMPORTED_LOCATION ${Patch_EXECUTABLE})
+endif()
+
+unset(_patch_path)
+unset(_doc)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(Patch
+                                  REQUIRED_VARS Patch_EXECUTABLE)

diff --git a/extensions/librdkafka/CMakeLists.txt b/extensions/librdkafka/CMakeLists.txt
index 769b0e3..aa74bbf 100644
--- a/extensions/librdkafka/CMakeLists.txt
+++ b/extensions/librdkafka/CMakeLists.txt
@@ -43,16 +43,19 @@ if(WIN32 OR NOT USE_SYSTEM_ZLIB)
 endif()
 string(REPLACE ";" "%" CMAKE_MODULE_PATH_PASSTHROUGH "${CMAKE_MODULE_PATH_PASSTHROUGH_LIST}")
 
+find_package(Patch REQUIRED)
+
 ExternalProject_Add(
     kafka-external
-    GIT_REPOSITORY "https://github.com/edenhill/librdkafka.git"
-    GIT_TAG "v1.0.1"
+    URL "https://github.com/edenhill/librdkafka/archive/v1.0.1.tar.gz"
+    URL_HASH "SHA256=b2a2defa77c0ef8c508739022a197886e0644bd7bf6179de1b68bdffb02b3550"
+    PATCH_COMMAND "${Patch_EXECUTABLE}" -p1 -i "${CMAKE_SOURCE_DIR}/thirdparty/librdkafka/librdkafka-libressl.patch"
     PREFIX "${BASE_DIR}"
     LIST_SEPARATOR % # This is needed for passing semicolon-separated lists
     CMAKE_ARGS ${PASSTHROUGH_CMAKE_ARGS}
                "-DCMAKE_INSTALL_PREFIX=${BASE_DIR}/install"
                "-DWITH_SASL=OFF"
-               "-DOPENSSL_VERSION=1.0.2"
+               "-DWITH_SSL=ON"
               "-DRDKAFKA_BUILD_STATIC=ON"
               "-DRDKAFKA_BUILD_EXAMPLES=OFF"
               "-DRDKAFKA_BUILD_TESTS=OFF"

diff --git a/extensions/librdkafka/KafkaConnection.cpp b/extensions/librdkafka/KafkaConnection.cpp
index 28b0b45..04b327d 100644
--- a/extensions/librdkafka/KafkaConnection.cpp
+++ b/extensions/librdkafka/KafkaConnection.cpp
@@ -25,19 +25,24 @@ namespace processors {
 
 KafkaConnection::KafkaConnection(const KafkaConnectionKey &key)
     : logger_(logging::LoggerFactory<KafkaConnection>::getLogger()),
-      conf_(nullptr),
-      kafka_connection_(nullptr) {
+      kafka_connection_(nullptr),
+      poll_(false) {
   lease_ = false;
   initialized_ = false;
   key_ = key;
 }
 
+KafkaConnection::~KafkaConnection() {
+  remove();
+}
+
 void KafkaConnection::remove() {
   topics_.clear();
   removeConnection();
 }
 
 void KafkaConnection::removeConnection() {
+  stopPoll();
   if (kafka_connection_) {
     rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */
     rd_kafka_destroy(kafka_connection_);
@@ -46,10 +51,6 @@ void KafkaConnection::removeConnection() {
     });
     kafka_connection_ = nullptr;
   }
-  if (conf_) {
-    rd_kafka_conf_destroy(conf_);
-    conf_ = nullptr;
-  }
   initialized_ = false;
 }
 
@@ -57,18 +58,14 @@ bool KafkaConnection::initialized() const {
   return initialized_;
 }
 
-void KafkaConnection::setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) {
+void KafkaConnection::setConnection(rd_kafka_t *producer) {
   removeConnection();
   kafka_connection_ = producer;
-  conf_ = conf;
   initialized_ = true;
   modifyLoggers([&](std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>& loggers) {
     loggers[producer] = logger_;
   });
-}
-
-rd_kafka_conf_t *KafkaConnection::getConf() const {
-  return conf_;
+  startPoll();
 }
 
 rd_kafka_t *KafkaConnection::getConnection() const {
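The startPoll()/stopPoll() pair introduced above is what makes the new delivery reports actually arrive: librdkafka invokes the registered callbacks only from within rd_kafka_poll(), so a producer that waits on per-message results needs something polling continuously. The same background-poll idea in isolation (a sketch under that assumption, not the actual class):

// Background poll loop (sketch): keeps serving librdkafka callbacks so
// delivery reports fire promptly instead of only during flush/destroy.
#include <atomic>
#include <thread>
#include <librdkafka/rdkafka.h>

class KafkaPoller {
 public:
  explicit KafkaPoller(rd_kafka_t* rk) : rk_(rk), running_(true) {
    thread_ = std::thread([this] {
      while (running_) {
        rd_kafka_poll(rk_, 1000 /* block at most 1000 ms per iteration */);
      }
    });
  }
  ~KafkaPoller() {
    running_ = false;  // poll() returns within ~1 s, then the loop exits
    if (thread_.joinable()) {
      thread_.join();
    }
  }
 private:
  rd_kafka_t* rk_;
  std::atomic<bool> running_;
  std::thread thread_;
};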
diff --git a/extensions/librdkafka/KafkaConnection.h b/extensions/librdkafka/KafkaConnection.h
index 6e158fe..774754b 100644
--- a/extensions/librdkafka/KafkaConnection.h
+++ b/extensions/librdkafka/KafkaConnection.h
@@ -18,6 +18,7 @@
 #ifndef NIFI_MINIFI_CPP_KAFKACONNECTION_H
 #define NIFI_MINIFI_CPP_KAFKACONNECTION_H
 
+#include <atomic>
 #include <mutex>
 #include <string>
 #include "core/logging/LoggerConfiguration.h"
@@ -46,9 +47,7 @@ class KafkaConnection {
 
   explicit KafkaConnection(const KafkaConnectionKey &key);
 
-  ~KafkaConnection() {
-    remove();
-  }
+  ~KafkaConnection();
 
   void remove();
 
@@ -56,9 +55,7 @@ class KafkaConnection {
 
   bool initialized() const;
 
-  void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf);
-
-  rd_kafka_conf_t *getConf() const;
+  void setConnection(rd_kafka_t *producer);
 
   rd_kafka_t *getConnection() const;
 
@@ -90,9 +87,11 @@ class KafkaConnection {
 
   std::map<std::string, std::shared_ptr<KafkaTopic>> topics_;
 
-  rd_kafka_conf_t *conf_;
   rd_kafka_t *kafka_connection_;
 
+  std::atomic<bool> poll_;
+  std::thread thread_kafka_poll_;
+
   static void modifyLoggers(const std::function<void(std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>>&)>& func) {
     static std::mutex loggers_mutex;
     static std::unordered_map<const rd_kafka_t*, std::weak_ptr<logging::Logger>> loggers;
@@ -100,6 +99,24 @@ class KafkaConnection {
     std::lock_guard<std::mutex> lock(loggers_mutex);
     func(loggers);
   }
+
+  void stopPoll() {
+    poll_ = false;
+    logger_->log_debug("Stop polling");
+    if (thread_kafka_poll_.joinable()) {
+      thread_kafka_poll_.join();
+    }
+  }
+
+  void startPoll() {
+    poll_ = true;
+    logger_->log_debug("Start polling");
+    thread_kafka_poll_ = std::thread([this]{
+      while (this->poll_) {
+        rd_kafka_poll(this->kafka_connection_, 1000);
+      }
+    });
+  }
 };
 
 class KafkaLease {

diff --git a/extensions/librdkafka/KafkaTopic.h b/extensions/librdkafka/KafkaTopic.h
index 30c34a0..f7b8f6e 100644
--- a/extensions/librdkafka/KafkaTopic.h
+++ b/extensions/librdkafka/KafkaTopic.h
@@ -28,9 +28,8 @@ namespace processors {
 
 class KafkaTopic {
  public:
-  KafkaTopic(rd_kafka_topic_t *topic_reference, rd_kafka_topic_conf_t *topic_conf)
-      : topic_conf_(topic_conf),
-        topic_reference_(topic_reference) {
+  KafkaTopic(rd_kafka_topic_t *topic_reference)
+      : topic_reference_(topic_reference) {
 
   }
 
@@ -38,21 +37,12 @@ class KafkaTopic {
     if (topic_reference_) {
       rd_kafka_topic_destroy(topic_reference_);
     }
-    if (topic_conf_) {
-      rd_kafka_topic_conf_destroy(topic_conf_);
-    }
-  }
-
-  rd_kafka_topic_conf_t *getTopicConf() const {
-    return topic_conf_;
   }
-
   rd_kafka_topic_t *getTopic() const {
     return topic_reference_;
   }
 
  private:
-  rd_kafka_topic_conf_t *topic_conf_;
   rd_kafka_topic_t *topic_reference_;
 };
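The KafkaTopic and KafkaConnection changes above carry the ownership fix from the commit message: on success, rd_kafka_new() and rd_kafka_topic_new() take ownership of the configuration object passed to them, so destroying it again afterwards (as the old destructors did) is a double free; the conf may only be freed on the failure path. The rule in isolation (a sketch; in the actual commit utils::ScopeGuard automates the failure path):

// Ownership rule behind the conf/topic_conf fix (sketch, not MiNiFi code).
#include <librdkafka/rdkafka.h>

rd_kafka_t* make_producer(char* errstr, size_t errstr_size) {
  rd_kafka_conf_t* conf = rd_kafka_conf_new();
  rd_kafka_t* rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, errstr_size);
  if (rk == nullptr) {
    rd_kafka_conf_destroy(conf);  // failure: we still own conf, free it here
    return nullptr;
  }
  // success: rk owns conf now; destroying it again would be a double free
  return rk;
}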
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b248c1e..75f8839 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -26,6 +26,7 @@
 #include <set>
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -36,37 +37,51 @@ namespace minifi {
 namespace processors {
 
 core::Property PublishKafka::SeedBrokers(
-    core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")->isRequired(true)->supportsExpressionLanguage(
-        true)->build());
+    core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
-core::Property PublishKafka::Topic(core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")->isRequired(true)->supportsExpressionLanguage(true)->build());
+core::Property PublishKafka::Topic(
+    core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::DeliveryGuarantee(
-    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")->isRequired(false)
-        ->supportsExpressionLanguage(true)->withDefaultValue("DELIVERY_ONE_NODE")->build());
+    core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+        ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
 
-core::Property PublishKafka::MaxMessageSize(core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")->isRequired(false)->build());
+core::Property PublishKafka::MaxMessageSize(
+    core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
+        ->isRequired(false)->build());
 
 core::Property PublishKafka::RequestTimeOut(
-    core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request in milliseconds")->isRequired(false)->withDefaultValue<core::TimePeriodValue>(
-        "10 sec")->supportsExpressionLanguage(true)->build());
+    core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
+        ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->supportsExpressionLanguage(true)->build());
+
+core::Property PublishKafka::MessageTimeOut(
+    core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
+        ->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->supportsExpressionLanguage(true)->build());
 
 core::Property PublishKafka::ClientName(
-    core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")->isRequired(true)->supportsExpressionLanguage(true)->build());
+    core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
+        ->isRequired(true)->supportsExpressionLanguage(true)->build());
 
 /**
  * These don't appear to need EL support
  */
 core::Property PublishKafka::BatchSize(
-    core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
-
+    core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
+        ->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
+core::Property PublishKafka::TargetBatchPayloadSize(
+    core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied.)")
+        ->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
 core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
 core::Property PublishKafka::QueueBufferMaxTime("Queue Buffering Max Time", "Delay to wait for messages in the producer queue to accumulate before constructing message batches", "");
 core::Property PublishKafka::QueueBufferMaxSize("Queue Max Buffer Size", "Maximum total message size sum allowed on the producer queue", "");
 core::Property PublishKafka::QueueBufferMaxMessage("Queue Max Message", "Maximum number of messages allowed on the producer queue", "");
 core::Property PublishKafka::CompressCodec("Compress Codec", "compression codec to use for compressing message sets", COMPRESSION_CODEC_NONE);
-core::Property PublishKafka::MaxFlowSegSize("Max Flow Segment Size", "Maximum flow content payload segment size for the kafka record", "");
+core::Property PublishKafka::MaxFlowSegSize(
+    core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
+        ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
 core::Property PublishKafka::SecurityProtocol("Security Protocol", "Protocol used to communicate with brokers", "");
 core::Property PublishKafka::SecurityCA("Security CA", "File or directory path to CA certificate(s) for verifying the broker's key", "");
 core::Property PublishKafka::SecurityCert("Security Cert", "Path to client's public key (PEM) used for authentication", "");
@@ -81,6 +96,7 @@ core::Property PublishKafka::MessageKeyField("Message Key Field", "The name of a
 "");
 core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable."
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", ""); + core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship"); core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship"); @@ -92,9 +108,11 @@ void PublishKafka::initialize() { properties.insert(DeliveryGuarantee); properties.insert(MaxMessageSize); properties.insert(RequestTimeOut); + properties.insert(MessageTimeOut); properties.insert(ClientName); properties.insert(AttributeNameRegex); properties.insert(BatchSize); + properties.insert(TargetBatchPayloadSize); properties.insert(QueueBufferMaxTime); properties.insert(QueueBufferMaxSize); properties.insert(QueueBufferMaxMessage); @@ -119,165 +137,220 @@ void PublishKafka::initialize() { } void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { + interrupted_ = false; +} + +void PublishKafka::notifyStop() { + logger_->log_debug("notifyStop called"); + interrupted_ = true; + std::lock_guard<std::mutex> lock(messages_mutex_); + for (auto& messages : messages_set_) { + messages->interrupt(); + } } -bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff) { +/** + * Message delivery report callback using the richer rd_kafka_message_t object. + */ +void PublishKafka::messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* /*opaque*/) { + if (rkmessage->_private == nullptr) { + return; + } + std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>* func = + reinterpret_cast<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>*>(rkmessage->_private); + try { + (*func)(rk, rkmessage); + } catch (...) 
+  } catch (...) {
+  }
+  delete func;
+}
+
+bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context) {
   std::string value;
   int64_t valInt;
   std::string valueConf;
-  char errstr[512];
+  std::array<char, 512U> errstr;
   rd_kafka_conf_res_t result;
 
-  auto conf_ = rd_kafka_conf_new();
+  rd_kafka_conf_t* conf_ = rd_kafka_conf_new();
+  if (conf_ == nullptr) {
+    logger_->log_error("Failed to create rd_kafka_conf_t object");
+    return false;
+  }
+  utils::ScopeGuard confGuard([conf_](){
+    rd_kafka_conf_destroy(conf_);
+  });
 
   auto key = conn->getKey();
 
-  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: debug properties [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure debug properties error result [%s]", errstr);
-  }
-
-  if (!key->brokers_.empty()) {
-    result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  } else {
+  if (key->brokers_.empty()) {
     logger_->log_error("There are no brokers");
     return false;
   }
+  result = rd_kafka_conf_set(conf_, "bootstrap.servers", key->brokers_.c_str(), errstr.data(), errstr.size());
+  logger_->log_debug("PublishKafka: bootstrap.servers [%s]", key->brokers_);
+  if (result != RD_KAFKA_CONF_OK) {
+    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+    return false;
+  }
 
-  if (!key->client_id_.empty()) {
-    rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr, sizeof(errstr));
-    logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  } else {
+  if (key->client_id_.empty()) {
     logger_->log_error("Client id is empty");
     return false;
   }
+  result = rd_kafka_conf_set(conf_, "client.id", key->client_id_.c_str(), errstr.data(), errstr.size());
+  logger_->log_debug("PublishKafka: client.id [%s]", key->client_id_);
+  if (result != RD_KAFKA_CONF_OK) {
+    logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+    return false;
+  }
 
-// Kerberos configuration
+  value = "";
+  if (context->getProperty(DebugContexts.getName(), value) && !value.empty()) {
+    result = rd_kafka_conf_set(conf_, "debug", value.c_str(), errstr.data(), errstr.size());
+    logger_->log_debug("PublishKafka: debug [%s]", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure debug error result [%s]", errstr.data());
+      return false;
+    }
+  }
+
   value = "";
   if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
     valueConf = std::to_string(valInt);
-    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "message.max.bytes", valueConf.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: message.max.bytes [%s]", valueConf);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "queue.buffering.max.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
-    attributeNameRegex.assign(value);
-    logger_->log_debug("PublishKafka: AttributeNameRegex %s", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
    valInt = valInt / 1024;
    valueConf = std::to_string(valInt);
-    rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "queue.buffering.max.kbytes", valueConf.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: queue.buffering.max.kbytes [%s]", valueConf);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-  }
-  value = "";
-  max_seg_size_ = ULLONG_MAX;
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, valInt)) {
-    max_seg_size_ = valInt;
-    logger_->log_debug("PublishKafka: max flow segment size [%llu]", max_seg_size_);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(QueueBufferMaxTime.getName(), value) && !value.empty()) {
     core::TimeUnit unit;
     if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
       valueConf = std::to_string(valInt);
-      rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, "queue.buffering.max.ms", valueConf.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: queue.buffering.max.ms [%s]", valueConf);
-      if (result != RD_KAFKA_CONF_OK)
-        logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure queue buffer error result [%s]", errstr.data());
+        return false;
+      }
     }
   }
   value = "";
   if (context->getProperty(BatchSize.getName(), value) && !value.empty()) {
-    rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "batch.num.messages", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: batch.num.messages [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure batch size error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(CompressCodec.getName(), value) && !value.empty() && value != "none") {
-    rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr, sizeof(errstr));
+    result = rd_kafka_conf_set(conf_, "compression.codec", value.c_str(), errstr.data(), errstr.size());
     logger_->log_debug("PublishKafka: compression.codec [%s]", value);
-    if (result != RD_KAFKA_CONF_OK)
-      logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure compression codec error result [%s]", errstr.data());
+      return false;
+    }
   }
   value = "";
   if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
     if (value == SECURITY_PROTOCOL_SSL) {
-      rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, "security.protocol", value.c_str(), errstr.data(), errstr.size());
       logger_->log_debug("PublishKafka: security.protocol [%s]", value);
       if (result != RD_KAFKA_CONF_OK) {
-        logger_->log_error("PublishKafka: configure error result [%s]", errstr);
-      } else {
-        value = "";
-        if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+        return false;
+      }
+      value = "";
+      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.ca.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityCert.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.certificate.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKey.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.key.location", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.key.location [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
-        value = "";
-        if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
-          rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure error result [%s]", errstr);
+      }
+      value = "";
+      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && !value.empty()) {
+        result = rd_kafka_conf_set(conf_, "ssl.key.password", value.c_str(), errstr.data(), errstr.size());
+        logger_->log_debug("PublishKafka: ssl.key.password [%s]", value);
+        if (result != RD_KAFKA_CONF_OK) {
+          logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+          return false;
         }
       }
+    } else {
+      logger_->log_error("PublishKafka: unknown Security Protocol: %s", value);
+      return false;
     }
   }
 
@@ -289,122 +362,320 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<KafkaConnection>
     value = "";
     if (context->getDynamicProperty(key, value) && !value.empty()) {
       logger_->log_debug("PublishKafka: DynamicProperty: [%s] -> [%s]", key, value);
-      rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr, sizeof(errstr));
+      result = rd_kafka_conf_set(conf_, key.c_str(), value.c_str(), errstr.data(), errstr.size());
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure error result [%s]", errstr.data());
+        return false;
+      }
     } else {
       logger_->log_warn("PublishKafka Dynamic Property '%s' is empty and therefore will not be configured", key);
     }
   }
 
+  // Set the delivery callback
+  rd_kafka_conf_set_dr_msg_cb(conf_, &PublishKafka::messageDeliveryCallback);
+
   // Set the logger callback
-  rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback);
+  rd_kafka_conf_set_log_cb(conf_, &KafkaConnection::logCallback);
+
+  rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr.data(), errstr.size());
+
+  if (producer == nullptr) {
+    logger_->log_error("Failed to create Kafka producer %s", errstr.data());
+    return false;
+  }
+
+  // The producer took ownership of the configuration, so we must not free it
+  confGuard.disable();
+
+  conn->setConnection(producer);
+
+  return true;
+}
+
+bool PublishKafka::createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name) {
+  rd_kafka_topic_conf_t* topic_conf_ = rd_kafka_topic_conf_new();
+  if (topic_conf_ == nullptr) {
+    logger_->log_error("Failed to create rd_kafka_topic_conf_t object");
+    return false;
+  }
+  utils::ScopeGuard confGuard([topic_conf_](){
+    rd_kafka_topic_conf_destroy(topic_conf_);
+  });
+
+  rd_kafka_conf_res_t result;
+  std::string value;
+  std::array<char, 512U> errstr;
+  int64_t valInt;
+  std::string valueConf;
 
-  auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr));
+  value = "";
+  if (context->getProperty(DeliveryGuarantee.getName(), value) && !value.empty()) {
+    result = rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr.data(), errstr.size());
+    logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
+    if (result != RD_KAFKA_CONF_OK) {
+      logger_->log_error("PublishKafka: configure request.required.acks error result [%s]", errstr.data());
+      return false;
+    }
+  }
+  value = "";
+  if (context->getProperty(RequestTimeOut.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) &&
+        core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+      logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure request.timeout.ms error result [%s]", errstr.data());
+        return false;
+      }
+    }
+  }
+  value = "";
+  if (context->getProperty(MessageTimeOut.getName(), value) && !value.empty()) {
+    core::TimeUnit unit;
+    if (core::Property::StringToTime(value, valInt, unit) &&
+        core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
+      valueConf = std::to_string(valInt);
+      result = rd_kafka_topic_conf_set(topic_conf_, "message.timeout.ms", valueConf.c_str(), errstr.data(), errstr.size());
+      logger_->log_debug("PublishKafka: message.timeout.ms [%s]", valueConf);
+      if (result != RD_KAFKA_CONF_OK) {
+        logger_->log_error("PublishKafka: configure message.timeout.ms error result [%s]", errstr.data());
+        return false;
+      }
+    }
+  }
 
-  if (!producer) {
-    logger_->log_error("Failed to create Kafka producer %s", errstr);
+  rd_kafka_topic_t* topic_reference = rd_kafka_topic_new(conn->getConnection(), topic_name.c_str(), topic_conf_);
+  if (topic_reference == nullptr) {
+    rd_kafka_resp_err_t resp_err = rd_kafka_last_error();
+    logger_->log_error("PublishKafka: failed to create topic %s, error: %s", topic_name.c_str(), rd_kafka_err2str(resp_err));
     return false;
   }
 
-  conn->setConnection(producer, conf_);
+  // The topic took ownership of the configuration, so we must not free it
+  confGuard.disable();
+
+  std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference);
+
+  conn->putTopic(topic_name, kafkaTopicref);
 
   return true;
 }
 
 void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
-  logger_->log_trace("Enter trigger");
-  std::shared_ptr<core::FlowFile> flowFile = session->get();
+  // Check whether we have been interrupted
+  if (interrupted_) {
+    logger_->log_info("The processor has been interrupted, not running onTrigger");
+    context->yield();
+    return;
+  }
 
-  if (!flowFile) {
+  // Try to get a KafkaConnection
+  std::string client_id, brokers;
+  if (!context->getProperty(ClientName.getName(), client_id)) {
+    logger_->log_error("Client Name property missing or invalid");
+    context->yield();
+    return;
+  }
+  if (!context->getProperty(SeedBrokers.getName(), brokers)) {
+    logger_->log_error("Known Brokers property missing or invalid");
+    context->yield();
     return;
   }
 
-  std::string client_id, brokers, topic;
+  KafkaConnectionKey key;
+  key.brokers_ = brokers;
+  key.client_id_ = client_id;
 
-  std::unique_ptr<KafkaLease> lease;
-  std::shared_ptr<KafkaConnection> conn;
-// get the client ID, brokers, and topic from either the flowfile, the configuration, or the properties
-  if (context->getProperty(ClientName, client_id, flowFile) && context->getProperty(SeedBrokers, brokers, flowFile) && context->getProperty(Topic, topic, flowFile)) {
-    KafkaConnectionKey key;
-    key.brokers_ = brokers;
-    key.client_id_ = client_id;
+  std::unique_ptr<KafkaLease> lease = connection_pool_.getOrCreateConnection(key);
+  if (lease == nullptr) {
+    logger_->log_info("This connection is used by another thread.");
+    context->yield();
+    return;
+  }
 
-    lease = connection_pool_.getOrCreateConnection(key);
-    if (lease == nullptr) {
-      logger_->log_info("This connection is used by another thread.");
+  std::shared_ptr<KafkaConnection> conn = lease->getConn();
+  if (!conn->initialized()) {
+    logger_->log_trace("Connection not initialized to %s, %s", client_id, brokers);
+    if (!configureNewConnection(conn, context)) {
+      logger_->log_error("Could not configure Kafka Connection");
       context->yield();
       return;
     }
-    conn = lease->getConn();
-
-    if (!conn->initialized()) {
-      logger_->log_trace("Connection not initialized to %s, %s, %s", client_id, brokers, topic);
-      if (!configureNewConnection(conn, context, flowFile)) {
-        logger_->log_error("Could not configure Kafka Connection");
-        session->transfer(flowFile, Failure);
-        return;
-      }
-    }
+  }
 
-    if (!conn->hasTopic(topic)) {
-      auto topic_conf_ = rd_kafka_topic_conf_new();
-      auto topic_reference = rd_kafka_topic_new(conn->getConnection(), topic.c_str(), topic_conf_);
-      rd_kafka_conf_res_t result;
-      std::string value;
-      char errstr[512];
-      int64_t valInt;
-      std::string valueConf;
-
-      if (context->getProperty(DeliveryGuarantee, value, flowFile) && !value.empty()) {
-        rd_kafka_topic_conf_set(topic_conf_, "request.required.acks", value.c_str(), errstr, sizeof(errstr));
-        logger_->log_debug("PublishKafka: request.required.acks [%s]", value);
-        if (result != RD_KAFKA_CONF_OK)
-          logger_->log_error("PublishKafka: configure delivery guarantee error result [%s]", errstr);
-      }
-      value = "";
-      if (context->getProperty(RequestTimeOut, value, flowFile) && !value.empty()) {
-        core::TimeUnit unit;
-        if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) {
-          valueConf = std::to_string(valInt);
-          rd_kafka_topic_conf_set(topic_conf_, "request.timeout.ms", valueConf.c_str(), errstr, sizeof(errstr));
-          logger_->log_debug("PublishKafka: request.timeout.ms [%s]", valueConf);
-          if (result != RD_KAFKA_CONF_OK)
-            logger_->log_error("PublishKafka: configure timeout error result [%s]", errstr);
-        }
-      }
+  // Get some properties not (only) used directly to set up librdkafka
+  std::string value;
 
-      std::shared_ptr<KafkaTopic> kafkaTopicref = std::make_shared<KafkaTopic>(topic_reference, topic_conf_);
+  // Batch Size
+  uint32_t batch_size;
+  value = "";
+  if (context->getProperty(BatchSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, batch_size)) {
+    logger_->log_debug("PublishKafka: Batch Size [%lu]", batch_size);
+  } else {
+    batch_size = 10;
+  }
 
-      conn->putTopic(topic, kafkaTopicref);
-    }
+  // Target Batch Payload Size
+  uint64_t target_batch_payload_size;
+  value = "";
+  if (context->getProperty(TargetBatchPayloadSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, target_batch_payload_size)) {
+    logger_->log_debug("PublishKafka: Target Batch Payload Size [%llu]", target_batch_payload_size);
   } else {
-    logger_->log_error("Do not have required properties");
-    session->transfer(flowFile, Failure);
-    return;
+    target_batch_payload_size = 512 * 1024U;
   }
 
-  std::string kafkaKey;
-  kafkaKey = "";
-  if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
-    logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+  // Max Flow Segment Size
+  uint64_t max_flow_seg_size;
+  value = "";
+  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, max_flow_seg_size)) {
+    logger_->log_debug("PublishKafka: Max Flow Segment Size [%llu]", max_flow_seg_size);
   } else {
-    kafkaKey = flowFile->getUUIDStr();
+    max_flow_seg_size = 0U;
+  }
+
+  // Attributes to Send as Headers
+  utils::Regex attributeNameRegex;
+  value = "";
+  if (context->getProperty(AttributeNameRegex.getName(), value) && !value.empty()) {
+    attributeNameRegex = utils::Regex(value);
+    logger_->log_debug("PublishKafka: AttributeNameRegex [%s]", value);
+  }
+
+  // Collect FlowFiles to process
+  uint64_t actual_bytes = 0U;
+  std::vector<std::shared_ptr<core::FlowFile>> flowFiles;
+  for (uint32_t i = 0; i < batch_size; i++) {
+    std::shared_ptr<core::FlowFile> flowFile = session->get();
+    if (flowFile == nullptr) {
+      break;
+    }
+    actual_bytes += flowFile->getSize();
+    flowFiles.emplace_back(std::move(flowFile));
+    if (target_batch_payload_size != 0U && actual_bytes >= target_batch_payload_size) {
+      break;
+    }
+  }
+  if (flowFiles.empty()) {
+    context->yield();
+    return;
   }
+  logger_->log_debug("Processing %lu flow files with a total size of %llu B", flowFiles.size(), actual_bytes);
+
+  auto messages = std::make_shared<Messages>();
+  // We must add this to the messages set, so that it will be interrupted when notifyStop is called
+  {
+    std::lock_guard<std::mutex> lock(messages_mutex_);
+    messages_set_.emplace(messages);
+  }
+  // We also have to ensure that it will be removed once we are done with it
+  utils::ScopeGuard messagesSetGuard([&]() {
+    std::lock_guard<std::mutex> lock(messages_mutex_);
+    messages_set_.erase(messages);
+  });
+
+  // Process FlowFiles
+  for (auto& flowFile : flowFiles) {
+    size_t flow_file_index = messages->addFlowFile();
+
+    // Get Topic (FlowFile-dependent EL property)
+    std::string topic;
+    if (!context->getProperty(Topic, topic, flowFile)) {
+      logger_->log_error("Flow file %s does not have a valid Topic", flowFile->getUUIDStr());
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
+    }
+
+    // Add topic to the connection if needed
+    if (!conn->hasTopic(topic)) {
+      if (!createNewTopic(conn, context, topic)) {
+        logger_->log_error("Failed to add topic %s", topic);
+        messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+          flow_file_result.flow_file_error = true;
+        });
+        continue;
+      }
+    }
 
-  auto thisTopic = conn->getTopic(topic);
-  if (thisTopic) {
-    PublishKafka::ReadCallback callback(max_seg_size_, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile, attributeNameRegex);
+    std::string kafkaKey;
+    kafkaKey = "";
+    if (context->getDynamicProperty(MessageKeyField, kafkaKey, flowFile) && !kafkaKey.empty()) {
+      logger_->log_debug("PublishKafka: Message Key Field [%s]", kafkaKey);
+    } else {
+      kafkaKey = flowFile->getUUIDStr();
+    }
+
+    auto thisTopic = conn->getTopic(topic);
+    if (thisTopic == nullptr) {
+      logger_->log_error("Topic %s is invalid", topic);
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
+    }
+
+    PublishKafka::ReadCallback callback(max_flow_seg_size, kafkaKey, thisTopic->getTopic(), conn->getConnection(), flowFile,
+                                        attributeNameRegex, messages, flow_file_index);
     session->read(flowFile, &callback);
     if (callback.status_ < 0) {
-      logger_->log_error("Failed to send flow to kafka topic %s", topic);
-      session->transfer(flowFile, Failure);
-    } else {
-      logger_->log_debug("Sent flow with length %d to kafka topic %s", callback.read_size_, topic);
-      session->transfer(flowFile, Success);
+      logger_->log_error("Failed to send flow to kafka topic %s, error: %s", topic, callback.error_);
+      messages->modifyResult(flow_file_index, [](FlowFileResult& flow_file_result) {
+        flow_file_result.flow_file_error = true;
+      });
+      continue;
     }
-  } else {
-    logger_->log_error("Topic %s is invalid", topic);
-    session->transfer(flowFile, Failure);
   }
+
+  logger_->log_trace("PublishKafka::onTrigger waitForCompletion start");
+  messages->waitForCompletion();
+  if (messages->wasInterrupted()) {
+    logger_->log_warn("Waiting for delivery confirmation was interrupted, some flow files might be routed to Failure, even if they were successfully delivered.");
+  }
+  logger_->log_trace("PublishKafka::onTrigger waitForCompletion finish");
+
+  messages->iterateFlowFiles([&](size_t index, const FlowFileResult& flow_file) {
+    bool success;
+    if (flow_file.flow_file_error) {
+      success = false;
+    } else if (flow_file.messages.empty()) {
+      success = false;
+      logger_->log_error("Assertion error: no messages found for flow file %s", flowFiles[index]->getUUIDStr());
+    } else {
+      success = true;
+      for (size_t segment_num = 0; segment_num < flow_file.messages.size(); segment_num++) {
+        const auto& message = flow_file.messages[segment_num];
+        switch (message.status) {
+          case MessageStatus::MESSAGESTATUS_UNCOMPLETE:
+            success = false;
+            logger_->log_error("Waiting for delivery confirmation was interrupted for flow file %s segment %zu",
+                               flowFiles[index]->getUUIDStr(),
+                               segment_num);
+            break;
+          case MessageStatus::MESSAGESTATUS_ERROR:
+            success = false;
+            logger_->log_error("Failed to deliver flow file %s segment %zu, error: %s",
+                               flowFiles[index]->getUUIDStr(),
+                               segment_num,
+                               rd_kafka_err2str(message.err_code));
+            break;
+          case MessageStatus::MESSAGESTATUS_SUCCESS:
+            logger_->log_debug("Successfully delivered flow file %s segment %zu",
+                               flowFiles[index]->getUUIDStr(),
+                               segment_num);
+            break;
+        }
+      }
+    }
+    if (success) {
+      session->transfer(flowFiles[index], Success);
+    } else {
+      session->transfer(flowFiles[index], Failure);
+    }
+  });
 }
 
 } /* namespace processors */
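A note on the RD_KAFKA_V_OPAQUE plumbing visible in onTrigger above and in the ReadCallback below: each produced segment carries a heap-allocated std::function as its per-message opaque, which messageDeliveryCallback() later retrieves from rkmessage->_private, invokes, and deletes. A simplified sketch of that handoff (illustration only; the helper names are hypothetical):

// Per-message completion callbacks via the message opaque (sketch of the
// pattern, not the actual MiNiFi code).
#include <functional>
#include <memory>
#include <librdkafka/rdkafka.h>

using DeliveryFn = std::function<void(const rd_kafka_message_t*)>;

// Registered once per producer with rd_kafka_conf_set_dr_msg_cb().
static void dr_cb(rd_kafka_t* /*rk*/, const rd_kafka_message_t* msg, void* /*opaque*/) {
  std::unique_ptr<DeliveryFn> fn(static_cast<DeliveryFn*>(msg->_private));
  if (fn) {
    (*fn)(msg);  // fn is deleted when the unique_ptr goes out of scope
  }
}

rd_kafka_resp_err_t produce_with_callback(rd_kafka_t* rk, const char* topic,
                                          void* payload, size_t len, DeliveryFn fn) {
  std::unique_ptr<DeliveryFn> cb(new DeliveryFn(std::move(fn)));
  rd_kafka_resp_err_t err = rd_kafka_producev(
      rk, RD_KAFKA_V_TOPIC(topic),
      RD_KAFKA_V_VALUE(payload, len),
      RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
      RD_KAFKA_V_OPAQUE(cb.get()), RD_KAFKA_V_END);
  if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
    cb.release();  // ownership passes to the eventual delivery report
  }                // otherwise dr_cb will never run, so cb is freed here
  return err;
}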
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index 4d0a759..2881f49 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -28,9 +28,15 @@
 #include "core/Property.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "core/logging/Logger.h"
+#include "utils/RegexUtils.h"
 #include "rdkafka.h"
 #include "KafkaPool.h"
-#include <regex>
+#include <atomic>
+#include <map>
+#include <set>
+#include <mutex>
+#include <cstdint>
+#include <condition_variable>
 
 namespace org {
 namespace apache {
@@ -63,8 +69,8 @@ class PublishKafka : public core::Processor {
   explicit PublishKafka(std::string name, utils::Identifier uuid = utils::Identifier())
       : core::Processor(name, uuid),
         connection_pool_(5),
-        logger_(logging::LoggerFactory<PublishKafka>::getLogger()) {
-    max_seg_size_ = -1;
+        logger_(logging::LoggerFactory<PublishKafka>::getLogger()),
+        interrupted_(false) {
   }
   // Destructor
   virtual ~PublishKafka() {
@@ -77,8 +83,10 @@ class PublishKafka : public core::Processor {
   static core::Property DeliveryGuarantee;
   static core::Property MaxMessageSize;
   static core::Property RequestTimeOut;
+  static core::Property MessageTimeOut;
   static core::Property ClientName;
   static core::Property BatchSize;
+  static core::Property TargetBatchPayloadSize;
   static core::Property AttributeNameRegex;
   static core::Property QueueBufferMaxTime;
   static core::Property QueueBufferMaxSize;
@@ -100,29 +108,124 @@ class PublishKafka : public core::Processor {
   static core::Relationship Failure;
   static core::Relationship Success;
 
+  // Message
+  enum class MessageStatus : uint8_t {
+    MESSAGESTATUS_UNCOMPLETE,
+    MESSAGESTATUS_ERROR,
+    MESSAGESTATUS_SUCCESS
+  };
+
+  struct MessageResult {
+    MessageStatus status;
+    rd_kafka_resp_err_t err_code;
+
+    MessageResult()
+        : status(MessageStatus::MESSAGESTATUS_UNCOMPLETE) {
+    }
+  };
+  struct FlowFileResult {
+    bool flow_file_error;
+    std::vector<MessageResult> messages;
+
+    FlowFileResult()
+        : flow_file_error(false) {
+    }
+  };
+  struct Messages {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::vector<FlowFileResult> flow_files;
+    bool interrupted;
+
+    Messages()
+        : interrupted(false) {
+    }
+
+    void waitForCompletion() {
+      std::unique_lock<std::mutex> lock(mutex);
+      cv.wait(lock, [this]() -> bool {
+        if (interrupted) {
+          return true;
+        }
+        size_t index = 0U;
+        return std::all_of(this->flow_files.begin(), this->flow_files.end(), [&](const FlowFileResult& flow_file) {
+          index++;
+          if (flow_file.flow_file_error) {
+            return true;
+          }
+          return std::all_of(flow_file.messages.begin(), flow_file.messages.end(), [](const MessageResult& message) {
+            return message.status != MessageStatus::MESSAGESTATUS_UNCOMPLETE;
+          });
+        });
+      });
+    }
+
+    void modifyResult(size_t index, const std::function<void(FlowFileResult&)>& fun) {
+      std::unique_lock<std::mutex> lock(mutex);
+      fun(flow_files.at(index));
+      cv.notify_all();
+    }
+
+    size_t addFlowFile() {
+      std::lock_guard<std::mutex> lock(mutex);
+      flow_files.emplace_back();
+      return flow_files.size() - 1;
+    }
+
+    void iterateFlowFiles(const std::function<void(size_t /*index*/, const FlowFileResult& /*flow_file_result*/)>& fun) {
+      std::lock_guard<std::mutex> lock(mutex);
+      for (size_t index = 0U; index < flow_files.size(); index++) {
+        fun(index, flow_files[index]);
+      }
+    }
+
+    void interrupt() {
+      std::unique_lock<std::mutex> lock(mutex);
+      interrupted = true;
+      cv.notify_all();
+    }
+
+    bool wasInterrupted() {
+      std::lock_guard<std::mutex> lock(mutex);
+      return interrupted;
+    }
+  };
+
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(uint64_t max_seg_size, const std::string &key, rd_kafka_topic_t *rkt, rd_kafka_t *rk, const std::shared_ptr<core::FlowFile> flowFile, const std::regex &attributeNameRegex)
+    ReadCallback(uint64_t max_seg_size,
+                 const std::string &key,
+                 rd_kafka_topic_t *rkt,
+                 rd_kafka_t *rk,
+                 const std::shared_ptr<core::FlowFile> flowFile,
+                 utils::Regex &attributeNameRegex,
+                 std::shared_ptr<Messages> messages,
+                 size_t flow_file_index)
        : max_seg_size_(max_seg_size),
          key_(key),
          rkt_(rkt),
          rk_(rk),
          flowFile_(flowFile),
+         messages_(std::move(messages)),
+         flow_file_index_(flow_file_index),
          attributeNameRegex_(attributeNameRegex) {
      flow_size_ = flowFile_->getSize();
      status_ = 0;
      read_size_ = 0;
      hdrs = nullptr;
    }
+
    ~ReadCallback() {
      if (hdrs) {
        rd_kafka_headers_destroy(hdrs);
      }
    }
+
    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (flow_size_ < max_seg_size_)
+      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
        max_seg_size_ = flow_size_;
+      }
      std::vector<unsigned char> buffer;
      buffer.reserve(max_seg_size_);
      read_size_ = 0;
@@ -130,7 +233,7 @@ class PublishKafka : public core::Processor {
      rd_kafka_resp_err_t err;
 
      for (auto kv : flowFile_->getAttributes()) {
-        if (regex_match(kv.first, attributeNameRegex_)) {
+        if(attributeNameRegex_.match(kv.first)) {
          if (!hdrs) {
            hdrs = rd_kafka_headers_new(8);
          }
@@ -138,36 +241,60 @@ class PublishKafka : public core::Processor {
        }
      }
 
+      size_t segment_num = 0U;
      while (read_size_ < flow_size_) {
        int readRet = stream->read(&buffer[0], max_seg_size_);
        if (readRet < 0) {
          status_ = -1;
+          error_ = "Failed to read from stream";
          return read_size_;
        }
        if (readRet > 0) {
+          messages_->modifyResult(flow_file_index_, [](FlowFileResult& flow_file) {
+            flow_file.messages.resize(flow_file.messages.size() + 1);
+          });
+          auto messages_copy = this->messages_;
+          auto flow_file_index_copy = this->flow_file_index_;
+          auto callback = std::unique_ptr<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(
+              new std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>(
+                  [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t* /*rk*/, const rd_kafka_message_t* rkmessage) {
+                    messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult& flow_file) {
+                      auto& message = flow_file.messages.at(segment_num);
+                      message.err_code = rkmessage->err;
+                      message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+                    });
+                  }));
          if (hdrs) {
            rd_kafka_headers_t *hdrs_copy;
            hdrs_copy = rd_kafka_headers_copy(hdrs);
            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
-                                    RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+                                    RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
            if (err) {
              rd_kafka_headers_destroy(hdrs_copy);
            }
          } else {
            err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(&buffer[0], readRet),
-                                    RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_END);
+                                    RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback.release()), RD_KAFKA_V_END);
          }
          if (err) {
+            messages_->modifyResult(flow_file_index_, [segment_num, err](FlowFileResult& flow_file) {
+              auto& message = flow_file.messages.at(segment_num);
+              message.status = MessageStatus::MESSAGESTATUS_ERROR;
+              message.err_code = err;
+            });
            status_ = -1;
+            error_ = rd_kafka_err2str(err);
            return read_size_;
          }
          read_size_ += readRet;
        } else {
          break;
        }
+        segment_num++;
      }
      return read_size_;
    }
+
    uint64_t flow_size_;
    uint64_t max_seg_size_;
    std::string key_;
@@ -175,14 +302,17 @@ class PublishKafka : public core::Processor {
    rd_kafka_t *rk_;
    rd_kafka_headers_t *hdrs;
    std::shared_ptr<core::FlowFile> flowFile_;
+    std::shared_ptr<Messages> messages_;
+    size_t flow_file_index_;
    int status_;
+    std::string error_;
    int read_size_;
-    std::regex attributeNameRegex_;
+    utils::Regex& attributeNameRegex_;
  };
 
  public:
 
-  virtual bool supportsDynamicProperties() {
+  virtual bool supportsDynamicProperties() override {
    return true;
  }
 
@@ -195,23 +325,23 @@ class PublishKafka : public core::Processor {
  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
  virtual void initialize() override;
  virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  virtual void notifyStop() override;
 
 protected:
 
-  bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::FlowFile> &ff);
+  bool configureNewConnection(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context);
+  bool createNewTopic(const std::shared_ptr<KafkaConnection> &conn, const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name);
 
 private:
+  static void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque);
+
  std::shared_ptr<logging::Logger> logger_;
 
  KafkaPool connection_pool_;
 
-//  rd_kafka_conf_t *conf_;
-  //rd_kafka_t *rk_;
-  //1rd_kafka_topic_conf_t *topic_conf_;
-  //rd_kafka_topic_t *rkt_;
-  //std::string topic_;
-  uint64_t max_seg_size_;
-  std::regex attributeNameRegex;
+  std::atomic<bool> interrupted_;
+  std::mutex messages_mutex_;
+  std::set<std::shared_ptr<Messages>> messages_set_;
 };
 
 REGISTER_RESOURCE(PublishKafka, "This Processor puts the contents of a FlowFile to a Topic in Apache Kafka. The content of a FlowFile becomes the contents of a Kafka message. "
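The Messages bookkeeping above amounts to a condition-variable completion latch: the delivery callbacks record per-segment results under the mutex and notify, while onTrigger blocks in waitForCompletion() until every segment has left the UNCOMPLETE state, the flow file is marked failed, or interrupt() is called. The bare pattern, stripped of the per-segment bookkeeping (a sketch, not the actual struct):

// Bare-bones completion latch sketching the Messages synchronization above.
#include <condition_variable>
#include <mutex>

class CompletionLatch {
 public:
  void add_pending() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++pending_;
  }
  void complete_one() {
    std::lock_guard<std::mutex> lock(mutex_);
    --pending_;
    cv_.notify_all();
  }
  void interrupt() {
    std::lock_guard<std::mutex> lock(mutex_);
    interrupted_ = true;
    cv_.notify_all();
  }
  // Blocks until all pending completions arrive or interrupt() is called.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return interrupted_ || pending_ == 0; });
  }
 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int pending_ = 0;
  bool interrupted_ = false;
};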
" diff --git a/thirdparty/librdkafka/librdkafka-libressl.patch b/thirdparty/librdkafka/librdkafka-libressl.patch new file mode 100644 index 0000000..c99a9d9 --- /dev/null +++ b/thirdparty/librdkafka/librdkafka-libressl.patch @@ -0,0 +1,148 @@ +diff --git a/src/rdkafka_transport.c b/src/rdkafka_transport.c +index 9c7fc36c..de083109 100644 +--- a/src/rdkafka_transport.c ++++ b/src/rdkafka_transport.c +@@ -3,24 +3,24 @@ + * + * Copyright (c) 2015, Magnus Edenhill + * All rights reserved. +- * ++ * + * Redistribution and use in source and binary forms, with or without +- * modification, are permitted provided that the following conditions are met: +- * ++ * modification, are permitted provided that the following conditions are met: ++ * + * 1. Redistributions of source code must retain the above copyright notice, +- * this list of conditions and the following disclaimer. ++ * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation +- * and/or other materials provided with the distribution. +- * ++ * and/or other materials provided with the distribution. ++ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN ++ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE ++ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ++ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE ++ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR ++ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF ++ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ++ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+@@ -433,7 +433,7 @@ static char *rd_kafka_ssl_error (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
+ else
+ rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr);
+ }
+- 
++
+ ERR_error_string_n(l, buf, sizeof(buf));
+
+ rd_snprintf(errstr, errstr_size, "%s:%d: %s: %s",
+@@ -443,7 +443,7 @@
+
+ if (cnt == 0)
+ rd_snprintf(errstr, errstr_size, "No error");
+- 
++
+ return errstr;
+ }
+
+@@ -507,7 +507,7 @@ void rd_kafka_transport_ssl_term (void) {
+ void rd_kafka_transport_ssl_init (void) {
+ #if OPENSSL_VERSION_NUMBER < 0x10100000L
+ int i;
+- 
++
+ if (!CRYPTO_get_locking_callback()) {
+ rd_kafka_ssl_locks_cnt = CRYPTO_num_locks();
+ rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt *
+@@ -736,11 +736,11 @@ static int rd_kafka_transport_ssl_connect (rd_kafka_broker_t *rkb,
+ return 0;
+ }
+
+- 
++
+ if (rd_kafka_transport_ssl_io_update(rktrans, r,
+ errstr, errstr_size) == -1)
+ return -1;
+- 
++
+ return 0;
+
+ fail:
+@@ -863,17 +863,6 @@ int rd_kafka_transport_ssl_ctx_init (rd_kafka_t *rk,
+ int r;
+ SSL_CTX *ctx;
+
+-#if OPENSSL_VERSION_NUMBER >= 0x10100000
+- rd_kafka_dbg(rk, SECURITY, "OPENSSL", "Using OpenSSL version %s "
+- "(0x%lx, librdkafka built with 0x%lx)",
+- OpenSSL_version(OPENSSL_VERSION),
+- OpenSSL_version_num(),
+- OPENSSL_VERSION_NUMBER);
+-#else
+- rd_kafka_dbg(rk, SECURITY, "OPENSSL", "librdkafka built with OpenSSL "
+- "version 0x%lx", OPENSSL_VERSION_NUMBER);
+-#endif
+-
+ if (errstr_size > 0)
+ errstr[0] = '\0';
+
+@@ -945,7 +934,7 @@
+ "Loading CA certificate(s) from %s %s",
+ is_dir ? "directory":"file",
+ rk->rk_conf.ssl.ca_location);
+- 
++
+ r = SSL_CTX_load_verify_locations(ctx,
+ !is_dir ?
+ rk->rk_conf.ssl.
+@@ -1034,8 +1023,8 @@
+
+ if (!(fp = fopen(rk->rk_conf.ssl.keystore_location, "rb"))) {
+ rd_snprintf(errstr, errstr_size,
+- "Failed to open ssl.keystore.location: %s: %s", 
+- rk->rk_conf.ssl.keystore_location, 
++ "Failed to open ssl.keystore.location: %s: %s",
++ rk->rk_conf.ssl.keystore_location,
+ rd_strerror(errno));
+ goto fail;
+ }
+@@ -1495,7 +1484,7 @@ static void rd_kafka_transport_io_event (rd_kafka_transport_t *rktrans,
+ /**
+ * Poll and serve IOs
+ *
+- * Locality: broker thread 
++ * Locality: broker thread
+ */
+ void rd_kafka_transport_io_serve (rd_kafka_transport_t *rktrans,
+ int timeout_ms) {
+@@ -1545,7 +1534,7 @@ rd_kafka_transport_t *rd_kafka_transport_connect (rd_kafka_broker_t *rkb,
+
+ #ifdef SO_NOSIGPIPE
+ /* Disable SIGPIPE signalling for this socket on OSX */
+- if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1) 
++ if (setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on)) == -1)
+ rd_rkb_dbg(rkb, BROKER, "SOCKET",
+ "Failed to set SO_NOSIGPIPE: %s",
+ socket_strerror(socket_errno));
+@@ -1709,7 +1698,7 @@ void rd_kafka_transport_term (void) {
+ #endif
+ }
+ #endif
+- 
++
+ void rd_kafka_transport_init(void) {
+ #ifdef _MSC_VER
+ WSADATA d;