This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 65ca8e6 NIFI-6942 This closes #3934. Added a reporting task to push
provenance data to azure log analytics.
65ca8e6 is described below
commit 65ca8e6c8defbc82bbac8c510cf16621a34418e9
Author: Ubuntu
<bahl...@ubuntu.bsz2odxhlxmeloyl4zwvdtk5oh.xx.internal.cloudapp.net>
AuthorDate: Fri Dec 13 20:12:28 2019 +0000
NIFI-6942 This closes #3934. Added a reporting task to push provenance data
to azure log analytics.
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi-azure-reporting-task/pom.xml | 6 +
.../AbstractAzureLogAnalyticsReportingTask.java | 165 ++++++++
.../AzureLogAnalyticsProvenanceReportingTask.java | 470 +++++++++++++++++++++
.../AzureLogAnalyticsReportingTask.java | 233 +++-------
.../org.apache.nifi.reporting.ReportingTask | 3 +-
...stAzureLogAnalyticsProvenanceReportingTask.java | 96 +++++
6 files changed, 788 insertions(+), 185 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
index 79eb37d..3e14afc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/pom.xml
@@ -59,5 +59,11 @@
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-reporting-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
new file mode 100644
index 0000000..d48c765
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AbstractAzureLogAnalyticsReportingTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.text.MessageFormat;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+
+/**
+ * Abstract ReportingTask to send metrics from Apache NiFi and JVM to Azure
+ * Monitor.
+ */
+public abstract class AbstractAzureLogAnalyticsReportingTask extends
AbstractReportingTask {
+
+ /** Charset used for the request body and for the string that is signed. */
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ /** JCA algorithm name used to sign requests with the workspace key. */
+ private static final String HMAC_SHA256_ALG = "HmacSHA256";
+ /**
+  * Formatter for the x-ms-date header. The locale is pinned to English so that the
+  * day/month names and the "GMT" zone text do not change with the JVM default locale;
+  * a localized date would no longer be a valid RFC 1123 date.
+  */
+ private static final DateTimeFormatter RFC_1123_DATE_TIME = DateTimeFormatter
+         .ofPattern("EEE, dd MMM yyyy HH:mm:ss O", java.util.Locale.ENGLISH);
+
+ // Identifier of the target Log Analytics workspace (marked sensitive).
+ static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new PropertyDescriptor.Builder()
+         .name("Log Analytics Workspace Id")
+         .description("Log Analytics Workspace Id")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .sensitive(true)
+         .build();
+ // Base64 shared key of the workspace, used to sign every request (marked sensitive).
+ static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new PropertyDescriptor.Builder()
+         .name("Log Analytics Workspace Key")
+         .description("Azure Log Analytics Workspace Key")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .sensitive(true)
+         .build();
+ // Application identifier attached to the data that is sent; defaults to "nifi".
+ static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
+         .name("Application ID")
+         .description("The Application ID to be included in the metrics sent to Azure Log Analytics WS")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .defaultValue("nifi")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+ // Identifier of this NiFi instance; defaults to the host name via expression language.
+ static final PropertyDescriptor INSTANCE_ID = new PropertyDescriptor.Builder()
+         .name("Instance ID")
+         .description("Id of this NiFi instance to be included in the metrics sent to Azure Log Analytics WS")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .defaultValue("${hostname(true)}")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+ // Optional comma-separated list of process group ids to restrict reporting to.
+ static final PropertyDescriptor PROCESS_GROUP_IDS = new PropertyDescriptor.Builder()
+         .name("Process group ID(s)")
+         .description(
+                 "If specified, the reporting task will send metrics for the configured ProcessGroup(s) only. "
+                         + "Multiple IDs should be separated by a comma. If"
+                         + " none of the group-IDs could be found or no IDs are defined, the Root Process Group is used"
+                         + " and global metrics are sent.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.createListValidator(true, true,
+                 StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
+         .build();
+ // Name of the exporting job; defaults to "nifi_reporting_job".
+ static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
+         .name("Job Name")
+         .description("The name of the exporting job")
+         .defaultValue("nifi_reporting_job")
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+ // MessageFormat pattern for the endpoint URL; {0} is replaced by the workspace id.
+ static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new PropertyDescriptor.Builder()
+         .name("Log Analytics URL Endpoint Format")
+         .description("Log Analytics URL Endpoint Format")
+         .required(false)
+         .defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .build();
+
+ protected String createAuthorization(String workspaceId, String key, int
contentLength, String rfc1123Date) {
+ try {
+ String signature =
String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs",
contentLength,
+ rfc1123Date);
+ Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
+ mac.init(new
SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
+ String hmac =
DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
+ return String.format("SharedKey %s:%s", workspaceId, hmac);
+ } catch (NoSuchAlgorithmException | InvalidKeyException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(LOG_ANALYTICS_WORKSPACE_ID);
+ properties.add(LOG_ANALYTICS_WORKSPACE_KEY);
+ properties.add(APPLICATION_ID);
+ properties.add(INSTANCE_ID);
+ properties.add(PROCESS_GROUP_IDS);
+ properties.add(JOB_NAME);
+ properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
+ return properties;
+ }
+
+ /**
+ * Construct HttpPost and return it
+ *
+ * @param urlFormat URL format to Azure Log Analytics Endpoint
+ * @param workspaceId your azure log analytics workspace id
+ * @param logName log table name where metrics will be pushed
+ * @return HttpsURLConnection to your azure log analytics workspace
+ * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
+ */
+ protected HttpPost getHttpPost(final String urlFormat, final String
workspaceId, final String logName)
+ throws IllegalArgumentException {
+ String dataCollectorEndpoint = MessageFormat.format(urlFormat,
workspaceId);
+ HttpPost post = new HttpPost(dataCollectorEndpoint);
+ post.addHeader("Content-Type", "application/json");
+ post.addHeader("Log-Type", logName);
+ return post;
+ }
+
+ /**
+  * Signs the JSON payload with the workspace key and posts it using a short-lived HTTP client.
+  *
+  * @param request         prepared POST request (endpoint, Content-Type and Log-Type already set)
+  * @param workspaceId     workspace id used in the Authorization header
+  * @param linuxPrimaryKey Base64-encoded workspace key used to sign the request
+  * @param rawJson         JSON document to send as the request body
+  * @throws IOException      if executing the request fails
+  * @throws RuntimeException if signing fails or the service answers with a status other than 200
+  */
+ protected void sendToLogAnalytics(final HttpPost request, final String workspaceId, final String linuxPrimaryKey,
+         final String rawJson) throws IllegalArgumentException, RuntimeException, IOException {
+     // The signature covers the UTF-8 byte length of the body, so the entity below
+     // must be encoded with the same charset or the signed length will not match.
+     final int bodyLength = rawJson.getBytes(UTF8).length;
+     final String nowRfc1123 = RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
+     final String authorization = createAuthorization(workspaceId, linuxPrimaryKey, bodyLength, nowRfc1123);
+     request.addHeader("Authorization", authorization);
+     request.addHeader("x-ms-date", nowRfc1123);
+     // StringEntity(String) would fall back to ISO-8859-1; pass the charset explicitly.
+     request.setEntity(new StringEntity(rawJson, UTF8));
+     try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+         postRequest(httpClient, request);
+     }
+ }
+
+ /**
+ * post request with httpClient and httpPost
+ *
+ * @param httpClient HttpClient
+ * @param request HttpPost
+ * @throws IOException if httpClient.execute fails
+ * @throws RuntimeException if post request status return other than 200
+ */
+ protected void postRequest(final CloseableHttpClient httpClient, final
HttpPost request)
+ throws IOException, RuntimeException {
+
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ if (response != null && response.getStatusLine().getStatusCode()
!= 200) {
+ throw new
RuntimeException(response.getStatusLine().toString());
+ }
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..da8cfa9
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.MessageFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
+
+@Tags({ "azure", "provenace", "reporting", "log analytics" })
+@CapabilityDescription("Publishes Provenance events to to a Azure Log
Analytics workspace.")
+public class AzureLogAnalyticsProvenanceReportingTask extends
AbstractAzureLogAnalyticsReportingTask {
+
+ // Key name for the last processed event id; not referenced in the visible part of this class.
+ protected static final String LAST_EVENT_ID_KEY = "last_event_id";
+ // Path suffix removed from the Instance URL when content URIs are built.
+ protected static final String DESTINATION_URL_PATH = "/nifi";
+ // SimpleDateFormat pattern of the "timestamp" field of each serialized event.
+ protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+ // Custom log name sent as the Log-Type header; defaults to "nifiprovenance".
+ static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new PropertyDescriptor.Builder()
+         .name("Log Analytics Custom Log Name")
+         .description("Log Analytics Custom Log Name")
+         .required(false)
+         .defaultValue("nifiprovenance")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .build();
+
+ // Start position choice: oldest event first.
+ static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue(
+         "beginning-of-stream",
+         "Beginning of Stream",
+         "Start reading provenance Events from the beginning of the stream (the oldest event first)");
+
+ // Start position choice: only events produced from now on.
+ static final AllowableValue END_OF_STREAM = new AllowableValue(
+         "end-of-stream",
+         "End of Stream",
+         "Start reading provenance Events from the end of the stream, ignoring old events");
+
+ // Include filter on provenance event types (comma-separated list).
+ static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-event-filter")
+         .displayName("Event Type to Include")
+         .description("Comma-separated list of event types that will be used to filter the provenance events "
+                 + "sent by the reporting task. Available event types are "
+                 + Arrays.deepToString(ProvenanceEventType.values())
+                 + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are "
+                 + "cumulative.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ // Exclude filter on provenance event types (comma-separated list).
+ static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-event-filter-exclude")
+         .displayName("Event Type to Exclude")
+         .description("Comma-separated list of event types that will be used to exclude the provenance events "
+                 + "sent by the reporting task. Available event types are "
+                 + Arrays.deepToString(ProvenanceEventType.values())
+                 + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are "
+                 + "cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+                 + "exclusion takes precedence and the event will not be sent.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ // Include filter on the component type (regular expression).
+ static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-type-filter")
+         .displayName("Component Type to Include")
+         .description("Regular expression to filter the provenance events based on the component type. "
+                 + "Only the events matching the regular expression will be sent. If no filter is set, all the "
+                 + "events are sent. If multiple filters are set, the filters are cumulative.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+         .build();
+
+ // Exclude filter on the component type (regular expression).
+ static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-type-filter-exclude")
+         .displayName("Component Type to Exclude")
+         .description("Regular expression to exclude the provenance events based on the component type. "
+                 + "The events matching the regular expression will not be sent. If no filter is set, all the "
+                 + "events are sent. If multiple filters are set, the filters are cumulative. "
+                 + "If a component type is included in Component Type to Include and excluded here, then the "
+                 + "exclusion takes precedence and the event will not be sent.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+         .build();
+
+ // Include filter on component UUIDs (comma-separated list).
+ static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-id-filter")
+         .displayName("Component ID to Include")
+         .description("Comma-separated list of component UUID that will be used to filter the provenance events "
+                 + "sent by the reporting task. If no filter is set, all the events are sent. If multiple filters "
+                 + "are set, the filters are cumulative.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ // Exclude filter on component UUIDs (comma-separated list).
+ static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-id-filter-exclude")
+         .displayName("Component ID to Exclude")
+         .description("Comma-separated list of component UUID that will be used to exclude the provenance events "
+                 + "sent by the reporting task. If no filter is set, all the events are sent. If multiple filters "
+                 + "are set, the filters are cumulative. If a component UUID is included in "
+                 + "Component ID to Include and excluded here, then the exclusion takes precedence and the event "
+                 + "will not be sent.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ // Include filter on the component name (regular expression).
+ static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-name-filter")
+         .displayName("Component Name to Include")
+         .description("Regular expression to filter the provenance events based on the component name. "
+                 + "Only the events matching the regular expression will be sent. If no filter is set, all the "
+                 + "events are sent. If multiple filters are set, the filters are cumulative.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+         .build();
+
+ // Exclude filter on the component name (regular expression).
+ static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder()
+         .name("s2s-prov-task-name-filter-exclude")
+         .displayName("Component Name to Exclude")
+         .description("Regular expression to exclude the provenance events based on the component name. "
+                 + "The events matching the regular expression will not be sent. If no filter is set, all the "
+                 + "events are sent. If multiple filters are set, the filters are cumulative. "
+                 + "If a component name is included in Component Name to Include and excluded here, then the "
+                 + "exclusion takes precedence and the event will not be sent.")
+         .required(false)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+         .build();
+
+ // Where to start in the provenance stream when there is no stored position.
+ static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
+         .name("start-position")
+         .displayName("Start Position")
+         .description("If the Reporting Task has never been run, or if its state has been reset by a user, "
+                 + "specifies where in the stream of Provenance Events the Reporting Task should start")
+         .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM)
+         .defaultValue(BEGINNING_OF_STREAM.getValue())
+         .required(true)
+         .build();
+
+ // Whether fields without a value are emitted as JSON null or left out.
+ static final PropertyDescriptor ALLOW_NULL_VALUES = new PropertyDescriptor.Builder()
+         .name("include-null-values")
+         .displayName("Include Null Values")
+         .description("Indicate if null values should be included in records. Default will be false")
+         .required(true)
+         .allowableValues("true", "false")
+         .defaultValue("false")
+         .build();
+
+ // Value written to the "platform" field of every event; defaults to "nifi".
+ static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
+         .name("Platform")
+         .description("The value to use for the platform field in each event.")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .defaultValue("nifi")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ // Base URL of this NiFi instance. The value is parsed with java.net.URL when events
+ // are processed, so it is validated as a URL at configuration time rather than only
+ // being checked for non-emptiness.
+ static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder()
+         .name("Instance URL")
+         .displayName("Instance URL")
+         .description("The URL of this instance to use in the Content URI of each event.")
+         .required(true)
+         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+         .defaultValue("http://${hostname(true)}:8080/nifi")
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .addValidator(StandardValidators.URL_VALIDATOR)
+         .build();
+
+ // Upper bound on the number of records handed over per batch; defaults to 1000.
+ static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+         .name("Batch Size")
+         .displayName("Batch Size")
+         .description("Specifies how many records to send in a single batch, at most.")
+         .required(true)
+         .defaultValue("1000")
+         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+         .build();
+
+ // NOTE(review): not referenced in the visible part of this class — confirm it is still needed.
+ private ConfigurationContext context;
+
+ // Created on first use by CreateConsumer(); also read by onUnscheduled() to stop consumption.
+ private volatile ProvenanceEventConsumer consumer;
+
+ /**
+  * Lists the properties of the provenance reporting task: the connection settings
+  * followed by the provenance filters and output options.
+  *
+  * @return a new mutable list of the supported descriptors, in display order
+  */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+     return new ArrayList<>(Arrays.asList(
+             LOG_ANALYTICS_WORKSPACE_ID,
+             LOG_ANALYTICS_CUSTOM_LOG_NAME,
+             LOG_ANALYTICS_WORKSPACE_KEY,
+             APPLICATION_ID,
+             INSTANCE_ID,
+             JOB_NAME,
+             LOG_ANALYTICS_URL_ENDPOINT_FORMAT,
+             FILTER_EVENT_TYPE,
+             FILTER_EVENT_TYPE_EXCLUDE,
+             FILTER_COMPONENT_TYPE,
+             FILTER_COMPONENT_TYPE_EXCLUDE,
+             FILTER_COMPONENT_ID,
+             FILTER_COMPONENT_ID_EXCLUDE,
+             FILTER_COMPONENT_NAME,
+             FILTER_COMPONENT_NAME_EXCLUDE,
+             START_POSITION,
+             ALLOW_NULL_VALUES,
+             PLATFORM,
+             INSTANCE_URL,
+             BATCH_SIZE));
+ }
+
+ /**
+  * Creates and configures the provenance event consumer on first use; later calls are no-ops.
+  * The consumer is fully configured before it is stored in the volatile field, so other
+  * threads never observe a half-initialized instance.
+  *
+  * @param context reporting context the filter, batch size and start position are read from
+  */
+ public void CreateConsumer(final ReportingContext context) {
+     if (consumer != null) {
+         return;
+     }
+     final ProvenanceEventConsumer newConsumer = new ProvenanceEventConsumer();
+     newConsumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
+     newConsumer.setBatchSize(context.getProperty(BATCH_SIZE).asInteger());
+     newConsumer.setLogger(getLogger());
+
+     // component type / name filtering (regular expressions)
+     newConsumer.setComponentTypeRegex(evaluatedValue(context, FILTER_COMPONENT_TYPE));
+     newConsumer.setComponentTypeRegexExclude(evaluatedValue(context, FILTER_COMPONENT_TYPE_EXCLUDE));
+     newConsumer.setComponentNameRegex(evaluatedValue(context, FILTER_COMPONENT_NAME));
+     newConsumer.setComponentNameRegexExclude(evaluatedValue(context, FILTER_COMPONENT_NAME_EXCLUDE));
+
+     // event type filtering; unknown names are reported and skipped
+     final String[] targetEventTypes = splitCommaSeparated(evaluatedValue(context, FILTER_EVENT_TYPE));
+     if (targetEventTypes != null) {
+         for (final String type : targetEventTypes) {
+             try {
+                 newConsumer.addTargetEventType(ProvenanceEventType.valueOf(type));
+             } catch (final IllegalArgumentException e) {
+                 getLogger().warn(type + " is not a correct event type, removed from the filtering.");
+             }
+         }
+     }
+
+     final String[] targetEventTypesExclude = splitCommaSeparated(
+             evaluatedValue(context, FILTER_EVENT_TYPE_EXCLUDE));
+     if (targetEventTypesExclude != null) {
+         for (final String type : targetEventTypesExclude) {
+             try {
+                 newConsumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+             } catch (final IllegalArgumentException e) {
+                 getLogger().warn(type + " is not a correct event type, removed from the exclude filtering.");
+             }
+         }
+     }
+
+     // component ID filtering
+     final String[] targetComponentIds = splitCommaSeparated(evaluatedValue(context, FILTER_COMPONENT_ID));
+     if (targetComponentIds != null) {
+         newConsumer.addTargetComponentId(targetComponentIds);
+     }
+
+     final String[] targetComponentIdsExclude = splitCommaSeparated(
+             evaluatedValue(context, FILTER_COMPONENT_ID_EXCLUDE));
+     if (targetComponentIdsExclude != null) {
+         newConsumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+     }
+
+     newConsumer.setScheduled(true);
+     // Publish only once configuration is complete.
+     consumer = newConsumer;
+ }
+
+ /** Returns the property value after expression-language evaluation; may be null. */
+ private static String evaluatedValue(final ReportingContext context, final PropertyDescriptor descriptor) {
+     return context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
+ }
+
+ /** Splits a comma-separated value and strips each entry; returns null for null input. */
+ private static String[] splitCommaSeparated(final String value) {
+     return StringUtils.stripAll(StringUtils.split(value, ','));
+ }
+
+ @Override
+ public void onTrigger(ReportingContext context) {
+ final boolean isClustered = context.isClustered();
+ final String nodeId = context.getClusterNodeIdentifier();
+ if (nodeId == null && isClustered) {
+ getLogger().debug(
+ "This instance of NiFi is configured
for clustering, but the Cluster Node Identifier is not yet available. "
+ + "Will wait for Node
Identifier to be established.");
+ return;
+ }
+
+ try {
+ processProvenanceData(context);
+
+ } catch (final Exception e) {
+ getLogger().error("Failed to publish metrics to Azure
Log Analytics", e);
+ }
+ }
+
+ public void processProvenanceData(final ReportingContext context)
throws IOException {
+ getLogger().debug("Starting to process provenance data");
+ final String workspaceId =
context.getProperty(LOG_ANALYTICS_WORKSPACE_ID)
+ .evaluateAttributeExpressions().getValue();
+ final String linuxPrimaryKey =
context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY)
+ .evaluateAttributeExpressions().getValue();
+ final String logName =
context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+ .getValue();
+ final String urlEndpointFormat =
context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+ .evaluateAttributeExpressions().getValue();
+ final Integer batchSize =
context.getProperty(BATCH_SIZE).asInteger();
+ final String dataCollectorEndpoint =
MessageFormat.format(urlEndpointFormat, workspaceId);
+ final ProcessGroupStatus procGroupStatus =
context.getEventAccess().getControllerStatus();
+ final String nodeId = context.getClusterNodeIdentifier();
+ final String rootGroupName = procGroupStatus == null ? null :
procGroupStatus.getName();
+ final String platform =
context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
+ final Boolean allowNullValues =
context.getProperty(ALLOW_NULL_VALUES).asBoolean();
+ final String nifiUrl =
context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
+ URL url;
+ try {
+ url = new URL(nifiUrl);
+ } catch (final MalformedURLException e1) {
+ throw new AssertionError();
+ }
+
+ final String hostname = url.getHost();
+ final Map<String, Object> config = Collections.emptyMap();
+ final JsonBuilderFactory factory =
Json.createBuilderFactory(config);
+ final JsonObjectBuilder builder =
factory.createObjectBuilder();
+ final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
+ df.setTimeZone(TimeZone.getTimeZone("Z"));
+ CreateConsumer(context);
+ consumer.consumeEvents(context, (mapHolder, events) -> {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append('[');
+ for (final ProvenanceEventRecord event : events) {
+ final String componentName =
mapHolder.getComponentName(event.getComponentId());
+ final String processGroupId =
mapHolder.getProcessGroupId(event.getComponentId(),
+ event.getComponentType());
+ final String processGroupName =
mapHolder.getComponentName(processGroupId);
+ final JsonObject jo = serialize(factory,
builder, event, df, componentName,
+ processGroupId,
processGroupName, hostname, url, rootGroupName,
+ platform, nodeId,
allowNullValues);
+ stringBuilder.append(jo.toString());
+ stringBuilder.append(',');
+ }
+ if (stringBuilder.charAt(stringBuilder.length() - 1)
== ',')
+
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ stringBuilder.append(']');
+ String str = stringBuilder.toString();
+ if (!str.equals("[]")) {
+ final HttpPost httpPost = new
HttpPost(dataCollectorEndpoint);
+ httpPost.addHeader("Content-Type",
"application/json");
+ httpPost.addHeader("Log-Type", logName);
+ getLogger().debug("Sending " + batchSize + "
events of length " + str.length() + " to azure log analytics " + logName);
+ try {
+ sendToLogAnalytics(httpPost,
workspaceId, linuxPrimaryKey, str);
+
+ } catch (final Exception e) {
+ getLogger().error("Failed to publish
provenance data to Azure Log Analytics", e);
+ }
+ }
+ });
+ getLogger().debug("Done processing provenance data");
+ }
+
+ /**
+  * Converts one provenance event into the JSON object sent to Log Analytics.
+  *
+  * @param factory          factory passed on to the collection/map addField overloads
+  * @param builder          object builder the fields are added to and that is built at the end
+  * @param event            provenance event to serialize
+  * @param df               formatter for the "timestamp" field
+  * @param componentName    name of the component that produced the event, may be null
+  * @param processGroupId   id of the enclosing process group, may be null
+  * @param processGroupName name of the enclosing process group, may be null
+  * @param hostname         written to "actorHostname"
+  * @param nifiUrl          instance URL the content URIs are derived from; skipped when null
+  * @param applicationName  written to "application"
+  * @param platform         written to "platform"
+  * @param nodeIdentifier   cluster node id appended to content URIs, or null when absent
+  * @param allowNullValues  whether null fields are written as JSON null instead of omitted
+  * @return the built JSON object
+  */
+ private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder,
+         final ProvenanceEventRecord event, final DateFormat df, final String componentName,
+         final String processGroupId, final String processGroupName, final String hostname,
+         final URL nifiUrl, final String applicationName, final String platform,
+         final String nodeIdentifier, Boolean allowNullValues) {
+     // "eventId" is a fresh random UUID per serialization; the repository id goes to "eventOrdinal".
+     addField(builder, "eventId", UUID.randomUUID().toString(), allowNullValues);
+     addField(builder, "eventOrdinal", event.getEventId(), allowNullValues);
+     addField(builder, "eventType", event.getEventType().name(), allowNullValues);
+     addField(builder, "timestampMillis", event.getEventTime(), allowNullValues);
+     addField(builder, "timestamp", df.format(event.getEventTime()), allowNullValues);
+     addField(builder, "durationMillis", event.getEventDuration(), allowNullValues);
+     addField(builder, "lineageStart", event.getLineageStartDate(), allowNullValues);
+     addField(builder, "details", event.getDetails(), allowNullValues);
+     addField(builder, "componentId", event.getComponentId(), allowNullValues);
+     addField(builder, "componentType", event.getComponentType(), allowNullValues);
+     addField(builder, "componentName", componentName, allowNullValues);
+     addField(builder, "processGroupId", processGroupId, allowNullValues);
+     addField(builder, "processGroupName", processGroupName, allowNullValues);
+     addField(builder, "entityId", event.getFlowFileUuid(), allowNullValues);
+     addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", allowNullValues);
+     addField(builder, "entitySize", event.getFileSize(), allowNullValues);
+     addField(builder, "previousEntitySize", event.getPreviousFileSize(), allowNullValues);
+     addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
+     addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
+
+     addField(builder, "actorHostname", hostname, allowNullValues);
+     if (nifiUrl != null) {
+         // The URL prefix is the instance URL without a trailing "/nifi". The suffix is
+         // checked explicitly instead of blindly cutting characters off the end.
+         String urlPrefix = nifiUrl.toString();
+         if (urlPrefix.endsWith("/")) {
+             urlPrefix = urlPrefix.substring(0, urlPrefix.length() - 1);
+         }
+         if (urlPrefix.endsWith(DESTINATION_URL_PATH)) {
+             urlPrefix = urlPrefix.substring(0, urlPrefix.length() - DESTINATION_URL_PATH.length());
+         }
+
+         final String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId()
+                 + "/content/";
+         final String nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
+         addField(builder, "contentURI", contentUriBase + "output" + nodeIdSuffix, allowNullValues);
+         addField(builder, "previousContentURI", contentUriBase + "input" + nodeIdSuffix, allowNullValues);
+     }
+
+     addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
+     addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
+     addField(builder, "transitUri", event.getTransitUri(), allowNullValues);
+     addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), allowNullValues);
+     addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues);
+     addField(builder, "platform", platform, allowNullValues);
+     addField(builder, "application", applicationName, allowNullValues);
+     return builder.build();
+ }
+
+    /**
+     * Lifecycle hook run when this task is unscheduled. If a consumer has been
+     * created, it is flagged as no longer scheduled; otherwise nothing happens.
+     */
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (consumer == null) {
+            // No consumer exists yet, so there is nothing to switch off.
+            return;
+        }
+        getLogger().debug("Disabling schedule to consume provenance data.");
+        consumer.setScheduled(false);
+    }
+
+    /**
+     * Adds one scalar field to a JSON object builder.
+     * <p>
+     * {@code String}, {@code Integer}, {@code Boolean} and {@code Long} values are
+     * written with their native JSON type; any other non-null value is written as
+     * the string returned by its {@code toString()}.
+     *
+     * @param builder         the object builder that receives the field
+     * @param key             the field name
+     * @param value           the field value, may be {@code null}
+     * @param allowNullValues when {@code true} a {@code null} value is written as
+     *                        JSON {@code null}; when {@code false} the field is
+     *                        omitted
+     */
+    public static void addField(final JsonObjectBuilder builder, final String key, final Object value,
+            boolean allowNullValues) {
+        if (value == null) {
+            if (allowNullValues) {
+                builder.add(key, JsonValue.NULL);
+            }
+            return;
+        }
+
+        if (value instanceof String) {
+            builder.add(key, (String) value);
+            return;
+        }
+        if (value instanceof Integer) {
+            builder.add(key, (Integer) value);
+            return;
+        }
+        if (value instanceof Boolean) {
+            builder.add(key, (Boolean) value);
+            return;
+        }
+        if (value instanceof Long) {
+            builder.add(key, (Long) value);
+            return;
+        }
+
+        // Every other type falls back to its string representation.
+        builder.add(key, value.toString());
+    }
+
+    /**
+     * Adds a map of string attributes to a JSON object builder as a nested JSON
+     * object stored under {@code key}.
+     * <p>
+     * Entries with a {@code null} key are skipped. Entries with a {@code null}
+     * value are written as JSON {@code null} only when null values are allowed,
+     * otherwise they are left out of the nested object.
+     *
+     * @param builder         the object builder that receives the field
+     * @param factory         factory used to create the nested object builder
+     * @param key             the field name
+     * @param values          the attributes to write, may be {@code null}
+     * @param allowNullValues whether {@code null} values are written as JSON
+     *                        {@code null}; a {@code null} flag is treated as
+     *                        {@code false}
+     */
+    public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+            final Map<String, String> values, Boolean allowNullValues) {
+        // The flag is a boxed Boolean: resolve it once, null-safely, so that a null
+        // flag does not fail with a NullPointerException on auto-unboxing.
+        final boolean writeNulls = Boolean.TRUE.equals(allowNullValues);
+
+        if (values == null) {
+            if (writeNulls) {
+                builder.add(key, JsonValue.NULL);
+            }
+            return;
+        }
+
+        final JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
+        for (final Map.Entry<String, String> entry : values.entrySet()) {
+            final String entryKey = entry.getKey();
+            if (entryKey == null) {
+                // A JSON member needs a name; nameless entries cannot be written.
+                continue;
+            }
+
+            final String entryValue = entry.getValue();
+            if (entryValue != null) {
+                mapBuilder.add(entryKey, entryValue);
+            } else if (writeNulls) {
+                mapBuilder.add(entryKey, JsonValue.NULL);
+            }
+        }
+
+        builder.add(key, mapBuilder);
+    }
+
+    /**
+     * Adds a collection of strings to a JSON object builder as a JSON array
+     * stored under {@code key}.
+     *
+     * @param builder         the object builder that receives the field
+     * @param factory         factory handed to {@code createJsonArray} to build the
+     *                        array
+     * @param key             the field name
+     * @param values          the strings to write, may be {@code null}
+     * @param allowNullValues whether a {@code null} collection is written as JSON
+     *                        {@code null} (otherwise the field is omitted); a
+     *                        {@code null} flag is treated as {@code false}
+     */
+    public static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key,
+            final Collection<String> values, Boolean allowNullValues) {
+        if (values != null) {
+            builder.add(key, createJsonArray(factory, values));
+            return;
+        }
+
+        // The flag is a boxed Boolean: compare null-safely so that a null flag does
+        // not fail with a NullPointerException on auto-unboxing.
+        if (Boolean.TRUE.equals(allowNullValues)) {
+            builder.add(key, JsonValue.NULL);
+        }
+    }
+
+    /**
+     * Builds a JSON array builder holding the non-null strings of a collection,
+     * in the collection's iteration order.
+     *
+     * @param factory factory used to create the array builder
+     * @param values  the strings to copy; {@code null} elements are skipped
+     * @return the populated array builder
+     */
+    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) {
+        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
+        for (final String element : values) {
+            if (element == null) {
+                continue;
+            }
+            arrayBuilder.add(element);
+        }
+        return arrayBuilder;
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
index 8d64d2f..f9d9a82 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/java/org/apache/nifi/reporting/azure/loganalytics/AzureLogAnalyticsReportingTask.java
@@ -17,29 +17,13 @@
package org.apache.nifi.reporting.azure.loganalytics;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.text.MessageFormat;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
-import java.util.regex.Pattern;
-
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import javax.xml.bind.DatatypeConverter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -51,112 +35,29 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.metrics.jvm.JvmMetrics;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import
org.apache.nifi.reporting.azure.loganalytics.api.AzureLogAnalyticsMetricsFactory;
import org.apache.nifi.scheduling.SchedulingStrategy;
-
/**
* ReportingTask to send metrics from Apache NiFi and JVM to Azure Monitor.
*/
-@Tags({"reporting", "loganalytics", "metrics"})
-@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a
Azure Log Analytics workspace." +
- "Apache NiFi-metrics can be either configured global or on
process-group level.")
+@Tags({ "azure", "metrics", "reporting", "log analytics" })
+@CapabilityDescription("Sends JVM-metrics as well as Apache NiFi-metrics to a
Azure Log Analytics workspace."
+ + "Apache NiFi-metrics can be either configured global or on
process-group level.")
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
-public class AzureLogAnalyticsReportingTask extends AbstractReportingTask {
+public class AzureLogAnalyticsReportingTask extends
AbstractAzureLogAnalyticsReportingTask {
private static final String JVM_JOB_NAME = "jvm_global";
- private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final String HMAC_SHA256_ALG = "HmacSHA256";
- private static final DateTimeFormatter RFC_1123_DATE_TIME =
DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O");
private final JvmMetrics virtualMachineMetrics =
JmxJvmMetrics.getInstance();
-
- static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_ID = new
PropertyDescriptor.Builder()
- .name("Log Analytics Workspace Id")
- .description("Log Analytics Workspace Id")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
+ static final PropertyDescriptor SEND_JVM_METRICS = new
PropertyDescriptor.Builder().name("Send JVM Metrics")
+ .description("Send JVM Metrics in addition to the
NiFi-metrics").allowableValues("true", "false")
+ .defaultValue("false").required(true).build();
static final PropertyDescriptor LOG_ANALYTICS_CUSTOM_LOG_NAME = new
PropertyDescriptor.Builder()
- .name("Log Analytics Custom Log Name")
- .description("Log Analytics Custom Log Name")
- .required(false)
- .defaultValue("nifimetrics")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
- static final PropertyDescriptor LOG_ANALYTICS_WORKSPACE_KEY = new
PropertyDescriptor.Builder()
- .name("Log Analytics Workspace Key")
- .description("Azure Log Analytic Worskspace Key")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
- static final PropertyDescriptor APPLICATION_ID = new
PropertyDescriptor.Builder()
- .name("Application ID")
- .description("The Application ID to be included in the metrics
sent to Azure Log Analytics WS")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .defaultValue("nifi")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor INSTANCE_ID = new
PropertyDescriptor.Builder()
- .name("Instance ID")
- .description("Id of this NiFi instance to be included in the
metrics sent to Azure Log Analytics WS")
- .required(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .defaultValue("${hostname(true)}")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor PROCESS_GROUP_IDS = new
PropertyDescriptor.Builder()
- .name("Process group ID(s)")
- .description("If specified, the reporting task will send metrics
the configured ProcessGroup(s) only. Multiple IDs should be separated by a
comma. If"
- + " none of the group-IDs could be found or no IDs are
defined, the Root Process Group is used and global metrics are sent.")
- .required(false)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators
- .createListValidator(true, true,
StandardValidators.createRegexMatchingValidator(Pattern.compile("[0-9a-z-]+"))))
- .build();
- static final PropertyDescriptor JOB_NAME = new PropertyDescriptor.Builder()
- .name("Job Name")
- .description("The name of the exporting job")
- .defaultValue("nifi_reporting_job")
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- static final PropertyDescriptor SEND_JVM_METRICS = new
PropertyDescriptor.Builder()
- .name("Send JVM Metrics")
- .description("Send JVM Metrics in addition to the NiFi-metrics")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
- static final PropertyDescriptor LOG_ANALYTICS_URL_ENDPOINT_FORMAT = new
PropertyDescriptor.Builder()
- .name("Log Analytics URL Endpoint Format")
- .description("Log Analytics URL Endpoint Format")
- .required(false)
-
.defaultValue("https://{0}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- private String createAuthorization(String workspaceId, String key, int
contentLength, String rfc1123Date) {
- try {
- // Documentation:
https://docs.microsoft.com/en-us/rest/api/loganalytics/create-request
- String signature =
String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs",
contentLength, rfc1123Date);
- Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
- mac.init(new
SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG));
- String hmac =
DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8)));
- return String.format("SharedKey %s:%s", workspaceId, hmac);
- } catch (NoSuchAlgorithmException | InvalidKeyException e) {
- throw new RuntimeException(e);
- }
- }
+ .name("Log Analytics Custom Log Name").description("Log Analytics
Custom Log Name").required(false)
+
.defaultValue("nifimetrics").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -168,31 +69,34 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
properties.add(INSTANCE_ID);
properties.add(PROCESS_GROUP_IDS);
properties.add(JOB_NAME);
- properties.add(SEND_JVM_METRICS);
properties.add(LOG_ANALYTICS_URL_ENDPOINT_FORMAT);
return properties;
}
@Override
- public void onTrigger(final ReportingContext context){
- final String workspaceId =
context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions().getValue();
- final String linuxPrimaryKey =
context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions().getValue();
+ public void onTrigger(final ReportingContext context) {
+ final String workspaceId =
context.getProperty(LOG_ANALYTICS_WORKSPACE_ID).evaluateAttributeExpressions()
+ .getValue();
+ final String linuxPrimaryKey =
context.getProperty(LOG_ANALYTICS_WORKSPACE_KEY).evaluateAttributeExpressions()
+ .getValue();
final boolean jvmMetricsCollected =
context.getProperty(SEND_JVM_METRICS).asBoolean();
-
- final String logName =
context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions().getValue();
+ final String logName =
context.getProperty(LOG_ANALYTICS_CUSTOM_LOG_NAME).evaluateAttributeExpressions()
+ .getValue();
final String instanceId =
context.getProperty(INSTANCE_ID).evaluateAttributeExpressions().getValue();
final String groupIds =
context.getProperty(PROCESS_GROUP_IDS).evaluateAttributeExpressions().getValue();
- final String urlEndpointFormat =
context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT).evaluateAttributeExpressions().getValue();
+ final String urlEndpointFormat =
context.getProperty(LOG_ANALYTICS_URL_ENDPOINT_FORMAT)
+ .evaluateAttributeExpressions().getValue();
+
try {
List<Metric> allMetrics = null;
- if(groupIds == null || groupIds.isEmpty()) {
+ if (groupIds == null || groupIds.isEmpty()) {
ProcessGroupStatus status =
context.getEventAccess().getControllerStatus();
String processGroupName = status.getName();
allMetrics = collectMetrics(instanceId, status,
processGroupName, jvmMetricsCollected);
} else {
allMetrics = new ArrayList<>();
- for(String groupId: groupIds.split(",")) {
- groupId =groupId.trim();
+ for (String groupId : groupIds.split(",")) {
+ groupId = groupId.trim();
ProcessGroupStatus status =
context.getEventAccess().getGroupStatus(groupId);
String processGroupName = status.getName();
allMetrics.addAll(collectMetrics(instanceId, status,
processGroupName, jvmMetricsCollected));
@@ -206,35 +110,21 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
}
/**
- * Construct HttpPost and return it
- * @param urlFormat URL format to Azure Log Analytics Endpoint
- * @param workspaceId your azure log analytics workspace id
- * @param logName log table name where metrics will be pushed
- * @return HttpsURLConnection to your azure log analytics workspace
- * @throws IllegalArgumentException if dataCollectorEndpoint url is invalid
- */
- protected HttpPost getHttpPost(final String urlFormat, final String
workspaceId, final String logName)
- throws IllegalArgumentException {
- String dataCollectorEndpoint =
- MessageFormat.format(urlFormat, workspaceId);
- HttpPost post = new HttpPost(dataCollectorEndpoint);
- post.addHeader("Content-Type", "application/json");
- post.addHeader("Log-Type", logName);
- return post;
- }
- /**
* send collected metrics to azure log analytics workspace
- * @param request HttpPost to Azure Log Analytics Endpoint
- * @param workspaceId your azure log analytics workspace id
+ *
+ * @param request HttpPost to Azure Log Analytics Endpoint
+ * @param workspaceId your azure log analytics workspace id
* @param linuxPrimaryKey your azure log analytics workspace key
- * @param allMetrics collected metrics to be sent
- * @throws IOException when there is an error in https url connection or
read/write to the onnection
- * @throws IllegalArgumentException when there a exception in converting
metrics to json string with Gson.toJson
- * @throws RuntimeException when httpPost fails with none 200 status code
+ * @param allMetrics collected metrics to be sent
+ * @throws IOException when there is an error in https url
+ * connection or read/write to the
onnection
+ * @throws IllegalArgumentException when there a exception in converting
metrics
+ * to json string with Gson.toJson
+ * @throws RuntimeException when httpPost fails with none 200
status
+ * code
*/
- protected void sendMetrics(final HttpPost request, final String
workspaceId, final String linuxPrimaryKey, final List<Metric> allMetrics)
- throws IOException, IllegalArgumentException, RuntimeException {
-
+ protected void sendMetrics(final HttpPost request, final String
workspaceId, final String linuxPrimaryKey,
+ final List<Metric> allMetrics) throws IOException,
IllegalArgumentException, RuntimeException {
Gson gson = new GsonBuilder().create();
StringBuilder builder = new StringBuilder();
builder.append('[');
@@ -243,45 +133,20 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
builder.append(',');
}
builder.append(']');
-
- final String rawJson = builder.toString();
- final int bodyLength = rawJson.getBytes(UTF8).length;
- final String nowRfc1123 =
RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
- final String createAuthorization = createAuthorization(workspaceId,
linuxPrimaryKey, bodyLength, nowRfc1123);
- request.addHeader("Authorization", createAuthorization);
- request.addHeader("x-ms-date", nowRfc1123);
- request.setEntity(new StringEntity(rawJson));
- try(CloseableHttpClient httpClient = HttpClients.createDefault()){
- postRequest(httpClient, request);
- }
+ sendToLogAnalytics(request, workspaceId, linuxPrimaryKey,
builder.toString());
}
/**
- * post request with httpClient and httpPost
- * @param httpClient HttpClient
- * @param request HttpPost
- * @throws IOException if httpClient.execute fails
- * @throws RuntimeException if post request status return other than 200
- */
- protected void postRequest(final CloseableHttpClient httpClient, final
HttpPost request)
- throws IOException, RuntimeException {
-
- try (CloseableHttpResponse response = httpClient.execute(request)) {
- if(response != null && response.getStatusLine().getStatusCode() !=
200) {
- throw new
RuntimeException(response.getStatusLine().toString());
- }
- }
- }
- /**
* collect metrics to be sent to azure log analytics workspace
- * @param instanceId instance id
- * @param status process group status
- * @param processGroupName process group name
+ *
+ * @param instanceId instance id
+ * @param status process group status
+ * @param processGroupName process group name
* @param jvmMetricsCollected whether we want to collect jvm metrics or not
* @return list of metrics collected
*/
- private List<Metric> collectMetrics(final String instanceId,
- final ProcessGroupStatus status, final String processGroupName,
final boolean jvmMetricsCollected) {
+ protected List<Metric> collectMetrics(final String instanceId, final
ProcessGroupStatus status,
+ final String processGroupName, final boolean jvmMetricsCollected) {
List<Metric> allMetrics = new ArrayList<>();
// dataflow process group level metrics
@@ -290,9 +155,9 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
// connections process group level metrics
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(status, connectionStatuses);
- for (ConnectionStatus connectionStatus: connectionStatuses) {
- allMetrics.addAll(
-
AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus,
instanceId, processGroupName));
+ for (ConnectionStatus connectionStatus : connectionStatuses) {
+
allMetrics.addAll(AzureLogAnalyticsMetricsFactory.getConnectionStatusMetrics(connectionStatus,
instanceId,
+ processGroupName));
}
// processor level metrics
@@ -300,13 +165,12 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
populateProcessorStatuses(status, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
allMetrics.addAll(
-
AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus,
instanceId, processGroupName)
- );
+
AzureLogAnalyticsMetricsFactory.getProcessorMetrics(processorStatus,
instanceId, processGroupName));
}
if (jvmMetricsCollected) {
allMetrics.addAll(
-
AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics,
instanceId, JVM_JOB_NAME));
+
AzureLogAnalyticsMetricsFactory.getJvmMetrics(virtualMachineMetrics,
instanceId, JVM_JOB_NAME));
}
return allMetrics;
@@ -319,7 +183,8 @@ public class AzureLogAnalyticsReportingTask extends
AbstractReportingTask {
}
}
- private void populateConnectionStatuses(final ProcessGroupStatus
groupStatus, final List<ConnectionStatus> statuses) {
+ private void populateConnectionStatuses(final ProcessGroupStatus
groupStatus,
+ final List<ConnectionStatus> statuses) {
statuses.addAll(groupStatus.getConnectionStatus());
for (final ProcessGroupStatus childGroupStatus :
groupStatus.getProcessGroupStatus()) {
populateConnectionStatuses(childGroupStatus, statuses);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index bbb8692..cbb3dc1 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -12,4 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsReportingTask
+org.apache.nifi.reporting.azure.loganalytics.AzureLogAnalyticsProvenanceReportingTask
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
new file mode 100644
index 0000000..ee7f8b9
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-reporting-task/src/test/java/org/apache/nifi/reporting/azure/loganalytics/TestAzureLogAnalyticsProvenanceReportingTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.azure.loganalytics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObjectBuilder;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.junit.Test;
+
+/**
+ * Tests for the static {@code addField} overloads of
+ * {@link AzureLogAnalyticsProvenanceReportingTask}.
+ * <p>
+ * Expected and actual documents are compared as parsed JSON trees rather than as
+ * serialized strings, so the outcome does not depend on the iteration order of
+ * the maps involved.
+ */
+public class TestAzureLogAnalyticsProvenanceReportingTask {
+
+    /**
+     * Scalar overload: strings, integers, longs and booleans keep their JSON type,
+     * an unsupported type is written as its string form, and {@code null} is
+     * written as JSON null when null values are allowed.
+     */
+    @Test
+    public void testAddField1() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyString", "StringValue", true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyInteger", 2674440, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyLong", 1289904147324L, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyBoolean", true, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNotSupportedObject", 1.25, true);
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, "TestKeyNull", null, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{" +
+                                        "\"TestKeyString\": \"StringValue\"," +
+                                        "\"TestKeyInteger\": 2674440," +
+                                        "\"TestKeyLong\": 1289904147324," +
+                                        "\"TestKeyBoolean\": true," +
+                                        "\"TestKeyNotSupportedObject\": \"1.25\"," +
+                                        "\"TestKeyNull\": null" +
+                                    "}";
+        assertJsonEquals(expectedjsonString, actualJson);
+    }
+
+    /**
+     * Map overload: the attributes are written as a nested JSON object.
+     */
+    @Test
+    public void testAddField2() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        Map<String, String> values = new HashMap<String, String>();
+        values.put("TestKeyString1", "StringValue1");
+        values.put("TestKeyString2", "StringValue2");
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{\"TestKeyString\":{\"TestKeyString2\":\"StringValue2\",\"TestKeyString1\":\"StringValue1\"}}";
+        // HashMap iteration order is unspecified, so the member order of the nested
+        // object must not influence the comparison.
+        assertJsonEquals(expectedjsonString, actualJson);
+    }
+
+    /**
+     * Collection overload: the strings are written as a JSON array in iteration
+     * order.
+     */
+    @Test
+    public void testAddField3() throws IOException, InterruptedException, InitializationException {
+
+        final Map<String, Object> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+        final JsonObjectBuilder builder = factory.createObjectBuilder();
+        Collection<String> values = new ArrayList<String>();
+        values.add("TestValueString1");
+        values.add("TestValueString2");
+        AzureLogAnalyticsProvenanceReportingTask.addField(builder, factory, "TestKeyString", values, true);
+        javax.json.JsonObject actualJson = builder.build();
+        String expectedjsonString = "{\"TestKeyString\":[\"TestValueString1\",\"TestValueString2\"]}";
+        assertJsonEquals(expectedjsonString, actualJson);
+    }
+
+    /**
+     * Asserts that a built JSON object is structurally equal to the expected JSON
+     * text. Both sides are parsed into Gson trees first, which makes the check
+     * independent of object member order while array order still matters.
+     *
+     * @param expectedJsonString the expected document as JSON text
+     * @param actualJson         the document produced by the code under test
+     */
+    private static void assertJsonEquals(final String expectedJsonString, final javax.json.JsonObject actualJson) {
+        final Gson gson = new Gson();
+        final JsonObject expectedJson = gson.fromJson(expectedJsonString, JsonObject.class);
+        final JsonObject parsedActualJson = gson.fromJson(actualJson.toString(), JsonObject.class);
+        assertEquals(expectedJson, parsedActualJson);
+    }
+}
\ No newline at end of file