szaszm commented on a change in pull request #712: MINIFICPP-1047 Add property "Drop empty flow files" to PublishKafka
URL: https://github.com/apache/nifi-minifi-cpp/pull/712#discussion_r369472796
 
 

 ##########
 File path: extensions/librdkafka/PublishKafka.h
 ##########
 @@ -193,121 +186,146 @@ class PublishKafka : public core::Processor {
 
   // Nest Callback Class for read stream
   class ReadCallback : public InputStreamCallback {
+    void allocate_message_object(const size_t segment_num) const {
+      messages_->modifyResult(flow_file_index_, [segment_num](FlowFileResult& flow_file) {
+        // allocate message object to be filled in by the callback in produce()
+        if (flow_file.messages.size() < segment_num + 1) {
+          flow_file.messages.resize(segment_num + 1);
+        }
+      });
+    }
+
+    utils::owner<rd_kafka_headers_s*> make_headers() const {
+      rd_kafka_headers_s* const result = rd_kafka_headers_new(8);
+      for (const auto& kv : flowFile_->getAttributes()) {
+        if(attributeNameRegex_.match(kv.first)) {
+          rd_kafka_header_add(result, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
+        }
+      }
+      return result;
+    }
+
+    rd_kafka_resp_err_t produce(const size_t segment_num, std::vector<unsigned char>& buffer, const size_t buflen) const {
+      const auto messages_copy = this->messages_;
+      const auto flow_file_index_copy = this->flow_file_index_;
+      const auto produce_callback = [messages_copy, flow_file_index_copy, segment_num](rd_kafka_t * /*rk*/, const rd_kafka_message_t *rkmessage) {
+        messages_copy->modifyResult(flow_file_index_copy, [segment_num, rkmessage](FlowFileResult &flow_file) {
+          auto &message = flow_file.messages.at(segment_num);
+          message.err_code = rkmessage->err;
+          message.status = message.err_code == 0 ? MessageStatus::MESSAGESTATUS_SUCCESS : MessageStatus::MESSAGESTATUS_ERROR;
+        });
+      };
+      // release()d below, deallocated in PublishKafka::messageDeliveryCallback
+      auto callback_ptr = utils::make_unique<std::function<void(rd_kafka_t*, const rd_kafka_message_t*)>>(std::move(produce_callback));
+
+      allocate_message_object(segment_num);
+
+      if (hdrs) {
+        const utils::owner<rd_kafka_headers_t*> hdrs_copy = rd_kafka_headers_copy(hdrs);
+        const auto err = rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                           RD_KAFKA_V_HEADERS(hdrs_copy), RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+        if (err) {
+          // the message only takes ownership of the headers in case of success
+          rd_kafka_headers_destroy(hdrs_copy);
+        }
+        return err;
+      } else {
+        return rd_kafka_producev(rk_, RD_KAFKA_V_RKT(rkt_), RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), RD_KAFKA_V_VALUE(buffer.data(), buflen),
+                                RD_KAFKA_V_KEY(key_.c_str(), key_.size()), RD_KAFKA_V_OPAQUE(callback_ptr.release()), RD_KAFKA_V_END);
+      }
+    }
+
    public:
-    ReadCallback(uint64_t max_seg_size,
-                 const std::string &key,
-                 rd_kafka_topic_t *rkt,
-                 rd_kafka_t *rk,
-                 const std::shared_ptr<core::FlowFile> flowFile,
+    ReadCallback(const uint64_t max_seg_size,
+                 std::string key,
+                 rd_kafka_topic_t * const rkt,
+                 rd_kafka_t * const rk,
+                 std::shared_ptr<core::FlowFile> flowFile,
                  utils::Regex &attributeNameRegex,
                  std::shared_ptr<Messages> messages,
-                 size_t flow_file_index)
-        : max_seg_size_(max_seg_size),
-          key_(key),
+                 const size_t flow_file_index,
+                 const bool fail_empty_flow_files)
+        : flowFile_(std::move(flowFile)),
+          flow_size_(flowFile_->getSize()),
+          max_seg_size_(max_seg_size == 0 || flow_size_ < max_seg_size ? flow_size_ : max_seg_size),
+          key_(std::move(key)),
           rkt_(rkt),
           rk_(rk),
-          flowFile_(flowFile),
+          hdrs(make_headers()),
           messages_(std::move(messages)),
           flow_file_index_(flow_file_index),
-          attributeNameRegex_(attributeNameRegex) {
-      flow_size_ = flowFile_->getSize();
-      status_ = 0;
-      read_size_ = 0;
-      hdrs = nullptr;
-    }
+          status_(0),
+          read_size_(0),
+          attributeNameRegex_(attributeNameRegex),
+          fail_empty_flow_files_(fail_empty_flow_files)
+    { }
 
     ~ReadCallback() {
-      if (hdrs) {
-        rd_kafka_headers_destroy(hdrs);
-      }
+      rd_kafka_headers_destroy(hdrs);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      if (max_seg_size_ == 0U || flow_size_ < max_seg_size_) {
-        max_seg_size_ = flow_size_;
-      }
+    int64_t process(const std::shared_ptr<io::BaseStream> stream) {
       std::vector<unsigned char> buffer;
-      buffer.reserve(max_seg_size_);
+
+      buffer.resize(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
-      rd_kafka_resp_err_t err;
+      called_ = true;
 
-      for (auto kv : flowFile_->getAttributes()) {
-        if(attributeNameRegex_.match(kv.first)) {
-          if (!hdrs) {
-            hdrs = rd_kafka_headers_new(8);
-          }
-          err = rd_kafka_header_add(hdrs, kv.first.c_str(), kv.first.size(), kv.second.c_str(), kv.second.size());
-        }
+      assert(max_seg_size_ != 0 || flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0");
+      // ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases
+      const size_t reserved_msg_capacity = max_seg_size_ == 0 ? 1 : utils::intdiv_ceil(flow_size_, max_seg_size_);
+      messages_->modifyResult(flow_file_index_, [reserved_msg_capacity](FlowFileResult& flow_file) {
+        flow_file.messages.reserve(reserved_msg_capacity);
+      });
+
+      // If the flow file is empty, we still want to send the message, unless the user wants to fail_empty_flow_files_
 
 Review comment:
   see these 3 lines for proof: https://github.com/apache/nifi-minifi-cpp/pull/712/files#diff-9917deef91cc1503aff82b472310b7eaL662
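
   For context, a minimal sketch of how that empty-flow-file branch could look inside ReadCallback::process(), reusing the members quoted in the hunk above (flow_size_, fail_empty_flow_files_, status_, buffer) and the produce() helper. This is only an illustration of the intent described in the quoted comment, not the code actually proposed in the PR; the real error handling and return values may differ:

       // illustrative only; not the code under review
       if (flow_size_ == 0) {
         if (fail_empty_flow_files_) {
           status_ = -1;  // let the caller route the flow file to the failure relationship
           return 0;
         }
         // otherwise still emit a single zero-length Kafka message
         const rd_kafka_resp_err_t err = produce(0 /*segment_num*/, buffer, 0 /*buflen*/);
         status_ = (err == RD_KAFKA_RESP_ERR_NO_ERROR) ? 0 : -1;
         return 0;
       }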

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
