Copilot commented on code in PR #2094:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2094#discussion_r2798802419
##########
core-framework/src/http/HTTPStream.cpp:
##########
@@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size)
{
size_t HttpStream::read(std::span<std::byte> buf) {
if (buf.empty()) { return 0; }
if (!IsNullOrEmpty(buf)) {
- if (!started_) {
- std::lock_guard<std::mutex> lock(mutex_);
- if (!started_) {
- auto read_callback =
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
- http_client_future_ = std::async(std::launch::async,
submit_read_client, http_client_, read_callback.get());
- http_client_->setReadCallback(std::move(read_callback));
- started_ = true;
- }
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!read_started_) {
+ auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560,
true);
+ http_client_read_future_ = std::async(std::launch::async,
submit_read_client, http_client_, read_callback.get());
+ http_client_->setReadCallback(std::move(read_callback));
+ read_started_ = true;
}
return
gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()),
buf.size());
Review Comment:
HttpStream::read() holds mutex_ while calling
ByteOutputCallback::readFully(), which can block waiting for data/close. This
prevents forceClose() (and the destructor) from acquiring the same mutex to
close the callback and can deadlock shutdown/cleanup. Only hold mutex_ for
one-time initialization of the read callback/future, then release it before the
potentially-blocking readFully call (or use call_once/atomic state to guard
initialization).
```suggestion
std::shared_ptr<ByteOutputCallback> read_callback;
{
std::lock_guard<std::mutex> lock(mutex_);
if (!read_started_) {
auto new_read_callback =
std::make_unique<HTTPReadByteOutputCallback>(66560, true);
http_client_read_future_ = std::async(std::launch::async,
submit_read_client, http_client_, new_read_callback.get());
http_client_->setReadCallback(std::move(new_read_callback));
read_started_ = true;
}
read_callback = getByteOutputReadCallback();
}
return
gsl::not_null(read_callback)->readFully(reinterpret_cast<char*>(buf.data()),
buf.size());
```
##########
core-framework/src/http/HTTPStream.cpp:
##########
@@ -40,30 +40,28 @@ void HttpStream::close() {
}
void HttpStream::seek(size_t /*offset*/) {
- // seek is an unnecessary part of this implementation
throw std::logic_error{"HttpStream::seek is unimplemented"};
}
size_t HttpStream::tell() const {
- // tell is an unnecessary part of this implementation
throw std::logic_error{"HttpStream::tell is unimplemented"};
}
-// data stream overrides
+[[nodiscard]] size_t HttpStream::size() const {
+ throw std::logic_error{"HttpStream::size is unimplemented"};
+}
size_t HttpStream::write(const uint8_t* value, size_t size) {
if (size == 0) return 0;
if (IsNullOrEmpty(value)) {
return io::STREAM_ERROR;
}
- if (!started_) {
- std::lock_guard<std::mutex> lock(mutex_);
- if (!started_) {
- auto callback = std::make_unique<HttpStreamingCallback>();
- http_client_->setUploadCallback(std::move(callback));
- http_client_future_ = std::async(std::launch::async, submit_client,
http_client_);
- started_ = true;
- }
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (!write_started_) {
+ auto callback = std::make_unique<HttpStreamingCallback>();
+ http_client_->setUploadCallback(std::move(callback));
+ http_client_write_future_ = std::async(std::launch::async, submit_client,
http_client_);
+ write_started_ = true;
}
if (auto http_callback =
dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback()))
http_callback->process(value, size);
Review Comment:
HttpStream::write() holds mutex_ while invoking
HttpStreamingCallback::process(). Keeping the stream mutex held during callback
processing unnecessarily serializes writers and can block forceClose() longer
than needed. Consider limiting the mutex scope to just initializing the upload
callback/future and releasing it before calling into the callback.
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -388,4 +385,33 @@ void
HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
}
}
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+ auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+ if (!http_stream) {
+ throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream
cannot be cast to HTTP stream");
+ }
+
+ try {
+ return SiteToSiteClient::readFlowFiles(transaction, session);
+ } catch (const Exception&) {
+ auto response_code = http_stream->getClientRef()->getResponseCode();
+
+ // 200 tells us that there is no content to read, so we should not treat
it as an error.
+ // The read fails in this case because the stream does not contain a valid
response body with the expected format.
+ // Unfortunately there is no way to get the response code before trying to
read, so we have to let it fail and check the response code afterwards.
+ if (response_code == 200) {
+ logger_->log_debug("Response code 200, no content to read");
+ transaction->setDataAvailable(false);
+ transaction->setState(TransactionState::TRANSACTION_CANCELED);
+ current_code_ = ResponseCode::CANCEL_TRANSACTION;
+ return {0, 0};
+ }
+ throw;
+ }
+
+ if (auto response_code = http_stream->getClientRef()->getResponseCode();
response_code >= 400) {
+ throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received
while reading flow files: {}", response_code));
+ }
+}
Review Comment:
This response-code check is unreachable: the try block either returns or
throws, and the catch block either returns or rethrows, so control never falls
through to it. As written, it’s dead code and can also trigger “control may
reach end of non-void function”/unreachable-code warnings depending on compiler
settings. If you intended to validate the HTTP status after a successful read,
store the base readFlowFiles() result, then check the response code, then
return the stored result; otherwise remove this block.
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -388,4 +385,33 @@ void
HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
}
}
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const
std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+ auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+ if (!http_stream) {
+ throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream
cannot be cast to HTTP stream");
+ }
+
+ try {
+ return SiteToSiteClient::readFlowFiles(transaction, session);
+ } catch (const Exception&) {
+ auto response_code = http_stream->getClientRef()->getResponseCode();
+
+ // 200 tells us that there is no content to read, so we should not treat
it as an error.
+ // The read fails in this case because the stream does not contain a valid
response body with the expected format.
+ // Unfortunately there is no way to get the response code before trying to
read, so we have to let it fail and check the response code afterwards.
+ if (response_code == 200) {
+ logger_->log_debug("Response code 200, no content to read");
+ transaction->setDataAvailable(false);
+ transaction->setState(TransactionState::TRANSACTION_CANCELED);
+ current_code_ = ResponseCode::CANCEL_TRANSACTION;
+ return {0, 0};
+ }
Review Comment:
The new 200-response special-case is behaviorally significant (treating an
exception from parsing the body as “no content”). There are existing
HTTPSiteToSite integration tests, but none appear to cover the “GET /flow-files
returns 200 with empty/invalid Site-to-Site body” scenario. Please add an
integration/unit test that exercises this path to prevent regressions.
##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
}
inline bool isFinished(int seconds = 0) {
- return http_client_future_.wait_for(std::chrono::seconds(seconds)) ==
std::future_status::ready
+ return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) ==
std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0
&& getByteOutputReadCallback()->waitingOps();
}
- /**
- * Waits for more data to become available.
- */
bool waitForDataAvailable() {
do {
logger_->log_trace("Waiting for more data");
- } while (http_client_future_.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready
+ } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0);
Review Comment:
HttpStream::waitForDataAvailable() also calls wait_for() on
http_client_read_future_ without checking valid(). If no read() has been
started yet, the future has no shared state and wait_for() is undefined
behavior (implementations may throw std::future_error). Add a
valid()/read_started_ guard and define what should happen before the read is
initiated (likely return false).
##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
}
inline bool isFinished(int seconds = 0) {
- return http_client_future_.wait_for(std::chrono::seconds(seconds)) ==
std::future_status::ready
+ return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) ==
std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0
&& getByteOutputReadCallback()->waitingOps();
}
Review Comment:
HttpStream::isFinished() calls wait_for() on http_client_read_future_
without checking valid(). If isFinished() is called before any read() starts,
the future has no shared state, so std::future::wait_for is undefined behavior
(implementations may throw std::future_error, but this is not guaranteed).
Guard with http_client_read_future_.valid() (and likely return false when not
started).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]