[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237592#comment-15237592 ]
ASF GitHub Bot commented on MINIFI-13: -------------------------------------- Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/6#discussion_r59417575 --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java --- @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.minifi.provenance.reporting; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.net.ssl.SSLContext; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import 
org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + +@Tags({"provenance", "lineage", "tracking", "site", "site to site"}) +@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of NiFi the task knows where it left off.") +public class ProvenanceReportingTask extends AbstractReportingTask { + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final String LAST_EVENT_ID_KEY = "last_event_id"; + + static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() + .name("Destination URL") + .description("The URL to post the Provenance Events to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() + .name("Input Port Name") + .description("The name of the Input Port to delivery Provenance Events to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + static final PropertyDescriptor MINIFI_URL = new PropertyDescriptor.Builder() + .name("MiNiFi URL") + .description("The URL of this MiNiFi instance. 
This is used to include the Content URI to send to the destination.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("http://${hostname(true)}:8080/nifi") + .addValidator(new NiFiUrlValidator()) + .build(); + static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() + .name("Compress Events") + .description("Indicates whether or not to compress the events when being sent.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies how many records to send in a single batch, at most.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + private volatile long firstEventId = -1L; + private volatile SiteToSiteClient siteToSiteClient; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(DESTINATION_URL); + properties.add(PORT_NAME); + properties.add(SSL_CONTEXT); + properties.add(MINIFI_URL); + properties.add(COMPRESS); + properties.add(TIMEOUT); + properties.add(BATCH_SIZE); + return properties; + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslContextService == null ? 
null : sslContextService.createSSLContext(ClientAuth.REQUIRED); + final EventReporter eventReporter = new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + switch (severity) { + case WARNING: + getLogger().warn(message); + break; + case ERROR: + getLogger().error(message); + break; + default: + break; + } + } + }; + + final String destinationUrlPrefix = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); + final String destinationUrl = destinationUrlPrefix + (destinationUrlPrefix.endsWith("/") ? "nifi" : "/nifi"); + + siteToSiteClient = new SiteToSiteClient.Builder() + .url(destinationUrl) + .portName(context.getProperty(PORT_NAME).getValue()) + .useCompression(context.getProperty(COMPRESS).asBoolean()) + .eventReporter(eventReporter) + .sslContext(sslContext) + .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .build(); + } + + @OnStopped + public void shutdown() throws IOException { + final SiteToSiteClient client = getClient(); + if (client != null) { + client.close(); + } + } + + // this getter is intended explicitly for testing purposes + protected SiteToSiteClient getClient() { + return this.siteToSiteClient; + } + + private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) { + if (status == null) { + return null; + } + + final String componentId = event.getComponentId(); + if (status.getId().equals(componentId)) { + return status.getName(); + } + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return procStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getOutputPortStatus()) { + if 
(portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { + if (rpgStatus.getId().equals(componentId)) { + return rpgStatus.getName(); + } + } + + for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { + final String componentName = getComponentName(childGroup, event); + if (componentName != null) { + return componentName; + } + } + + return null; + } + + @Override + public void onTrigger(final ReportingContext context) { + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); + final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); + + if (firstEventId < 0) { + Map<String, String> state; + try { + state = context.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + getLogger().error("Failed to get state at start up due to {}", e); + return; + } + if (state.containsKey(LAST_EVENT_ID_KEY)) { --- End diff -- Have to take into consideration how we provide a consistent view of provenance data in terms of provenance implementations, such as the Volatile item, which resets to 0 on a restart. > Create a Reporting Task to Send Provenance data > ----------------------------------------------- > > Key: MINIFI-13 > URL: https://issues.apache.org/jira/browse/MINIFI-13 > Project: Apache NiFi MiNiFi > Issue Type: Sub-task > Components: Data Format, Data Transmission > Reporter: Joseph Percivall > Assignee: Joseph Percivall > Fix For: 0.0.1 > > > With initial effort to re-use as much of NiFi as possible it is not possible > to easily create a ProvenanceReporter to add provenance events as attributes > to FlowFiles as it would require changing the ProvenanceReporter interface. > This will require utilizing a different extension point to transmit the > provenance data back to a core NiFi instance. 
> Probably the most efficient way to do this is to create a ReportingTask which > reports the provenance events using the S2S protocol. > In the future this will probably be retired as a reporting task as MiNiFi > grows to rely less on NiFi, but this reporting task could also be contributed > back to NiFi. -- This message was sent by Atlassian JIRA (v6.3.4#6332)