[ 
https://issues.apache.org/jira/browse/NIFI-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276769#comment-15276769
 ] 

ASF GitHub Bot commented on NIFI-1858:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/419#discussion_r62548156
  
    --- Diff: 
nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 ---
    @@ -0,0 +1,354 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.reporting;
    +
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.controller.status.PortStatus;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.controller.status.ProcessorStatus;
    +import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
    +@CapabilityDescription("Publishes Provenance events using the Site To Site 
protocol.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's 
last event Id so that on restart the task knows where it left off.")
    +public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReportingTask {
    +
    +    private static final String TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    private static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor PLATFORM = new 
PropertyDescriptor.Builder()
    +        .name("Platform")
    +        .description("The value to use for the platform field in each 
provenance event.")
    +        .required(true)
    +        .expressionLanguageSupported(true)
    +        .defaultValue("nifi")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    private volatile long firstEventId = -1L;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
    +        properties.add(PLATFORM);
    +        return properties;
    +    }
    +
    +    private String getComponentName(final ProcessGroupStatus status, final 
ProvenanceEventRecord event) {
    +        if (status == null) {
    +            return null;
    +        }
    +
    +        final String componentId = event.getComponentId();
    +        if (status.getId().equals(componentId)) {
    +            return status.getName();
    +        }
    +
    +        for (final ProcessorStatus procStatus : 
status.getProcessorStatus()) {
    +            if (procStatus.getId().equals(componentId)) {
    +                return procStatus.getName();
    +            }
    +        }
    +
    +        for (final PortStatus portStatus : status.getInputPortStatus()) {
    +            if (portStatus.getId().equals(componentId)) {
    +                return portStatus.getName();
    +            }
    +        }
    +
    +        for (final PortStatus portStatus : status.getOutputPortStatus()) {
    +            if (portStatus.getId().equals(componentId)) {
    +                return portStatus.getName();
    +            }
    +        }
    +
    +        for (final RemoteProcessGroupStatus rpgStatus : 
status.getRemoteProcessGroupStatus()) {
    +            if (rpgStatus.getId().equals(componentId)) {
    +                return rpgStatus.getName();
    +            }
    +        }
    +
    +        for (final ProcessGroupStatus childGroup : 
status.getProcessGroupStatus()) {
    +            final String componentName = getComponentName(childGroup, 
event);
    +            if (componentName != null) {
    +                return componentName;
    +            }
    +        }
    +
    +        return null;
    +    }
    +
    +    @Override
    +    public void onTrigger(final ReportingContext context) {
    +        final ProcessGroupStatus procGroupStatus = 
context.getEventAccess().getControllerStatus();
    +        final String rootGroupName = procGroupStatus == null ? null : 
procGroupStatus.getName();
    +
    +        Long currMaxId = 
context.getEventAccess().getProvenanceRepository().getMaxEventId();
    +
    +        if(currMaxId == null) {
    +            getLogger().debug("No events to send because no events have 
been created yet.");
    +            return;
    +        }
    +
    +        if (firstEventId < 0) {
    +            Map<String, String> state;
    +            try {
    +                state = 
context.getStateManager().getState(Scope.LOCAL).toMap();
    +            } catch (IOException e) {
    +                getLogger().error("Failed to get state at start up due to 
{}:"+e.getMessage(), e);
    +                return;
    +            }
    +            if (state.containsKey(LAST_EVENT_ID_KEY)) {
    +                firstEventId = 
Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
    +            }
    +
    +            if(currMaxId < firstEventId){
    +                getLogger().debug("Current provenance max id is {} which 
is less than what was stored in state as the last queried event, which was {}. 
This means the provenance restarted its " +
    +                        "ids. Restarting querying from the beginning.", 
new Object[]{currMaxId, firstEventId});
    +                firstEventId = -1;
    +            }
    +        }
    +
    +        if (currMaxId == (firstEventId - 1)) {
    +            getLogger().debug("No events to send due to the current max id 
being equal to the last id that was queried.");
    +            return;
    +        }
    +
    +        List<ProvenanceEventRecord> events;
    +        try {
    +            events = 
context.getEventAccess().getProvenanceEvents(firstEventId, 
context.getProperty(BATCH_SIZE).asInteger());
    +        } catch (final IOException ioe) {
    +            getLogger().error("Failed to retrieve Provenance Events from 
repository due to: " + ioe.getMessage(), ioe);
    +            return;
    +        }
    +
    +        if (events == null || events.isEmpty()) {
    +            getLogger().debug("No events to send due to 'events' being 
null or empty.");
    +            return;
    +        }
    +
    +        final String nifiUrl = 
context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
    +        URL url;
    +        try {
    +            url = new URL(nifiUrl);
    +        } catch (final MalformedURLException e1) {
    +            // already validated
    +            throw new AssertionError();
    +        }
    +
    +        final String hostname = url.getHost();
    +        final String platform = 
context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();
    +
    +        final Map<String, ?> config = Collections.emptyMap();
    +        final JsonBuilderFactory factory = 
Json.createBuilderFactory(config);
    +        final JsonObjectBuilder builder = factory.createObjectBuilder();
    +
    +        while (events != null && !events.isEmpty()) {
    +            final long start = System.nanoTime();
    +
    +            // Create a JSON array of all the events in the current batch
    +            final JsonArrayBuilder arrayBuilder = 
factory.createArrayBuilder();
    +            for (final ProvenanceEventRecord event : events) {
    +                arrayBuilder.add(serialize(factory, builder, event, 
getComponentName(procGroupStatus, event), hostname, url, rootGroupName, 
platform));
    +            }
    +            final JsonArray jsonArray = arrayBuilder.build();
    +
    +            // Send the JSON document for the current batch
    +            try {
    +                final Transaction transaction = 
getClient().createTransaction(TransferDirection.SEND);
    +                if (transaction == null) {
    +                    getLogger().debug("All destination nodes are 
penalized; will attempt to send data later");
    +                    return;
    +                }
    +
    +                final Map<String, String> attributes = new HashMap<>();
    +                final String transactionId = UUID.randomUUID().toString();
    +                attributes.put("reporting.task.transaction.id", 
transactionId);
    +
    +                final byte[] data = 
jsonArray.toString().getBytes(StandardCharsets.UTF_8);
    +                transaction.send(data, attributes);
    +                transaction.confirm();
    +                transaction.complete();
    +
    +                final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    +                getLogger().info("Successfully sent {} Provenance Events 
to destination in {} ms; Transaction ID = {}; First Event ID = {}",
    --- End diff --
    
    Wondering if we should log this success message at debug level instead of info?


> Add ReportingTask for sending provenance events over Site-To-Site
> -----------------------------------------------------------------
>
>                 Key: NIFI-1858
>                 URL: https://issues.apache.org/jira/browse/NIFI-1858
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 0.7.0
>
>
> Currently, anyone who wants to export Provenance events can do so through 
> a ReportingTask. Rather than creating specialized ReportingTasks for 
> different destinations, another approach would be to send provenance events 
> to another NiFi instance over site-to-site and then use the standard 
> processors in that other instance to store the events in the desired 
> destination.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to