[ 
https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237818#comment-15237818
 ] 

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59438406
  
    --- Diff: 
minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java
 ---
    @@ -0,0 +1,443 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.minifi.provenance.reporting;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +import javax.net.ssl.SSLContext;
    +
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.status.PortStatus;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.controller.status.ProcessorStatus;
    +import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
    +import org.apache.nifi.events.EventReporter;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.reporting.AbstractReportingTask;
    +import org.apache.nifi.reporting.ReportingContext;
    +import org.apache.nifi.reporting.Severity;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.ssl.SSLContextService.ClientAuth;
    +
    +@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
    +@CapabilityDescription("Publishes Provenance events using the Site To Site 
protocol.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last event Id so that on restart of NiFi the task knows where it left off.")
    +public class ProvenanceReportingTask extends AbstractReportingTask {
    +    private static final String TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    private static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor DESTINATION_URL = new 
PropertyDescriptor.Builder()
    +        .name("Destination URL")
    +        .description("The URL to post the Provenance Events to.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .addValidator(StandardValidators.URL_VALIDATOR)
    +        .build();
    +    static final PropertyDescriptor PORT_NAME = new 
PropertyDescriptor.Builder()
    +        .name("Input Port Name")
    +        .description("The name of the Input Port to delivery Provenance 
Events to.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +    static final PropertyDescriptor SSL_CONTEXT = new 
PropertyDescriptor.Builder()
    +        .name("SSL Context Service")
    +        .description("The SSL Context Service to use when communicating 
with the destination. If not specified, communications will not be secure.")
    +        .required(false)
    +        .identifiesControllerService(SSLContextService.class)
    +        .build();
    +    static final PropertyDescriptor MINIFI_URL = new 
PropertyDescriptor.Builder()
    +        .name("MiNiFi URL")
    +        .description("The URL of this MiNiFi instance. This is used to 
include the Content URI to send to the destination.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("http://${hostname(true)}:8080/nifi")
    +        .addValidator(new NiFiUrlValidator())
    +        .build();
    +    static final PropertyDescriptor COMPRESS = new 
PropertyDescriptor.Builder()
    +        .name("Compress Events")
    +        .description("Indicates whether or not to compress the events when 
being sent.")
    +        .required(true)
    +        .allowableValues("true", "false")
    +        .defaultValue("true")
    +        .build();
    +    static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
    +        .name("Communications Timeout")
    +        .description("Specifies how long to wait to a response from the 
destination before deciding that an error has occurred and canceling the 
transaction")
    +        .required(true)
    +        .defaultValue("30 secs")
    +        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +        .build();
    +    static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +        .name("Batch Size")
    +        .description("Specifies how many records to send in a single 
batch, at most.")
    +        .required(true)
    +        .defaultValue("1000")
    +        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +        .build();
    +
    +    private volatile long firstEventId = -1L;
    +    private volatile SiteToSiteClient siteToSiteClient;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new ArrayList<>();
    +        properties.add(DESTINATION_URL);
    +        properties.add(PORT_NAME);
    +        properties.add(SSL_CONTEXT);
    +        properties.add(MINIFI_URL);
    +        properties.add(COMPRESS);
    +        properties.add(TIMEOUT);
    +        properties.add(BATCH_SIZE);
    +        return properties;
    +    }
    +
    +    @OnScheduled
    +    public void setup(final ConfigurationContext context) throws 
IOException {
    +        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class);
    +        final SSLContext sslContext = sslContextService == null ? null : 
sslContextService.createSSLContext(ClientAuth.REQUIRED);
    +        final EventReporter eventReporter = new EventReporter() {
    +            @Override
    +            public void reportEvent(final Severity severity, final String 
category, final String message) {
    +                switch (severity) {
    +                    case WARNING:
    +                        getLogger().warn(message);
    +                        break;
    +                    case ERROR:
    +                        getLogger().error(message);
    +                        break;
    +                    default:
    +                        break;
    +                }
    +            }
    +        };
    +
    +        final String destinationUrlPrefix = 
context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue();
    +        final String destinationUrl = destinationUrlPrefix + 
(destinationUrlPrefix.endsWith("/") ? "nifi" : "/nifi");
    +
    +        siteToSiteClient = new SiteToSiteClient.Builder()
    +            .url(destinationUrl)
    +            .portName(context.getProperty(PORT_NAME).getValue())
    +            .useCompression(context.getProperty(COMPRESS).asBoolean())
    +            .eventReporter(eventReporter)
    +            .sslContext(sslContext)
    +            
.timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS)
    +            .build();
    +    }
    +
    +    @OnStopped
    +    public void shutdown() throws IOException {
    +        final SiteToSiteClient client = getClient();
    +        if (client != null) {
    +            client.close();
    +        }
    +    }
    +
    +    // this getter is intended explicitly for testing purposes
    +    protected SiteToSiteClient getClient() {
    +        return this.siteToSiteClient;
    +    }
    +
    +    private String getComponentName(final ProcessGroupStatus status, final 
ProvenanceEventRecord event) {
    +        if (status == null) {
    +            return null;
    +        }
    +
    +        final String componentId = event.getComponentId();
    +        if (status.getId().equals(componentId)) {
    +            return status.getName();
    +        }
    +
    +        for (final ProcessorStatus procStatus : 
status.getProcessorStatus()) {
    +            if (procStatus.getId().equals(componentId)) {
    +                return procStatus.getName();
    +            }
    +        }
    +
    +        for (final PortStatus portStatus : status.getInputPortStatus()) {
    +            if (portStatus.getId().equals(componentId)) {
    +                return portStatus.getName();
    +            }
    +        }
    +
    +        for (final PortStatus portStatus : status.getOutputPortStatus()) {
    +            if (portStatus.getId().equals(componentId)) {
    +                return portStatus.getName();
    +            }
    +        }
    +
    +        for (final RemoteProcessGroupStatus rpgStatus : 
status.getRemoteProcessGroupStatus()) {
    +            if (rpgStatus.getId().equals(componentId)) {
    +                return rpgStatus.getName();
    +            }
    +        }
    +
    +        for (final ProcessGroupStatus childGroup : 
status.getProcessGroupStatus()) {
    +            final String componentName = getComponentName(childGroup, 
event);
    +            if (componentName != null) {
    +                return componentName;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +        final ProcessGroupStatus procGroupStatus = 
context.getEventAccess().getControllerStatus();
    +        final String rootGroupName = procGroupStatus == null ? null : 
procGroupStatus.getName();
    +
    +        if (firstEventId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = 
context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to 
{}", e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    --- End diff --
    
    Ah, good catch. I think the best we can do is to check whether the current 
max event ID in the provenance repository is less than the last event ID queried (stored in state).


> Create a Reporting Task to Send Provenance data
> -----------------------------------------------
>
>                 Key: MINIFI-13
>                 URL: https://issues.apache.org/jira/browse/MINIFI-13
>             Project: Apache NiFi MiNiFi
>          Issue Type: Sub-task
>          Components: Data Format, Data Transmission
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>             Fix For: 0.0.1
>
>
> Because the initial effort re-uses as much of NiFi as possible, it is not easy
> to create a ProvenanceReporter that adds provenance events as attributes to
> FlowFiles, since that would require changing the ProvenanceReporter interface.
> This requires using a different extension point to transmit the provenance
> data back to a core NiFi instance.
> Probably the most efficient way to do this is to create a ReportingTask that
> reports the provenance events using the Site-to-Site (S2S) protocol.
> In the future this will probably be retired as a reporting task as MiNiFi
> grows to rely less on NiFi, but this reporting task could also be contributed
> back to NiFi.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to