Copilot commented on code in PR #2094:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2094#discussion_r2798802419


##########
core-framework/src/http/HTTPStream.cpp:
##########
@@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) 
{
 size_t HttpStream::read(std::span<std::byte> buf) {
   if (buf.empty()) { return 0; }
   if (!IsNullOrEmpty(buf)) {
-    if (!started_) {
-      std::lock_guard<std::mutex> lock(mutex_);
-      if (!started_) {
-        auto read_callback = 
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
-        http_client_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, read_callback.get());
-        http_client_->setReadCallback(std::move(read_callback));
-        started_ = true;
-      }
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (!read_started_) {
+      auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, 
true);
+      http_client_read_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, read_callback.get());
+      http_client_->setReadCallback(std::move(read_callback));
+      read_started_ = true;
     }
     return 
gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()),
 buf.size());

Review Comment:
   HttpStream::read() holds mutex_ while calling 
ByteOutputCallback::readFully(), which can block waiting for data or for the 
callback to be closed. While it blocks, forceClose() (and the destructor) cannot 
acquire the same mutex to close the callback, so shutdown/cleanup can deadlock. 
Hold mutex_ only for the one-time initialization of the read callback/future, 
then release it before the potentially blocking readFully() call (or guard the 
initialization with std::call_once or atomic state instead).
   ```suggestion
       std::shared_ptr<ByteOutputCallback> read_callback;
       {
         std::lock_guard<std::mutex> lock(mutex_);
         if (!read_started_) {
           auto new_read_callback = 
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
           http_client_read_future_ = std::async(std::launch::async, 
submit_read_client, http_client_, new_read_callback.get());
           http_client_->setReadCallback(std::move(new_read_callback));
           read_started_ = true;
         }
         read_callback = getByteOutputReadCallback();
       }
       return 
gsl::not_null(read_callback)->readFully(reinterpret_cast<char*>(buf.data()), 
buf.size());
   ```



##########
core-framework/src/http/HTTPStream.cpp:
##########
@@ -40,30 +40,28 @@ void HttpStream::close() {
 }
 
 void HttpStream::seek(size_t /*offset*/) {
-  // seek is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
 
 size_t HttpStream::tell() const {
-  // tell is an unnecessary part of this implementation
   throw std::logic_error{"HttpStream::tell is unimplemented"};
 }
 
-// data stream overrides
+[[nodiscard]] size_t HttpStream::size() const {
+  throw std::logic_error{"HttpStream::size is unimplemented"};
+}
 
 size_t HttpStream::write(const uint8_t* value, size_t size) {
   if (size == 0) return 0;
   if (IsNullOrEmpty(value)) {
     return io::STREAM_ERROR;
   }
-  if (!started_) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    if (!started_) {
-      auto callback = std::make_unique<HttpStreamingCallback>();
-      http_client_->setUploadCallback(std::move(callback));
-      http_client_future_ = std::async(std::launch::async, submit_client, 
http_client_);
-      started_ = true;
-    }
+  std::lock_guard<std::mutex> lock(mutex_);
+  if (!write_started_) {
+    auto callback = std::make_unique<HttpStreamingCallback>();
+    http_client_->setUploadCallback(std::move(callback));
+    http_client_write_future_ = std::async(std::launch::async, submit_client, 
http_client_);
+    write_started_ = true;
   }
   if (auto http_callback = 
dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback()))
     http_callback->process(value, size);

Review Comment:
   HttpStream::write() holds mutex_ while invoking 
HttpStreamingCallback::process(). Keeping the stream mutex held during callback 
processing needlessly serializes writers and can delay forceClose() longer than 
necessary. Consider limiting the mutex scope to the one-time initialization of 
the upload callback/future, and releasing it before calling process().



##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -388,4 +385,33 @@ void 
HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
   }
 }
 
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+  auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+  if (!http_stream) {
+    throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream 
cannot be cast to HTTP stream");
+  }
+
+  try {
+    return SiteToSiteClient::readFlowFiles(transaction, session);
+  } catch (const Exception&) {
+    auto response_code = http_stream->getClientRef()->getResponseCode();
+
+    // 200 tells us that there is no content to read, so we should not treat 
it as an error.
+    // The read fails in this case because the stream does not contain a valid 
response body with the expected format.
+    // Unfortunately there is no way to get the response code before trying to 
read, so we have to let it fail and check the response code afterwards.
+    if (response_code == 200) {
+      logger_->log_debug("Response code 200, no content to read");
+      transaction->setDataAvailable(false);
+      transaction->setState(TransactionState::TRANSACTION_CANCELED);
+      current_code_ = ResponseCode::CANCEL_TRANSACTION;
+      return {0, 0};
+    }
+    throw;
+  }
+
+  if (auto response_code = http_stream->getClientRef()->getResponseCode(); 
response_code >= 400) {
+    throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received 
while reading flow files: {}", response_code));
+  }
+}

Review Comment:
   This response-code check is unreachable: every path through the try/catch 
either returns or rethrows. As written it is dead code, and because the function 
then ends without a return statement, it can also trigger “control may reach end 
of non-void function” or unreachable-code warnings, depending on compiler 
settings. If you intended to validate the HTTP status after a successful read, 
store the result of the base readFlowFiles() call, check the response code, and 
then return the stored result; otherwise, remove this block.



##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -388,4 +385,33 @@ void 
HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
   }
 }
 
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const 
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+  auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+  if (!http_stream) {
+    throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream 
cannot be cast to HTTP stream");
+  }
+
+  try {
+    return SiteToSiteClient::readFlowFiles(transaction, session);
+  } catch (const Exception&) {
+    auto response_code = http_stream->getClientRef()->getResponseCode();
+
+    // 200 tells us that there is no content to read, so we should not treat 
it as an error.
+    // The read fails in this case because the stream does not contain a valid 
response body with the expected format.
+    // Unfortunately there is no way to get the response code before trying to 
read, so we have to let it fail and check the response code afterwards.
+    if (response_code == 200) {
+      logger_->log_debug("Response code 200, no content to read");
+      transaction->setDataAvailable(false);
+      transaction->setState(TransactionState::TRANSACTION_CANCELED);
+      current_code_ = ResponseCode::CANCEL_TRANSACTION;
+      return {0, 0};
+    }

Review Comment:
   The new 200-response special-case is behaviorally significant (treating an 
exception from parsing the body as “no content”). There are existing 
HTTPSiteToSite integration tests, but none appear to cover the “GET /flow-files 
returns 200 with empty/invalid Site-to-Site body” scenario. Please add an 
integration/unit test that exercises this path to prevent regressions.



##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
   }
 
   inline bool isFinished(int seconds = 0) {
-    return http_client_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
+    return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
         && getByteOutputReadCallback()
         && getByteOutputReadCallback()->getSize() == 0
         && getByteOutputReadCallback()->waitingOps();
   }
 
-  /**
-   * Waits for more data to become available.
-   */
   bool waitForDataAvailable() {
     do {
       logger_->log_trace("Waiting for more data");
-    } while (http_client_future_.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready
+    } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != 
std::future_status::ready
         && getByteOutputReadCallback()
         && getByteOutputReadCallback()->getSize() == 0);
 

Review Comment:
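   HttpStream::waitForDataAvailable() also calls wait_for() on 
http_client_read_future_ without checking valid(). Before any read() has started, 
the future has no shared state. Calling wait_for() on it is then undefined 
behavior per the C++ standard; common implementations throw std::future_error 
(future_errc::no_state). Add a valid()/read_started_ guard, and define what 
should happen before the read is initiated (likely return false).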



##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
   }
 
   inline bool isFinished(int seconds = 0) {
-    return http_client_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
+    return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == 
std::future_status::ready
         && getByteOutputReadCallback()
         && getByteOutputReadCallback()->getSize() == 0
         && getByteOutputReadCallback()->waitingOps();
   }

Review Comment:
   HttpStream::isFinished() calls wait_for() on http_client_read_future_ 
without checking valid(). If isFinished() is called before any read() starts, 
the future has no shared state, and std::future::wait_for is undefined behavior 
(common implementations throw std::future_error with future_errc::no_state). 
Guard with http_client_read_future_.valid(), and most likely return false when 
the read has not been started.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to