bakaid commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r376957040
########## File path: extensions/librdkafka/PublishKafka.h ########## @@ -194,120 +187,142 @@ class PublishKafka : public core::Processor { // Nest Callback Class for read stream class ReadCallback : public InputStreamCallback { public: - ReadCallback(uint64_t max_seg_size, - const std::string &key, - rd_kafka_topic_t *rkt, - rd_kafka_t *rk, - const std::shared_ptr<core::FlowFile> flowFile, + struct rd_kafka_headers_deleter { + void operator()(rd_kafka_headers_t* ptr) const noexcept { + rd_kafka_headers_destroy(ptr); + } + }; + + using rd_kafka_headers_unique_ptr = std::unique_ptr<rd_kafka_headers_t, rd_kafka_headers_deleter>; + + private: + void allocate_message_object(const size_t segment_num) const { + messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) { + // allocate message object to be filled in by the callback in produce() + if (flow_file.messages.size() < segment_num + 1) { + flow_file.messages.resize(segment_num + 1); + } + }); + } + + static rd_kafka_headers_unique_ptr make_headers(const core::FlowFile& flow_file, utils::Regex& attribute_name_regex) { + const utils::owner<rd_kafka_headers_t*> result{ rd_kafka_headers_new(8) }; + if (!result) { throw std::bad_alloc{}; } + + for (const auto& kv : flow_file.getAttributes()) { Review comment: Compilation fails with ``` /Users/danielbakai/nifi-minifi-cpp/extensions/librdkafka/PublishKafka.h:212:29: error: 'this' argument to member function 'getAttributes' has type 'const core::FlowFile', but function is not marked const for (const auto& kv : flow_file.getAttributes()) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services