Repository: nifi Updated Branches: refs/heads/master 1663a6c09 -> 1d21e3baf
NIFI-5122 - Add Record Writer for S2S RTs This closes #2663 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1d21e3ba Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1d21e3ba Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1d21e3ba Branch: refs/heads/master Commit: 1d21e3baf85ec622fd9d0fb9d3f18c27802a0978 Parents: 1663a6c Author: Pierre Villard <pierre.villard...@gmail.com> Authored: Fri Apr 27 17:41:39 2018 +0200 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Wed May 30 06:58:01 2018 -0400 ---------------------------------------------------------------------- .../nifi-site-to-site-reporting-task/pom.xml | 3 + .../AbstractSiteToSiteReportingTask.java | 21 +++- .../SiteToSiteBulletinReportingTask.java | 13 +- .../SiteToSiteMetricsReportingTask.java | 1 - .../SiteToSiteProvenanceReportingTask.java | 12 +- .../SiteToSiteStatusReportingTask.java | 14 ++- .../additionalDetails.html | 65 ++++++++++ .../additionalDetails.html | 9 +- .../additionalDetails.html | 122 +++++++++++++++++++ .../src/main/resources/schema-bulletins.avsc | 21 ++++ .../src/main/resources/schema-provenance.avsc | 35 ++++++ .../src/main/resources/schema-status.avsc | 78 ++++++++++++ .../TestSiteToSiteBulletinReportingTask.java | 4 + .../TestSiteToSiteMetricsReportingTask.java | 3 - .../TestSiteToSiteProvenanceReportingTask.java | 4 + .../TestSiteToSiteStatusReportingTask.java | 6 +- 16 files changed, 393 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index 93a3196..d60893e 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -120,6 +120,9 @@ <configuration> <excludes combine.children="append"> <exclude>src/main/resources/schema-metrics.avsc</exclude> + <exclude>src/main/resources/schema-bulletins.avsc</exclude> + <exclude>src/main/resources/schema-provenance.avsc</exclude> + <exclude>src/main/resources/schema-status.avsc</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index e755354..21bb397 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.reporting; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.util.ArrayList; import java.util.Collections; @@ 
-29,6 +31,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import javax.json.JsonArray; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; import javax.net.ssl.SSLContext; @@ -46,6 +49,7 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; @@ -208,6 +212,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT properties.add(HTTP_PROXY_PORT); properties.add(HTTP_PROXY_USERNAME); properties.add(HTTP_PROXY_PASSWORD); + properties.add(RECORD_WRITER); return properties; } @@ -264,6 +269,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT return this.siteToSiteClient; } + protected void sendData(final ReportingContext context, final Transaction transaction, Map<String, String> attributes, final JsonArray jsonArray) throws IOException { + if(context.getProperty(RECORD_WRITER).isSet()) { + transaction.send(getData(context, new ByteArrayInputStream(jsonArray.toString().getBytes(StandardCharsets.UTF_8)), attributes), attributes); + } else { + transaction.send(jsonArray.toString().getBytes(StandardCharsets.UTF_8), attributes); + } + } + protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) { try (final JsonRecordReader reader = new JsonRecordReader(in, recordSchema)) { @@ -387,8 +400,14 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT if (firstObjectConsumed && !array) { return null; } + + JsonNode nextNode = getNextJsonNode(); + if(nextNode == null) { + return null; + } + 
try { - return convertJsonNodeToRecord(getNextJsonNode(), getSchema(), null, coerceTypes, dropUnknownFields); + return convertJsonNodeToRecord(nextNode, getSchema(), null, coerceTypes, dropUnknownFields); } catch (final MalformedRecordException mre) { throw mre; } catch (final IOException ioe) { http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index d026aa1..ac60d8a 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -17,11 +17,13 @@ package org.apache.nifi.reporting; +import org.apache.avro.Schema; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -37,8 +39,9 @@ import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; import 
javax.json.JsonObject; import javax.json.JsonObjectBuilder; + import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.io.InputStream; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -76,6 +79,11 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting private volatile long lastSentBulletinId = -1L; + public SiteToSiteBulletinReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-bulletins.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); @@ -153,8 +161,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("mime.type", "application/json"); - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + sendData(context, transaction, attributes, jsonArray); transaction.confirm(); transaction.complete(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 20416e1..e17c2c8 100644 --- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -113,7 +113,6 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT properties.add(HOSTNAME); properties.add(APPLICATION_ID); properties.add(FORMAT); - properties.add(RECORD_WRITER); properties.remove(BATCH_SIZE); return properties; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index ec1414d..ec45c59 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -17,6 +17,7 @@ package org.apache.nifi.reporting; +import org.apache.avro.Schema; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restriction; @@ -25,6 +26,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import 
org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; @@ -47,9 +49,9 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -160,6 +162,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti private volatile ProvenanceEventConsumer consumer; + public SiteToSiteProvenanceReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-provenance.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @OnScheduled public void onScheduled(final ConfigurationContext context) throws IOException { consumer = new ProvenanceEventConsumer(); @@ -287,8 +294,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("mime.type", "application/json"); - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + sendData(context, transaction, attributes, jsonArray); transaction.confirm(); transaction.complete(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index c419d5c..618c40c 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -18,9 +18,9 @@ package org.apache.nifi.reporting; import java.io.IOException; +import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -41,8 +41,10 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; +import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; @@ -92,6 +94,11 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa private volatile Pattern componentTypeFilter; private volatile Pattern componentNameFilter; + public SiteToSiteStatusReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { 
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); @@ -168,10 +175,9 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa for(JsonValue jsonValue : jsonBatch) { jsonBatchArrayBuilder.add(jsonValue); } - final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build(); - final byte[] data = jsonBatchArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); + final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build(); + sendData(context, transaction, attributes, jsonBatchArray); transaction.confirm(); transaction.complete(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html new file mode 100644 index 0000000..c76c138 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteBulletinReportingTask/additionalDetails.html @@ -0,0 +1,65 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>SiteToSiteBulletinReportingTask</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + The Site-to-Site Bulletin Reporting Task allows the user to publish Bulletin events using the Site To Site protocol. Note: + only up to 5 bulletins are stored per component and up to 10 bulletins at controller level for a duration of up to 5 minutes. + If this reporting task is not scheduled frequently enough some bulletins may not be sent. 
+ </p> + + <h2>Record writer</h2> + + <p> + The user can define a Record Writer and directly specify the output format and data with the assumption that the input schema + is the following: + </p> + + <pre> + <code> +{ + "type" : "record", + "name" : "bulletins", + "namespace" : "bulletins", + "fields" : [ + { "name" : "objectId", "type" : "string" }, + { "name" : "platform", "type" : "string" }, + { "name" : "bulletinId", "type" : "long" }, + { "name" : "bulletinCategory", "type" : ["string", "null"] }, + { "name" : "bulletinGroupId", "type" : ["string", "null"] }, + { "name" : "bulletinGroupName", "type" : ["string", "null"] }, + { "name" : "bulletinLevel", "type" : ["string", "null"] }, + { "name" : "bulletinMessage", "type" : ["string", "null"] }, + { "name" : "bulletinNodeAddress", "type" : ["string", "null"] }, + { "name" : "bulletinNodeId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceName", "type" : ["string", "null"] }, + { "name" : "bulletinSourceType", "type" : ["string", "null"] }, + { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" } + ] +} + </code> + </pre> + + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html index 
86736a6..676674e 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html @@ -33,8 +33,8 @@ </p> <p> - When published to a NiFi instance, the Provenance data is sent as a JSON array. Quite often, it can be useful to work with this data using - a schema. As such, the schema for this Provenance data can be defined as follows: + By default, when published to a NiFi instance, the Provenance data is sent as a JSON array. However, the user can define a Record Writer + and directly specify the output format and data with the assumption that the input schema is defined as follows: </p> <pre> @@ -53,6 +53,9 @@ { "name": "details", "type": "string" }, { "name": "componentId", "type": "string" }, { "name": "componentType", "type": "string" }, + { "name": "componentName", "type": "string" }, + { "name": "processGroupId", "type": "string" }, + { "name": "processGroupName", "type": "string" }, { "name": "entityId", "type": "string" }, { "name": "entityType", "type": "string" }, { "name": "entitySize", "type": ["null", "long"] }, @@ -66,6 +69,8 @@ { "name": "childIds", "type": { "type": "array", "items": "string" } }, { "name": "platform", "type": "string" }, { "name": "application", "type": "string" }, + { "name": "remoteIdentifier", "type": "string" }, + { "name": "alternateIdentifier", "type": "string" }, { "name": "transitUri", "type": ["null", "string"] } ] } http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html new file mode 100644 index 0000000..2d0be38 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteStatusReportingTask/additionalDetails.html @@ -0,0 +1,122 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>SiteToSiteStatusReportingTask</title> + + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + The Site-to-Site Status Reporting Task allows the user to publish Status events using the Site To Site protocol. + The component type and name filter regexes are applied together: only components matching both regexes will be reported. 
+ However, all process groups are recursively searched for matching components, regardless of whether the process + group matches the component filters. + </p> + + <h2>Record writer</h2> + + <p> + The user can define a Record Writer and directly specify the output format and data with the assumption that the + input schema is the following: + </p> + + <pre> + <code> +{ + "type" : "record", + "name" : "status", + "namespace" : "status", + "fields" : [ + // common fields for all components + { "name" : "statusId", "type" : "string"}, + { "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } }, + { "name" : "timestamp", "type" : "string"}, + { "name" : "actorHostname", "type" : "string"}, + { "name" : "componentType", "type" : "string"}, + { "name" : "componentName", "type" : "string"}, + { "name" : "parentId", "type" : ["string", "null"]}, + { "name" : "platform", "type" : "string"}, + { "name" : "application", "type" : "string"}, + { "name" : "componentId", "type" : "string"}, + + // PG + RPG + Ports + Processors + { "name" : "activeThreadCount", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "flowFilesReceived", "type" : ["long", "null"]}, + { "name" : "flowFilesSent", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "bytesReceived", "type" : ["long", "null"]}, + { "name" : "bytesSent", "type" : ["long", "null"]}, + + // PG + Connections + { "name" : "queuedCount", "type" : ["long", "null"]}, + + // PG + Processors + { "name" : "bytesRead", "type" : ["long", "null"]}, + { "name" : "bytesWritten", "type" : ["long", "null"]}, + + // fields for process group status + { "name" : "bytesTransferred", "type" : ["long", "null"]}, + { "name" : "flowFilesTransferred", "type" : ["long", "null"]}, + { "name" : "inputContentSize", "type" : ["long", "null"]}, + { "name" : "outputContentSize", "type" : ["long", "null"]}, + { "name" : "queuedContentSize", "type" : ["long", "null"]}, + + // fields 
for remote process groups + { "name" : "activeRemotePortCount", "type" : ["long", "null"]}, + { "name" : "inactiveRemotePortCount", "type" : ["long", "null"]}, + { "name" : "receivedContentSize", "type" : ["long", "null"]}, + { "name" : "receivedCount", "type" : ["long", "null"]}, + { "name" : "sentContentSize", "type" : ["long", "null"]}, + { "name" : "sentCount", "type" : ["long", "null"]}, + { "name" : "averageLineageDuration", "type" : ["long", "null"]}, + + // fields for input/output ports + connections + PG + { "name" : "inputBytes", "type" : ["long", "null"]}, + { "name" : "inputCount", "type" : ["long", "null"]}, + { "name" : "outputBytes", "type" : ["long", "null"]}, + { "name" : "outputCount", "type" : ["long", "null"]}, + + // fields for connections + { "name" : "sourceId", "type" : ["string", "null"]}, + { "name" : "sourceName", "type" : ["string", "null"]}, + { "name" : "destinationId", "type" : ["string", "null"]}, + { "name" : "destinationName", "type" : ["string", "null"]}, + { "name" : "maxQueuedBytes", "type" : ["long", "null"]}, + { "name" : "maxQueuedCount", "type" : ["long", "null"]}, + { "name" : "queuedBytes", "type" : ["long", "null"]}, + { "name" : "backPressureBytesThreshold", "type" : ["long", "null"]}, + { "name" : "backPressureObjectThreshold", "type" : ["long", "null"]}, + { "name" : "isBackPressureEnabled", "type" : ["string", "null"]}, + + // fields for processors + { "name" : "processorType", "type" : ["string", "null"]}, + { "name" : "averageLineageDurationMS", "type" : ["long", "null"]}, + { "name" : "flowFilesRemoved", "type" : ["long", "null"]}, + { "name" : "invocations", "type" : ["long", "null"]}, + { "name" : "processingNanos", "type" : ["long", "null"]} + ] +} + </code> + </pre> + + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc new file mode 100644 index 0000000..01b0f33 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-bulletins.avsc @@ -0,0 +1,21 @@ +{ + "type" : "record", + "name" : "bulletins", + "namespace" : "bulletins", + "fields" : [ + { "name" : "objectId", "type" : "string" }, + { "name" : "platform", "type" : "string" }, + { "name" : "bulletinId", "type" : "long" }, + { "name" : "bulletinCategory", "type" : ["string", "null"] }, + { "name" : "bulletinGroupId", "type" : ["string", "null"] }, + { "name" : "bulletinGroupName", "type" : ["string", "null"] }, + { "name" : "bulletinLevel", "type" : ["string", "null"] }, + { "name" : "bulletinMessage", "type" : ["string", "null"] }, + { "name" : "bulletinNodeAddress", "type" : ["string", "null"] }, + { "name" : "bulletinNodeId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceId", "type" : ["string", "null"] }, + { "name" : "bulletinSourceName", "type" : ["string", "null"] }, + { "name" : "bulletinSourceType", "type" : ["string", "null"] }, + { "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" } + ] +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc new file mode 100644 index 0000000..840bde6 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-provenance.avsc @@ -0,0 +1,35 @@ +{ + "namespace": "nifi", + "name": "provenanceEvent", + "type": "record", + "fields": [ + { "name": "eventId", "type": "string" }, + { "name": "eventOrdinal", "type": "long" }, + { "name": "eventType", "type": "string" }, + { "name": "timestampMillis", "type": "long" }, + { "name": "durationMillis", "type": "long" }, + { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } }, + { "name": "details", "type": "string" }, + { "name": "componentId", "type": "string" }, + { "name": "componentType", "type": "string" }, + { "name": "componentName", "type": "string" }, + { "name": "processGroupId", "type": "string" }, + { "name": "processGroupName", "type": "string" }, + { "name": "entityId", "type": "string" }, + { "name": "entityType", "type": "string" }, + { "name": "entitySize", "type": ["null", "long"] }, + { "name": "previousEntitySize", "type": ["null", "long"] }, + { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } }, + { "name": "previousAttributes", "type": { "type": "map", "values": "string" } }, + { "name": "actorHostname", "type": "string" }, + { "name": "contentURI", "type": "string" }, + { "name": "previousContentURI", "type": "string" }, + { "name": "parentIds", "type": { "type": "array", "items": "string" } }, + { "name": "childIds", "type": { "type": "array", "items": "string" } }, + { "name": "platform", "type": "string" }, + { "name": "application", "type": "string" }, + { "name": "remoteIdentifier", "type": "string" }, + { "name": "alternateIdentifier", "type": "string" }, + { "name": "transitUri", "type": ["null", "string"] } + ] +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc new file mode 100644 index 0000000..6f16d0e --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-status.avsc @@ -0,0 +1,78 @@ +{ + "type" : "record", + "name" : "status", + "namespace" : "status", + "fields" : [ + + // common fields for all components + { "name" : "statusId", "type" : "string"}, + { "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } }, + { "name" : "timestamp", "type" : "string"}, + { "name" : "actorHostname", "type" : "string"}, + { "name" : "componentType", "type" : "string"}, + { "name" : "componentName", "type" : "string"}, + { "name" : "parentId", "type" : ["string", "null"]}, + { "name" : "platform", "type" : "string"}, + { "name" : "application", "type" : "string"}, + { "name" : "componentId", "type" : "string"}, + + // PG + RPG + Ports + Processors + { "name" : "activeThreadCount", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "flowFilesReceived", "type" : ["long", "null"]}, + { "name" : "flowFilesSent", "type" : ["long", "null"]}, + + // PG + Ports + Processors + { "name" : "bytesReceived", "type" : ["long", "null"]}, + { "name" : "bytesSent", "type" : ["long", "null"]}, + + // PG + Connections + { "name" : "queuedCount", "type" : ["long", "null"]}, + + // PG + Processors + { "name" : "bytesRead", "type" : ["long", "null"]}, + { "name" : "bytesWritten", "type" : ["long", "null"]}, + + // fields 
for process group status + { "name" : "bytesTransferred", "type" : ["long", "null"]}, + { "name" : "flowFilesTransferred", "type" : ["long", "null"]}, + { "name" : "inputContentSize", "type" : ["long", "null"]}, + { "name" : "outputContentSize", "type" : ["long", "null"]}, + { "name" : "queuedContentSize", "type" : ["long", "null"]}, + + // fields for remote process groups + { "name" : "activeRemotePortCount", "type" : ["long", "null"]}, + { "name" : "inactiveRemotePortCount", "type" : ["long", "null"]}, + { "name" : "receivedContentSize", "type" : ["long", "null"]}, + { "name" : "receivedCount", "type" : ["long", "null"]}, + { "name" : "sentContentSize", "type" : ["long", "null"]}, + { "name" : "sentCount", "type" : ["long", "null"]}, + { "name" : "averageLineageDuration", "type" : ["long", "null"]}, + + // fields for input/output ports + connections + PG + { "name" : "inputBytes", "type" : ["long", "null"]}, + { "name" : "inputCount", "type" : ["long", "null"]}, + { "name" : "outputBytes", "type" : ["long", "null"]}, + { "name" : "outputCount", "type" : ["long", "null"]}, + + // fields for connections + { "name" : "sourceId", "type" : ["string", "null"]}, + { "name" : "sourceName", "type" : ["string", "null"]}, + { "name" : "destinationId", "type" : ["string", "null"]}, + { "name" : "destinationName", "type" : ["string", "null"]}, + { "name" : "maxQueuedBytes", "type" : ["long", "null"]}, + { "name" : "maxQueuedCount", "type" : ["long", "null"]}, + { "name" : "queuedBytes", "type" : ["long", "null"]}, + { "name" : "backPressureBytesThreshold", "type" : ["long", "null"]}, + { "name" : "backPressureObjectThreshold", "type" : ["long", "null"]}, + { "name" : "isBackPressureEnabled", "type" : ["string", "null"]}, + + // fields for processors + { "name" : "processorType", "type" : ["string", "null"]}, + { "name" : "averageLineageDurationMS", "type" : ["long", "null"]}, + { "name" : "flowFilesRemoved", "type" : ["long", "null"]}, + { "name" : "invocations", "type" : 
["long", "null"]}, + { "name" : "processingNanos", "type" : ["long", "null"]} + ] +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java index 6d70442..4f6bd5f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteBulletinReportingTask.java @@ -125,6 +125,10 @@ public class TestSiteToSiteBulletinReportingTask { private static final class MockSiteToSiteBulletinReportingTask extends SiteToSiteBulletinReportingTask { + public MockSiteToSiteBulletinReportingTask() throws IOException { + super(); + } + final List<byte[]> dataSent = new ArrayList<>(); @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java index c699a1c..e4f24cb 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java @@ -17,7 +17,6 @@ package org.apache.nifi.reporting; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -53,7 +52,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.state.MockStateManager; import org.apache.nifi.util.MockPropertyValue; -import org.apache.nifi.util.TestRunner; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -65,7 +63,6 @@ public class TestSiteToSiteMetricsReportingTask { private ReportingContext context; private ProcessGroupStatus status; - private TestRunner runner; @Before public void setup() { http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index 31054c2..d39df59 100644 --- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -604,6 +604,10 @@ public class TestSiteToSiteProvenanceReportingTask { private static final class MockSiteToSiteProvenanceReportingTask extends SiteToSiteProvenanceReportingTask { + public MockSiteToSiteProvenanceReportingTask() throws IOException { + super(); + } + final List<byte[]> dataSent = new ArrayList<>(); @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/1d21e3ba/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java index 79ba213..6fe795f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteStatusReportingTask.java @@ -60,7 +60,7 @@ public class TestSiteToSiteStatusReportingTask { private ReportingContext context; public MockSiteToSiteStatusReportingTask initTask(Map<PropertyDescriptor, String> customProperties, - ProcessGroupStatus pgStatus) throws InitializationException { + ProcessGroupStatus pgStatus) throws 
InitializationException, IOException { final MockSiteToSiteStatusReportingTask task = new MockSiteToSiteStatusReportingTask(); Map<PropertyDescriptor, String> properties = new HashMap<>(); for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { @@ -338,6 +338,10 @@ public class TestSiteToSiteStatusReportingTask { private static final class MockSiteToSiteStatusReportingTask extends SiteToSiteStatusReportingTask { + public MockSiteToSiteStatusReportingTask() throws IOException { + super(); + } + final List<byte[]> dataSent = new ArrayList<>(); @Override