szaszm commented on code in PR #2153: URL: https://github.com/apache/nifi-minifi-cpp/pull/2153#discussion_r3365891531
########## extension-framework/cpp-extension-lib/include/api/utils/Proxy.h: ########## @@ -0,0 +1,42 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <filesystem> +#include <optional> +#include <string> + +namespace org::apache::nifi::minifi::api::utils { + +enum class ProxyType { + DIRECT, + HTTP +}; + +struct BasicAuthCredentials { + std::string username; + std::string password; +}; + +struct ProxyData { + std::string host; + uint16_t port; + std::optional<BasicAuthCredentials> proxy_credentials; + ProxyType proxy_type; Review Comment: I'd move the type to be the first member just out of personal preference, because it depends on the type whether the rest of the struct members are meaningful at all. 
########## extensions/gcp/processors/FetchGCSObject.cpp: ########## @@ -82,80 +80,78 @@ class FetchFromGCSCallback { }; } // namespace - -void FetchGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void FetchGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - GCSProcessor::onSchedule(context, session_factory); - if (auto encryption_key = context.getProperty(EncryptionKey)) { +MinifiStatus FetchGCSObject::onScheduleImpl(api::core::ProcessContext& context) { + const auto status = GCSProcessor::onScheduleImpl(context); + if (MINIFI_STATUS_SUCCESS != status) { + return status; + } + if (auto encryption_key = context.getProperty(EncryptionKey, nullptr)) { try { encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); } catch (const google::cloud::RuntimeStatusError&) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name)); } + logger_->log_error("Could not decode the base64-encoded encryption key from property {}", std::string(EncryptionKey.name)); + return MINIFI_STATUS_UNKNOWN_ERROR; + } } + return MINIFI_STATUS_SUCCESS; } -void FetchGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus FetchGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; 
} - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Client client = getClient(); FetchFromGCSCallback callback(client, *bucket, *object_name); callback.setEncryptionKey(encryption_key_); - if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) { + if (const auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) { if (const auto geni64 = parsing::parseIntegral<int64_t>(*object_generation_str)) { gcs::Generation generation = gcs::Generation{*geni64}; callback.setGeneration(generation); } else { logger_->log_error("Invalid generation: {}", *object_generation_str); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; Review Comment: ```suggestion return MINIFI_STATUS_SUCCESS; ``` ########## extensions/gcp/processors/DeleteGCSObject.cpp: ########## @@ -17,69 +17,62 @@ #include "DeleteGCSObject.h" -#include "utils/ProcessorConfigUtils.h" #include "../GCPAttributes.h" -#include "minifi-cpp/core/FlowFile.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "core/Resource.h" +#include "api/core/ProcessContext.h" +#include "api/core/ProcessSession.h" +#include "api/core/Resource.h" +#include "api/utils/ProcessorConfigUtils.h" namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -void DeleteGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} 
-void DeleteGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus DeleteGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, &flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Generation generation; - if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) { + if (auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) { if (const auto geni64 = parsing::parseIntegral<int64_t>(*object_generation_str)) { generation = gcs::Generation{*geni64}; } else { logger_->log_error("Invalid generation: {}", *object_generation_str); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; Review Comment: incorrect indentation ```suggestion return MINIFI_STATUS_SUCCESS; ``` ########## extensions/gcp/processors/FetchGCSObject.cpp: ########## @@ -82,80 +80,78 @@ class FetchFromGCSCallback { }; } // namespace 
- -void FetchGCSObject::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void FetchGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) { - GCSProcessor::onSchedule(context, session_factory); - if (auto encryption_key = context.getProperty(EncryptionKey)) { +MinifiStatus FetchGCSObject::onScheduleImpl(api::core::ProcessContext& context) { + const auto status = GCSProcessor::onScheduleImpl(context); + if (MINIFI_STATUS_SUCCESS != status) { + return status; + } + if (auto encryption_key = context.getProperty(EncryptionKey, nullptr)) { try { encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); } catch (const google::cloud::RuntimeStatusError&) { - throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name)); } + logger_->log_error("Could not decode the base64-encoded encryption key from property {}", std::string(EncryptionKey.name)); + return MINIFI_STATUS_UNKNOWN_ERROR; + } } + return MINIFI_STATUS_SUCCESS; } -void FetchGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { +MinifiStatus FetchGCSObject::onTriggerImpl(api::core::ProcessContext& context, api::core::ProcessSession& session) { gsl_Expects(gcp_credentials_); auto flow_file = session.get(); if (!flow_file) { - context.yield(); - return; + return MINIFI_STATUS_PROCESSOR_YIELD; } - auto bucket = context.getProperty(Bucket, flow_file.get()); + auto bucket = api::utils::parseOptionalProperty(context, Bucket, &flow_file); if (!bucket || bucket->empty()) { logger_->log_error("Missing bucket name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } - auto object_name = context.getProperty(Key, flow_file.get()); + auto object_name = api::utils::parseOptionalProperty(context, Key, 
&flow_file); if (!object_name || object_name->empty()) { logger_->log_error("Missing object name"); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } gcs::Client client = getClient(); FetchFromGCSCallback callback(client, *bucket, *object_name); callback.setEncryptionKey(encryption_key_); - if (const auto object_generation_str = context.getProperty(ObjectGeneration, flow_file.get()); object_generation_str && !object_generation_str->empty()) { + if (const auto object_generation_str = api::utils::parseOptionalProperty(context, ObjectGeneration, &flow_file); object_generation_str && !object_generation_str->empty()) { if (const auto geni64 = parsing::parseIntegral<int64_t>(*object_generation_str)) { gcs::Generation generation = gcs::Generation{*geni64}; callback.setGeneration(generation); } else { logger_->log_error("Invalid generation: {}", *object_generation_str); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } } session.write(flow_file, std::ref(callback)); if (!callback.getStatus().ok()) { - flow_file->setAttribute(GCS_STATUS_MESSAGE, callback.getStatus().message()); - flow_file->setAttribute(GCS_ERROR_REASON, callback.getStatus().error_info().reason()); - flow_file->setAttribute(GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain()); + session.setAttribute(flow_file, GCS_STATUS_MESSAGE, callback.getStatus().message()); + session.setAttribute(flow_file, GCS_ERROR_REASON, callback.getStatus().error_info().reason()); + session.setAttribute(flow_file, GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain()); logger_->log_error("Failed to fetch from Google Cloud Storage {} {}", callback.getStatus().message(), callback.getStatus().error_info().reason()); - session.transfer(flow_file, Failure); - return; + session.transfer(std::move(flow_file), Failure); + return MINIFI_STATUS_SUCCESS; } if 
(auto generation = callback.getGeneration()) - flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation)); + session.setAttribute(flow_file, GCS_GENERATION, std::to_string(*generation)); if (auto meta_generation = callback.getMetaGeneration()) - flow_file->setAttribute(GCS_META_GENERATION, std::to_string(*meta_generation)); + session.setAttribute(flow_file, GCS_META_GENERATION, std::to_string(*meta_generation)); if (auto storage_class = callback.getStorageClass()) - flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class); - session.transfer(flow_file, Success); + session.setAttribute(flow_file, GCS_STORAGE_CLASS, *storage_class); Review Comment: Could you wrap these `if` blocks in braces? ########## extensions/gcp/processors/GCSProcessor.cpp: ########## @@ -17,45 +17,43 @@ #include "GCSProcessor.h" -#include "utils/ProcessorConfigUtils.h" - #include "../controllerservices/GCPCredentialsControllerService.h" -#include "minifi-cpp/core/ProcessContext.h" -#include "core/ProcessSession.h" +#include "api/utils/ProcessorConfigUtils.h" namespace gcs = ::google::cloud::storage; namespace org::apache::nifi::minifi::extensions::gcp { -std::shared_ptr<google::cloud::Credentials> GCSProcessor::getCredentials(core::ProcessContext& context) const { - auto gcp_credentials_controller_service = utils::parseOptionalControllerService<GCPCredentialsControllerService>(context, GCSProcessor::GCPCredentials, getUUID()); - if (gcp_credentials_controller_service) { +std::shared_ptr<google::cloud::Credentials> GCSProcessor::getCredentials(const api::core::ProcessContext& context) { + if (const auto gcp_credentials_controller_service = api::utils::parseOptionalControllerService<GCPCredentialsControllerService>(context, + GCPCredentials)) { Review Comment: Continuation indentation should be 4 spaces. Unless it's 8 because you have two open parentheses? I don't know how I'd handle such cases, so I accept either; I just want to know whether it was intentional.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
