lordgamez commented on code in PR #2094:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2094#discussion_r2799238436
##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
}
inline bool isFinished(int seconds = 0) {
- return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
+ return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0
&& getByteOutputReadCallback()->waitingOps();
}
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/2094/commits/532063b0141955af8e2bf52f29253a9a56082697
##########
core-framework/include/http/HTTPStream.h:
##########
@@ -114,19 +102,16 @@ class HttpStream : public io::BaseStreamImpl {
}
inline bool isFinished(int seconds = 0) {
- return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
+ return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0
&& getByteOutputReadCallback()->waitingOps();
}
- /**
- * Waits for more data to become available.
- */
bool waitForDataAvailable() {
do {
logger_->log_trace("Waiting for more data");
- } while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
+ } while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
&& getByteOutputReadCallback()
&& getByteOutputReadCallback()->getSize() == 0);
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/2094/commits/532063b0141955af8e2bf52f29253a9a56082697
##########
libminifi/src/sitetosite/HttpSiteToSiteClient.cpp:
##########
@@ -388,4 +385,33 @@ void HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
}
}
+std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
+ auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
+ if (!http_stream) {
+ throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream cannot be cast to HTTP stream");
+ }
+
+ try {
+ return SiteToSiteClient::readFlowFiles(transaction, session);
+ } catch (const Exception&) {
+ auto response_code = http_stream->getClientRef()->getResponseCode();
+
+ // 200 tells us that there is no content to read, so we should not treat it as an error.
+ // The read fails in this case because the stream does not contain a valid response body with the expected format.
+ // Unfortunately there is no way to get the response code before trying to read, so we have to let it fail and check the response code afterwards.
+ if (response_code == 200) {
+ logger_->log_debug("Response code 200, no content to read");
+ transaction->setDataAvailable(false);
+ transaction->setState(TransactionState::TRANSACTION_CANCELED);
+ current_code_ = ResponseCode::CANCEL_TRANSACTION;
+ return {0, 0};
+ }
+ throw;
+ }
+
+ if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) {
+ throw Exception(SITE2SITE_EXCEPTION, fmt::format("HTTP error code received while reading flow files: {}", response_code));
+ }
+}
Review Comment:
Updated in
https://github.com/apache/nifi-minifi-cpp/pull/2094/commits/532063b0141955af8e2bf52f29253a9a56082697
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]