[ https://issues.apache.org/jira/browse/NIFI-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276769#comment-15276769 ]
ASF GitHub Bot commented on NIFI-1858: -------------------------------------- Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi/pull/419#discussion_r62548156 --- Diff: nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java --- @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@Tags({"provenance", "lineage", "tracking", "site", "site to site"}) +@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.") +public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask { + + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final String LAST_EVENT_ID_KEY = "last_event_id"; + + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() + .name("Platform") + .description("The value to use for the platform field in each provenance event.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private volatile long firstEventId = -1L; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PLATFORM); + return properties; + } + + private String getComponentName(final ProcessGroupStatus status, final ProvenanceEventRecord event) { + if (status == null) { + return null; + } + + final String componentId = event.getComponentId(); + if (status.getId().equals(componentId)) { + return status.getName(); + } + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + if (procStatus.getId().equals(componentId)) { + return procStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getInputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final PortStatus portStatus : status.getOutputPortStatus()) { + if (portStatus.getId().equals(componentId)) { + return portStatus.getName(); + } + } + + for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { + if (rpgStatus.getId().equals(componentId)) { + return rpgStatus.getName(); + } + } + + for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { + final String componentName = getComponentName(childGroup, event); + if (componentName != null) { + return componentName; + } + } + + return null; + } + + @Override + public void onTrigger(final ReportingContext context) { + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); + final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); + + Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId(); + + if(currMaxId == null) { + getLogger().debug("No events to send because no events have been created yet."); + return; + } + + if (firstEventId < 0) { + Map<String, String> state; + try { + state = context.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e); + return; + } + if (state.containsKey(LAST_EVENT_ID_KEY)) { + firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1; + } + + if(currMaxId < firstEventId){ + getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + + "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId}); + firstEventId = -1; + } + } + + if (currMaxId == (firstEventId - 1)) { + getLogger().debug("No events to send due to the current max id being equal to the last id that was queried."); + return; + } + + List<ProvenanceEventRecord> events; + try { + events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); + return; + } + + if (events == null || events.isEmpty()) { + getLogger().debug("No events to send due to 'events' being null or empty."); + return; + } + + final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue(); + URL url; + try { + url = new URL(nifiUrl); + } catch (final MalformedURLException e1) { + // already validated + throw new AssertionError(); + } + + final String hostname = url.getHost(); + final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + + final Map<String, ?> config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + final JsonObjectBuilder builder = factory.createObjectBuilder(); + + while (events != null && !events.isEmpty()) { + final long start = System.nanoTime(); + + // Create a JSON array of all the events in the current batch + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + for (final ProvenanceEventRecord event : events) { + arrayBuilder.add(serialize(factory, builder, event, getComponentName(procGroupStatus, event), hostname, url, rootGroupName, platform)); + } + final JsonArray jsonArray = arrayBuilder.build(); + + // Send the JSON document for the current batch + try { + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map<String, String> attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + + final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", --- End diff -- Wondering if we should default this to debug? > Add ReportingTask for sending provenance events over Site-To-Site > ----------------------------------------------------------------- > > Key: NIFI-1858 > URL: https://issues.apache.org/jira/browse/NIFI-1858 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Bryan Bende > Assignee: Bryan Bende > Priority: Minor > Fix For: 0.7.0 > > > Currently if someone wants to export Provenance events they can do so through > a ReportingTask. Rather than creating specialized ReportingTasks for > different destinations, another approach would be to send provenance events > to another NiFi instance over site-to-site and then use the standard > processors in that other instance to store the events in the desired > destination. -- This message was sent by Atlassian JIRA (v6.3.4#6332)