Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
szaszm closed pull request #1695: MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API URL: https://github.com/apache/nifi-minifi-cpp/pull/1695 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452385790

## extensions/grafana-loki/PushGrafanaLokiREST.h: ##
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "controllers/SSLContextService.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "client/HTTPClient.h"
+#include "core/StateManager.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+class PushGrafanaLokiREST : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be "
+      "pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.";
+
+  explicit PushGrafanaLokiREST(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid),
+        log_batch_(logger_) {
+  }
+  ~PushGrafanaLokiREST() override = default;
+
+  EXTENSIONAPI static constexpr auto Url = core::PropertyDefinitionBuilder<>::createProperty("Url")
+      .withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.")
+      .isRequired(true)
+      .build();
+  EXTENSIONAPI static constexpr auto StreamLabels = core::PropertyDefinitionBuilder<>::createProperty("Stream Labels")
+      .withDescription("Comma separated list of = labels to be sent as stream labels.")
+      .isRequired(true)
+      .build();
+  EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata Attributes")
+      .withDescription("Comma separated list of attributes to be sent as log line metadata for a log line.")
+      .build();
+  EXTENSIONAPI static constexpr auto TenantID = core::PropertyDefinitionBuilder<>::createProperty("Tenant ID")
+      .withDescription("The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent.")
+      .build();
+  EXTENSIONAPI static constexpr auto MaxBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size")
+      .withDescription("The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once.")
+      .withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
+      .withDefaultValue("100")
+      .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchWait = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait")
+      .withDescription("Time to wait before sending a log line batch to Grafana Loki, full or not. If this property and Log Line Batch Size are both unset, "
+          "the log batch of the current trigger will be sent immediately.")
+      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+      .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size")
+      .withDescription("Number of log lines to send in a batch to Loki. If this property and Log Line Batch Wait are both unset, "
+          "the log batch of the current trigger will be sent immediately.")
+      .withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE)
+      .build();
+  EXTENSIONAPI static constexpr auto ConnectTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout")
+      .withDescription("Max wait time for connection to the Grafana Loki service.")
+      .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+      .withDefaultValue("5 s")
+      .isRequired(true)
+      .build();
+  EXTENSIONAPI static constexpr auto ReadTimeout = core::PropertyDefinitionBuilder<>::createProperty("Read Timeout")
+      .withDescription("Max wait time for response from remote service
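The property descriptions quoted above say a batch is sent when it is full, when the wait time has elapsed ("full or not"), or immediately if neither property is set. A minimal sketch of that decision follows. It is not the processor's code: `BatchPolicy` and `shouldFlush` are hypothetical names, and the `>=` comparisons are an assumption about how the thresholds are applied.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>

// Hypothetical type, not part of MiNiFi: holds the two optional batching properties.
struct BatchPolicy {
  std::optional<std::size_t> batch_size;                // "Log Line Batch Size"
  std::optional<std::chrono::milliseconds> batch_wait;  // "Log Line Batch Wait"
};

// Returns true when the accumulated log lines should be pushed to Loki.
inline bool shouldFlush(const BatchPolicy& policy, std::size_t batched_lines, std::chrono::milliseconds waited) {
  if (!policy.batch_size && !policy.batch_wait) {
    return true;  // both unset: send the current trigger's batch immediately
  }
  if (policy.batch_size && batched_lines >= *policy.batch_size) {
    return true;  // the batch is full
  }
  if (policy.batch_wait && waited >= *policy.batch_wait) {
    return true;  // the wait time elapsed, full or not
  }
  return false;
}
```

With only the batch size set, lines accumulate across triggers until the count is reached; with only the wait set, the elapsed time alone decides.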
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
adamdebreceni commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452371311 ## extensions/grafana-loki/PushGrafanaLokiREST.h: ## @@ -0,0 +1,178 @@
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452345696

## PROCESSORS.md: ##
@@ -2196,6 +2197,40 @@ In the list below, the names of required properties appear in bold. Any other pr
 | failure | FlowFiles that failed to be sent to the destination are transferred to this relationship |
+## PushGrafanaLokiREST
+
+### Description
+
+A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|------|---------------|------------------|-------------|
+| **Url** | | | Url of the Grafana Loki server. For example http://localhost:3100/. |
+| **Stream Labels** | | | Comma separated list of = labels to be sent as stream labels. |
+| Log Line Metadata Attributes | | | Comma separated list of attributes to be sent as log line metadata for a log line. |
+| Tenant ID | | | The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent. |
+| Max Batch Size | 100 | | The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once. |

Review Comment:
Good catch, fixed in 0f1a614f803abf8ea538df7f95330a953ae2afd7

## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ##
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include
+#include
+#include
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::system_clock::now();
+    std::unordered_map state;
+    state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector> PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result
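The quoted `LogBatch::add` stores the batch start time in processor state as a decimal string. A small sketch of that round trip follows, under stated assumptions: the duration unit in the quoted `duration_cast` was lost in the archive, so milliseconds is assumed here, and `serializeStartPushTime` / `parseStartPushTime` are hypothetical helper names rather than functions from the PR.

```cpp
#include <chrono>
#include <cstddef>
#include <exception>
#include <optional>
#include <string>

using SystemTimePoint = std::chrono::system_clock::time_point;

// Assumption: the state value is milliseconds since the epoch, written as a decimal string.
inline std::string serializeStartPushTime(SystemTimePoint tp) {
  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
  return std::to_string(ms.count());
}

// Parses the stored string back; returns nullopt for empty, non-numeric or partially numeric input.
inline std::optional<SystemTimePoint> parseStartPushTime(const std::string& value) {
  try {
    std::size_t parsed = 0;
    const long long count = std::stoll(value, &parsed);
    if (parsed != value.size()) {
      return std::nullopt;  // trailing garbage such as "12abc"
    }
    return SystemTimePoint{std::chrono::duration_cast<SystemTimePoint::duration>(std::chrono::milliseconds{count})};
  } catch (const std::exception&) {
    return std::nullopt;  // std::stoll throws on invalid or out-of-range input
  }
}
```

Restoring the timestamp this way would let a restarted processor continue measuring the batch wait from the original start time instead of resetting it.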
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
szaszm commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1446396780

## PROCESSORS.md: ##
@@ -2196,6 +2197,40 @@ In the list below, the names of required properties appear in bold. Any other pr
 | failure | FlowFiles that failed to be sent to the destination are transferred to this relationship |
+## PushGrafanaLokiREST
+
+### Description
+
+A Grafana Loki push processor that uses the Grafana Loki REST API. The processor expects each flow file to contain a single log line to be pushed to Grafana Loki, therefore it is usually used together with the TailFile processor.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|------|---------------|------------------|-------------|
+| **Url** | | | Url of the Grafana Loki server. For example http://localhost:3100/. |
+| **Stream Labels** | | | Comma separated list of = labels to be sent as stream labels. |
+| Log Line Metadata Attributes | | | Comma separated list of attributes to be sent as log line metadata for a log line. |
+| Tenant ID | | | The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent. |
+| Max Batch Size | 100 | | The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once. |

Review Comment:
If not set, isn't it just using the default 100?

## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ##
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include
+#include
+#include
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::system_clock::now();
+    std::unordered_map state;
+    state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector> PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfi
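The review question above turns on how a property with a default value behaves when it is left unset. A sketch of the point, assuming an unset property falls back to its declared default of 100 and that only an explicit 0 means "no limit"; `effectiveMaxBatchSize` and `kDefaultMaxBatchSize` are hypothetical names, not MiNiFi API.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Default taken from the quoted property definition (.withDefaultValue("100")).
constexpr std::uint64_t kDefaultMaxBatchSize = 100;

// Hypothetical helper: returns the per-trigger flow file limit, or nullopt for "no limit".
// std::stoull throws on non-numeric input; validation is left out of this sketch.
inline std::optional<std::uint64_t> effectiveMaxBatchSize(const std::optional<std::string>& configured) {
  const std::uint64_t value = configured ? std::stoull(*configured) : kDefaultMaxBatchSize;
  if (value == 0) {
    return std::nullopt;  // explicit 0: process all available flow files
  }
  return value;
}
```

Under these assumptions "not set" and "set to 0" are different cases, which is why the description's "If not set, or set to 0" wording was questioned.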
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452221843 ## extensions/grafana-loki/PushGrafanaLokiREST.h: ## @@ -0,0 +1,178 @@
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
adamdebreceni commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452092124 ## extensions/grafana-loki/PushGrafanaLokiREST.h: ## @@ -0,0 +1,178 @@
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439542044 ## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + SECTION("Stream labels cannot be empty") { +test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); + } + SECTION("Stream labels need to be valid") { +test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); + } + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0"); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +class PushGrafanaLokiRESTTestFixture { + public: + PushGrafanaLokiRESTTestFixture() + : mock_loki_("10990"), + push_grafana_loki_rest_(std::make_shared("PushGrafanaLokiREST")), +test_controller_(push_grafana_loki_rest_) { +LogTestController::getInstance().setDebug(); +LogTestController::getInstance().setDebug(); +LogTestController::getInstance().setTrace(); +LogTestController::getInstance().setTrace(); +CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990")); +CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + } + + void setProperty(const auto& property, const std::string& property_value) { +CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value)); + } + + void verifyLastRequestIsEmpty() { +const auto& request = mock_loki_.getLastRequest(); +REQUIRE(request.IsNull()); + } + + void verifyTenantId(const std::string& tenant_id) { +REQUIRE(mock_loki_.getLastTenantId() == tenant_id); + } + + void verifyBasicAuthorization(const std::string& expected_username_and_password) { +auto last_authorization = mock_loki_.getLastAuthorization(); +std::string expected_authorization = "Basic "; +REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, expected_authorization))
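The quoted tests treat `job=minifi,directory=/opt/minifi/logs/` as valid stream labels and reject both an empty value and `invalidlabels,invalidlabels2`. A minimal parser consistent with those three cases could look like the following. `parseStreamLabels` is a hypothetical name, and rejecting empty keys or values and not trimming whitespace are assumptions, not behavior confirmed by the PR.

```cpp
#include <map>
#include <optional>
#include <sstream>
#include <string>

// Hypothetical helper: parses "key=value,key2=value2" into a map, or returns nullopt if invalid.
inline std::optional<std::map<std::string, std::string>> parseStreamLabels(const std::string& input) {
  std::map<std::string, std::string> labels;
  std::istringstream stream(input);
  std::string item;
  while (std::getline(stream, item, ',')) {
    const auto eq = item.find('=');
    if (eq == std::string::npos || eq == 0 || eq + 1 == item.size()) {
      return std::nullopt;  // missing '=', empty key, or empty value (assumed invalid)
    }
    labels[item.substr(0, eq)] = item.substr(eq + 1);
  }
  if (labels.empty()) {
    return std::nullopt;  // at least one label is required
  }
  return labels;
}
```

A processor using such a helper would presumably throw during scheduling when the result is empty, which matches the `REQUIRE_THROWS_AS` expectations in the quoted test cases.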
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541779 ## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); Review Comment: Removed the checks in 083c28ee59f96352e89d5675d635ea3b91bd9942 ## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + SECTION("Stream labels cannot be empty") { +test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); + } + SECTION("Stream labels need to be valid") { +test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); + } + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setPro
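For readers following the quoted tests: they expect an empty Stream Labels value and a value such as "invalidlabels,invalidlabels2" to be rejected, while "job=minifi,directory=/opt/minifi/logs/" is accepted. The validation code itself is not part of this message, so the following is only a minimal sketch of how such a check could look. The function name `parseStreamLabels` and the use of `std::invalid_argument` are assumptions made for illustration, not the processor's actual code.

```cpp
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical helper: splits "k1=v1,k2=v2" into a map and rejects
// entries without a key, without a value, or without '='.
// No whitespace trimming is done in this sketch.
std::map<std::string, std::string> parseStreamLabels(const std::string& labels) {
  std::map<std::string, std::string> result;
  std::istringstream stream(labels);
  std::string entry;
  while (std::getline(stream, entry, ',')) {
    const auto separator = entry.find('=');
    if (separator == std::string::npos || separator == 0 || separator + 1 == entry.size()) {
      throw std::invalid_argument("Invalid stream label: '" + entry + "'");
    }
    result[entry.substr(0, separator)] = entry.substr(separator + 1);
  }
  if (result.empty()) {
    throw std::invalid_argument("Stream labels cannot be empty");
  }
  return result;
}
```

In the processor the failure would presumably surface as a `minifi::Exception` during scheduling, which is what the `REQUIRE_THROWS_AS` checks in the quoted tests look for.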
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541507 ## extensions/grafana-loki/tests/MockGrafanaLoki.h: ## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include "tests/CivetLibrary.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +class GrafanaLokiHandler : public CivetHandler { + public: + const rapidjson::Document& getLastRequest() const { +return request_received_; + } + + std::string getLastTenantId() const { +return tenant_id_set_; + } + + std::string getLastAuthorization() const { +return authorization_set_; + } + + private: + bool handlePost(CivetServer*, struct mg_connection* conn) override { +tenant_id_set_.clear(); +authorization_set_.clear(); +const char *org_id = mg_get_header(conn, "X-Scope-OrgID"); +if (org_id != nullptr) { + tenant_id_set_ = org_id; +} + +const char *authorization = mg_get_header(conn, "Authorization"); +if (authorization != nullptr) { + authorization_set_ = authorization; +} + +std::array request; +size_t chars_read = mg_read(conn, request.data(), 2048); Review Comment: Updated in 083c28ee59f96352e89d5675d635ea3b91bd9942 ## extensions/grafana-loki/tests/MockGrafanaLoki.h: ## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "tests/CivetLibrary.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +class GrafanaLokiHandler : public CivetHandler { + public: + const rapidjson::Document& getLastRequest() const { +return request_received_; + } + + std::string getLastTenantId() const { +return tenant_id_set_; + } + + std::string getLastAuthorization() const { +return authorization_set_; + } + + private: + bool handlePost(CivetServer*, struct mg_connection* conn) override { +tenant_id_set_.clear(); +authorization_set_.clear(); +const char *org_id = mg_get_header(conn, "X-Scope-OrgID"); +if (org_id != nullptr) { + tenant_id_set_ = org_id; +} + +const char *authorization = mg_get_header(conn, "Authorization"); +if (authorization != nullptr) { + authorization_set_ = authorization; +} + +std::array request; +size_t chars_read = mg_read(conn, request.data(), 2048); +std::string json_str(request.data(), chars_read); +request_received_.Parse(json_str.c_str()); + +mg_printf(conn, "HTTP/1.1 204 OK\r\n"); +mg_printf(conn, "Content-length: 0"); +mg_printf(conn, "\r\n\r\n"); +return true; + } + + rapidjson::Document request_received_; + std::string tenant_id_set_; + std::string authorization_set_; +}; + +class MockGrafanaLoki { + public: + explicit MockGrafanaLoki(std::string port) : port_(std::move(port)) { +std::vector options; +options.emplace_back("listening_ports"); +options.emplace_back(p
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439541257 ## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PushGrafanaLokiREST.h" + +#include +#include +#include + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { +start_push_time_ = std::chrono::steady_clock::now(); +std::unordered_map state; +state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count()); +logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); +state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { +start_push_time_ = {}; +std::unordered_map state; +logger_->log_debug("Reset start push time state"); +state["start_push_time"] = "0"; +state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; 
+} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { +return std::dynamic_pointer_cast(context.getControllerService(*ssl_context)); + } + return std::shared_ptr{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if (state_manager == nullptr) { +throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map state_map; + if (state_manager->get(state_map)) { +auto it = state_map.find("start_push_time"); +if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); +} + } +} + +void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessConte
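The quoted `LogBatch::isReady()` packs both batching conditions into one expression: a batch is ready when the optional size limit is reached or when the optional wait time has elapsed. A stand-alone sketch of that rule, with the two conditions named separately, is shown below. `BatchPolicy` and `isBatchReady` are hypothetical names used only for this illustration.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>

// Hypothetical stand-in for the two optional batching properties.
struct BatchPolicy {
  std::optional<std::size_t> max_lines;              // "Log Line Batch Size"
  std::optional<std::chrono::milliseconds> max_wait; // "Log Line Batch Wait"
};

// Ready when either configured limit is hit. Returns false when neither
// limit is configured, mirroring the quoted expression.
bool isBatchReady(const BatchPolicy& policy, std::size_t queued_lines, std::chrono::milliseconds waited) {
  const bool size_reached = policy.max_lines && queued_lines >= *policy.max_lines;
  const bool wait_elapsed = policy.max_wait && waited >= *policy.max_wait;
  return size_reached || wait_elapsed;
}
```

Since the expression is false when both properties are unset, the "sent immediately" behaviour promised by the property descriptions presumably has to be handled by the caller; that part is not visible in the quoted code.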
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439540650 ## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PushGrafanaLokiREST.h" + +#include +#include +#include + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { +start_push_time_ = std::chrono::steady_clock::now(); +std::unordered_map state; +state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count()); +logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); +state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { +start_push_time_ = {}; +std::unordered_map state; +logger_->log_debug("Reset start push time state"); +state["start_push_time"] = "0"; +state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; 
+} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { +return std::dynamic_pointer_cast(context.getControllerService(*ssl_context)); + } + return std::shared_ptr{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if (state_manager == nullptr) { +throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map state_map; + if (state_manager->get(state_map)) { +auto it = state_map.find("start_push_time"); +if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); Review Comment: Good catch, I would like to keep the state after restart, so
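The reply is cut off at this point, so the fix that was eventually chosen is not shown. As background for the discussion: the epoch of `std::chrono::steady_clock` is unspecified and commonly restarts at boot, so a millisecond count taken from it is not generally meaningful after the process or the machine restarts. One common way to keep a start time across restarts is to persist a wall-clock timestamp instead. The sketch below assumes that approach; `serializeTimePoint` and `deserializeTimePoint` are hypothetical helper names, not functions from the PR.

```cpp
#include <chrono>
#include <string>

using WallClock = std::chrono::system_clock;

// Stores the time point as milliseconds since the system_clock epoch.
std::string serializeTimePoint(WallClock::time_point tp) {
  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
  return std::to_string(ms.count());
}

// Restores a time point from the stored string.
// std::stoll throws if the stored value is not a number.
WallClock::time_point deserializeTimePoint(const std::string& stored) {
  const std::chrono::milliseconds ms{std::stoll(stored)};
  return WallClock::time_point{std::chrono::duration_cast<WallClock::duration>(ms)};
}
```

The trade-off is that `system_clock` can jump when the system time is adjusted, so elapsed-time checks based on it are less robust than those based on `steady_clock` within a single run.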
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539966 ## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "PushGrafanaLokiREST.h" + +#include +#include +#include + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { +start_push_time_ = std::chrono::steady_clock::now(); +std::unordered_map state; +state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count()); +logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); +state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { +start_push_time_ = {}; +std::unordered_map state; +logger_->log_debug("Reset start push time state"); +state["start_push_time"] = "0"; +state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; 
+} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { +return std::dynamic_pointer_cast(context.getControllerService(*ssl_context)); + } + return std::shared_ptr{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; Review Comment: Updated with the latter in 083c28ee59f96352e89d5675d635ea3b91bd9942
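The quoted `readLogLineFromFlowFile` builds a `std::string` from the read buffer through a `reinterpret_cast`. The reply says the code was "updated with the latter" of the reviewer's suggestions, but those suggestions are not included in this message, so the variant that was adopted is unknown. Purely as an illustration, and assuming the buffer is a `std::vector<std::byte>`, a conversion without a pointer cast could be sketched as follows. `bytesToString` is a hypothetical name.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <vector>

// Copies a byte buffer into a string one element at a time,
// converting each std::byte with std::to_integer.
std::string bytesToString(const std::vector<std::byte>& buffer) {
  std::string result(buffer.size(), '\0');
  std::transform(buffer.begin(), buffer.end(), result.begin(),
                 [](std::byte b) { return std::to_integer<char>(b); });
  return result;
}
```

This avoids the cast at the cost of an element-wise copy; embedded null bytes are preserved because the string is sized from the buffer length.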
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539574 ## extensions/grafana-loki/PushGrafanaLokiREST.h: ## @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "controllers/SSLContextService.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyType.h" +#include "core/RelationshipDefinition.h" +#include "client/HTTPClient.h" +#include "core/StateManager.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +class PushGrafanaLokiREST : public core::Processor { + public: + EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki REST API. 
The processor expects each flow file to contain a single log line to be " + "pushed to Grafana Loki, therefore it is usually used together with the TailFile processor."; + + explicit PushGrafanaLokiREST(const std::string& name, const utils::Identifier& uuid = {}) + : Processor(name, uuid), +log_batch_(logger_) { + } + ~PushGrafanaLokiREST() override = default; + + EXTENSIONAPI static constexpr auto Url = core::PropertyDefinitionBuilder<>::createProperty("Url") +.withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto StreamLabels = core::PropertyDefinitionBuilder<>::createProperty("Stream Labels") +.withDescription("Comma separated list of <key>=<value> labels to be sent as stream labels.") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata Attributes") +.withDescription("Comma separated list of attributes to be sent as log line metadata for a log line.") +.build(); + EXTENSIONAPI static constexpr auto TenantID = core::PropertyDefinitionBuilder<>::createProperty("Tenant ID") +.withDescription("The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent.") +.build(); + EXTENSIONAPI static constexpr auto MaxBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size") +.withDescription("The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once.") +.withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE) +.withDefaultValue("100") +.build(); + EXTENSIONAPI static constexpr auto LogLineBatchWait = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait") +.withDescription("Time to wait before sending a log line batch to Grafana Loki, full or not. 
If this property and Log Line Batch Size are both unset, " + "the log batch of the current trigger will be sent immediately.") +.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) +.build(); + EXTENSIONAPI static constexpr auto LogLineBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size") +.withDescription("Number of log lines to send in a batch to Loki. If this property and Log Line Batch Wait are both unset, " + "the log batch of the current trigger will be sent immediately.") +.withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE) +.build(); + EXTENSIONAPI static constexpr auto ConnectTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout") +.withDescription("Max wait time for connection to the Grafana Loki service.") +.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) +.withDefaultValue("5 s") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto ReadTimeout = core::PropertyDefinitionBuilder<>::createProperty("Read Timeout") +.withDescription("Max wait time for response from remote service
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439539348 ## bootstrap.sh: ## @@ -339,6 +339,9 @@ add_option PROMETHEUS_ENABLED ${TRUE} "ENABLE_PROMETHEUS" add_option OPENSSL_ENABLED ${TRUE} "OPENSSL_OFF" add_dependency OPENSSL_ENABLED "opensslbuild" +add_option GRAFANA_LOKI_ENABLED ${TRUE} "ENABLE_GRAFANA_LOKI" +set_dependency GRAFANA_LOKI_ENABLED HTTP_CURL_ENABLED Review Comment: It should be false, updated in 083c28ee59f96352e89d5675d635ea3b91bd9942 ## docker/test/integration/cluster/checkers/GrafanaLokiChecker.py: ## @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import requests +from typing import List +from utils import wait_for + + +class GrafanaLokiChecker: +def __init__(self): +self.url = "localhost:3100/loki/api/v1/query" + +def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, tenant_id: str): +labels = '{job="minifi"}' +prefix = "http://" +if ssl: +prefix = "https://" + +query_url = f"{prefix}{self.url}?query={labels}" + +headers = None +if tenant_id: +headers = {'X-Scope-OrgID': tenant_id} + +response = requests.get(query_url, verify=False, timeout=30, headers=headers) +if response.status_code >= 200 and response.status_code < 300: Review Comment: Updated in 083c28ee59f96352e89d5675d635ea3b91bd9942
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1439438776 ## extensions/grafana-loki/PushGrafanaLokiREST.h: ## @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "controllers/SSLContextService.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyType.h" +#include "core/RelationshipDefinition.h" +#include "client/HTTPClient.h" +#include "core/StateManager.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +class PushGrafanaLokiREST : public core::Processor { + public: + EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki REST API. 
The processor expects each flow file to contain a single log line to be " + "pushed to Grafana Loki, therefore it is usually used together with the TailFile processor."; + + explicit PushGrafanaLokiREST(const std::string& name, const utils::Identifier& uuid = {}) + : Processor(name, uuid), +log_batch_(logger_) { + } + ~PushGrafanaLokiREST() override = default; + + EXTENSIONAPI static constexpr auto Url = core::PropertyDefinitionBuilder<>::createProperty("Url") +.withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto StreamLabels = core::PropertyDefinitionBuilder<>::createProperty("Stream Labels") +.withDescription("Comma separated list of = labels to be sent as stream labels.") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata Attributes") +.withDescription("Comma separated list of attributes to be sent as log line metadata for a log line.") +.build(); + EXTENSIONAPI static constexpr auto TenantID = core::PropertyDefinitionBuilder<>::createProperty("Tenant ID") +.withDescription("The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent.") +.build(); + EXTENSIONAPI static constexpr auto MaxBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size") +.withDescription("The maximum number of flow files to process at a time. If not set, or set to 0, all FlowFiles will be processed at once.") +.withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE) +.withDefaultValue("100") +.build(); + EXTENSIONAPI static constexpr auto LogLineBatchWait = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait") +.withDescription("Time to wait before sending a log line batch to Grafana Loki, full or not. 
If this property and Log Line Batch Size are both unset, " + "the log batch of the current trigger will be sent immediately.") +.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) +.build(); + EXTENSIONAPI static constexpr auto LogLineBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size") +.withDescription("Number of log lines to send in a batch to Loki. If this property and Log Line Batch Wait are both unset, " + "the log batch of the current trigger will be sent immediately.") +.withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE) +.build(); + EXTENSIONAPI static constexpr auto ConnectTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout") +.withDescription("Max wait time for connection to the Grafana Loki service.") +.withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) +.withDefaultValue("5 s") +.isRequired(true) +.build(); + EXTENSIONAPI static constexpr auto ReadTimeout = core::PropertyDefinitionBuilder<>::createProperty("Read Timeout") +.withDescription("Max wait time for response from remote service
Re: [PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
fgerlits commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1425597679 ## docker/test/integration/cluster/checkers/GrafanaLokiChecker.py: ## @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import requests +from typing import List +from utils import wait_for + + +class GrafanaLokiChecker: +def __init__(self): +self.url = "localhost:3100/loki/api/v1/query" + +def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, tenant_id: str): +labels = '{job="minifi"}' +prefix = "http://" +if ssl: +prefix = "https://" + +query_url = f"{prefix}{self.url}?query={labels}" + +headers = None +if tenant_id: +headers = {'X-Scope-OrgID': tenant_id} + +response = requests.get(query_url, verify=False, timeout=30, headers=headers) +if response.status_code >= 200 and response.status_code < 300: Review Comment: this conditional could be reversed, so the `return True` happens at the end of the function ## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PushGrafanaLokiREST.h" + +#include +#include +#include + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { +start_push_time_ = std::chrono::steady_clock::now(); +std::unordered_map state; +state["start_push_time"] = std::to_string(std::chrono::duration_cast(start_push_time_.time_since_epoch()).count()); +logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); +state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { +start_push_time_ = {}; 
+std::unordered_map state; +logger_->log_debug("Reset start push time state"); +state["start_push_time"] = "0"; +state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::
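As a reading aid for the `LogBatch` logic quoted above, here is a rough Python sketch of the same size-or-wait readiness check. It is an illustration under stated assumptions, not a translation of the processor: the class and field names and the injectable `now` argument are hypothetical, and the state-manager persistence from the C++ code is left out.

```python
import time
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class LogBatch:
    # None mirrors an unset "Log Line Batch Size" / "Log Line Batch Wait" property.
    batch_size: Optional[int] = None
    batch_wait: Optional[float] = None  # seconds
    lines: List[str] = field(default_factory=list)
    start_time: float = 0.0

    def add(self, line: str, now: Optional[float] = None) -> None:
        # Start the wait clock when the first line of a new batch arrives.
        if self.batch_wait is not None and not self.lines:
            self.start_time = time.monotonic() if now is None else now
        self.lines.append(line)

    def is_ready(self, now: Optional[float] = None) -> bool:
        # Ready when either configured limit is hit; an unset limit never triggers.
        now = time.monotonic() if now is None else now
        size_reached = self.batch_size is not None and len(self.lines) >= self.batch_size
        wait_elapsed = self.batch_wait is not None and now - self.start_time >= self.batch_wait
        return size_reached or wait_elapsed

    def flush(self) -> List[str]:
        # Hand back the batched lines and reset the batch and its clock.
        result, self.lines = self.lines, []
        self.start_time = 0.0
        return result
```

With both limits unset, `is_ready` never returns true in this sketch, so a caller would have to send the current batch directly in that case, which is consistent with the "sent immediately" wording in the property descriptions.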
[PR] MINIFICPP-2261 Add processor for pushing logs to Grafana Loki through REST API [nifi-minifi-cpp]
lordgamez opened a new pull request, #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695

https://issues.apache.org/jira/browse/MINIFICPP-2261

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.