[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237999#comment-15237999 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi-minifi/pull/6

> Create a Reporting Task to Send Provenance data
> -----------------------------------------------
>
>                 Key: MINIFI-13
>                 URL: https://issues.apache.org/jira/browse/MINIFI-13
>             Project: Apache NiFi MiNiFi
>          Issue Type: Sub-task
>          Components: Data Format, Data Transmission
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>             Fix For: 0.0.1
>
>
> With the initial effort to re-use as much of NiFi as possible, it is not
> possible to easily create a ProvenanceReporter that adds provenance events
> as attributes to FlowFiles, as it would require changing the
> ProvenanceReporter interface.
> This will require utilizing a different extension point to transmit the
> provenance data back to a core NiFi instance.
> Probably the most efficient way to do this is to create a ReportingTask which
> reports the provenance events using the S2S protocol.
> In the future this will probably be retired as a reporting task as MiNiFi
> grows to rely less on NiFi, but this reporting task could also be contributed
> back to NiFi.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237933#comment-15237933 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59449408

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -251,6 +254,17 @@ public void onTrigger(final ReportingContext context) {
                 if (state.containsKey(LAST_EVENT_ID_KEY)) {
                     firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
                 }
    +
    +            if(currMaxId < firstEventId){
    +                getLogger().debug("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted it's " +
    --- End diff --

    super minor: it's -> its
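The hunk above can be read as a small resume-point calculation: continue one past the last event ID stored in state, unless the repository's current maximum ID is lower than that, which suggests the provenance IDs started over. The following is a minimal sketch of that reading, not the PR's code. `ResumePointSketch`, `firstEventId`, and the plain `Map<String, String>` standing in for NiFi's state map are hypothetical, and falling back to 0 when a restart is detected is an assumption, since the diff is cut off before the body of the branch.

```java
import java.util.HashMap;
import java.util.Map;

final class ResumePointSketch {
    // Same key name as in the diff; everything else here is illustrative.
    static final String LAST_EVENT_ID_KEY = "last_event_id";

    /**
     * Returns the ID of the first provenance event to query next.
     * Assumption: 0 means "start from the beginning of the repository".
     */
    static long firstEventId(final Map<String, String> state, final long currMaxId) {
        long firstEventId = 0L;
        if (state.containsKey(LAST_EVENT_ID_KEY)) {
            // Resume one past the last event that was already reported.
            firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1;
        }
        if (currMaxId < firstEventId) {
            // The stored ID is ahead of the repository, so the IDs were
            // presumably reset; start over (assumed behaviour).
            firstEventId = 0L;
        }
        return firstEventId;
    }

    public static void main(final String[] args) {
        final Map<String, String> state = new HashMap<>();
        state.put(LAST_EVENT_ID_KEY, "41");
        System.out.println(firstEventId(state, 100L)); // prints 42
        System.out.println(firstEventId(state, 10L));  // prints 0
    }
}
```

Keeping the calculation in a pure function like this makes the reset case easy to unit-test without a reporting context.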
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237890#comment-15237890 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59445052

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.minifi.provenance.reporting;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.provenance.ProvenanceEventBuilder;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceEventType;
    +import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.reporting.EventAccess;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.reporting.ReportingContext;
    +import org.apache.nifi.reporting.ReportingInitializationContext;
    +import org.apache.nifi.state.MockStateManager;
    +import org.apache.nifi.util.MockPropertyValue;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +public class TestProvenanceReportingTask {
    +
    +    @Test
    +    public void testSerializedForm() throws IOException, InitializationException {
    +        final String uuid = "10000000-0000-0000-0000-000000000000";
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("abc", "xyz");
    +        attributes.put("xyz", "abc");
    +        attributes.put("filename", "file-" + uuid);
    +
    +        final Map<String, String> prevAttrs = new HashMap<>();
    +        attributes.put("filename", "1234.xyz");
    +
    +        final Set<String> lineageIdentifiers = new HashSet<>();
    +        lineageIdentifiers.add("123");
    +        lineageIdentifiers.add("321");
    +
    +        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
    +        builder.setEventTime(System.currentTimeMillis());
    +        builder.setEventType(ProvenanceEventType.RECEIVE);
    +        builder.setTransitUri("nifi://unit-test");
    +        attributes.put("uuid", uuid);
    +        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
    +        builder.setAttributes(prevAttrs, attributes);
    +        builder.setComponentId("1234");
    +        builder.setComponentType("dummy processor");
    +        builder.setLineageIdentifiers(lineageIdentifiers);
    +        final ProvenanceEventRecord event = builder.build();
    +
    +        final List<byte[]> dataSent = new ArrayList<>();
    +        final ProvenanceReportingTask task = new ProvenanceReportingTask() {
    +            @SuppressWarnings("unchecked")
    +            @Override
    +            protected SiteToSiteClient getClient() {
    +                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
    +                final Transaction transaction = Mockito.mock(Transaction.class);
    +
    +                try {
    +                    Mockito.doAnswer(new
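The test in this diff overrides `getClient()` in an anonymous subclass so that whatever the task sends ends up in the `dataSent` list instead of going over the wire. The sketch below shows the same capture technique with plain Java and no mocking library. It is an illustration only: `CaptureSketch`, `Sender`, `Reporter`, `getSender`, and `runWithCapture` are hypothetical names, not NiFi or MiNiFi APIs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class CaptureSketch {
    /** Hypothetical stand-in for the transport used by the task. */
    interface Sender {
        void send(byte[] data);
    }

    /** Hypothetical reporter whose transport comes from an overridable factory method. */
    static class Reporter {
        protected Sender getSender() {
            throw new IllegalStateException("no real transport in this sketch");
        }

        void report(final String payload) {
            getSender().send(payload.getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Runs the reporter with a capturing sender and returns everything that was sent. */
    static List<byte[]> runWithCapture(final String payload) {
        final List<byte[]> dataSent = new ArrayList<>();
        final Reporter reporter = new Reporter() {
            @Override
            protected Sender getSender() {
                // Record the bytes instead of transmitting them.
                return dataSent::add;
            }
        };
        reporter.report(payload);
        return dataSent;
    }

    public static void main(final String[] args) {
        final List<byte[]> sent = runWithCapture("{\"eventId\":1}");
        System.out.println(sent.size());
        System.out.println(new String(sent.get(0), StandardCharsets.UTF_8));
    }
}
```

A test can then decode the captured bytes and assert on the serialized form, which appears to be what `testSerializedForm` is set up to do.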
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237823#comment-15237823 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59438870

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -0,0 +1,443 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.minifi.provenance.reporting;
    +
    +import java.io.IOException;
    +import java.net.MalformedURLException;
    +import java.net.URL;
    +import java.nio.charset.StandardCharsets;
    +import java.text.DateFormat;
    +import java.text.SimpleDateFormat;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TimeZone;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import javax.json.Json;
    +import javax.json.JsonArray;
    +import javax.json.JsonArrayBuilder;
    +import javax.json.JsonBuilderFactory;
    +import javax.json.JsonObject;
    +import javax.json.JsonObjectBuilder;
    +import javax.net.ssl.SSLContext;
    +
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.controller.status.PortStatus;
    +import org.apache.nifi.controller.status.ProcessGroupStatus;
    +import org.apache.nifi.controller.status.ProcessorStatus;
    +import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
    +import org.apache.nifi.events.EventReporter;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.reporting.AbstractReportingTask;
    +import org.apache.nifi.reporting.ReportingContext;
    +import org.apache.nifi.reporting.Severity;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.nifi.ssl.SSLContextService.ClientAuth;
    +
    +@Tags({"provenance", "lineage", "tracking", "site", "site to site"})
    +@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.")
    +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart of NiFi the task knows where it left off.")
    +public class ProvenanceReportingTask extends AbstractReportingTask {
    +    private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
    +    private static final String LAST_EVENT_ID_KEY = "last_event_id";
    +
    +    static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder()
    +            .name("Destination URL")
    +            .description("The URL to post the Provenance Events to.")
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.URL_VALIDATOR)
    +            .build();
    +    static
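The `TIMESTAMP_FORMAT` constant in this diff, together with the `SimpleDateFormat` and `TimeZone` imports, suggests that event times are rendered as ISO-8601-style strings with a literal `Z` suffix. Because `'Z'` is a quoted literal in the pattern, the output is only truthful if the formatter is pinned to UTC. A minimal sketch, assuming the pattern begins with a four-digit `yyyy` year and that the formatter is set to UTC (the excerpt stops before any formatter is built); `TimestampFormatSketch` and `format` are hypothetical names.

```java
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class TimestampFormatSketch {
    // Assumed pattern: four-digit year, then the remainder as shown in the diff.
    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    static String format(final long epochMillis) {
        // SimpleDateFormat is not thread-safe, so build one per call here.
        final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT, Locale.ROOT);
        // The trailing 'Z' is a literal, so the zone must be set to UTC explicitly.
        df.setTimeZone(TimeZone.getTimeZone("UTC"));
        return df.format(new Date(epochMillis));
    }

    public static void main(final String[] args) {
        System.out.println(format(0L)); // prints 1970-01-01T00:00:00.000Z
    }
}
```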
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237818#comment-15237818 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59438406

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -0,0 +1,443 @@
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237593#comment-15237593 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59417825

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -0,0 +1,443 @@
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237592#comment-15237592 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59417575

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -0,0 +1,443 @@
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237588#comment-15237588 ]

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59417236

    --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/java/org/apache/nifi/minifi/provenance/reporting/ProvenanceReportingTask.java ---
    @@ -0,0 +1,443 @@
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237563#comment-15237563 ]

ASF GitHub Bot commented on MINIFI-13:
--

Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/6#discussion_r59414741

--- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.provenance.reporting;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestProvenanceReportingTask {
+
+    @Test
+    public void testSerializedForm() throws IOException, InitializationException {
+        final String uuid = "1000----";
+        final Map attributes = new HashMap<>();
+        attributes.put("abc", "xyz");
+        attributes.put("xyz", "abc");
+        attributes.put("filename", "file-" + uuid);
+
+        final Map prevAttrs = new HashMap<>();
+        attributes.put("filename", "1234.xyz");
+
+        final Set lineageIdentifiers = new HashSet<>();
+        lineageIdentifiers.add("123");
+        lineageIdentifiers.add("321");
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", uuid);
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setAttributes(prevAttrs, attributes);
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        builder.setLineageIdentifiers(lineageIdentifiers);
+        final ProvenanceEventRecord event = builder.build();
+
+        final List dataSent = new ArrayList<>();
+        final ProvenanceReportingTask task = new ProvenanceReportingTask() {
+            @SuppressWarnings("unchecked")
+            @Override
+            protected SiteToSiteClient getClient() {
+                final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+                final Transaction transaction = Mockito.mock(Transaction.class);
+
+                try {
+                    Mockito.doAnswer(new Answ
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237555#comment-15237555 ]

ASF GitHub Bot commented on MINIFI-13:
--

Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/6#discussion_r59413942

--- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java ---
@@ -0,0 +1,218 @@
[jira] [Commented] (MINIFI-13) Create a Reporting Task to Send Provenance data
[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15232508#comment-15232508 ]

ASF GitHub Bot commented on MINIFI-13:
--

GitHub user JPercivall opened a pull request:

    https://github.com/apache/nifi-minifi/pull/6

    MINIFI-13 created a provenance reporting task to send provenance info…

    …rmation via S2S

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JPercivall/nifi-minifi MINIFI-13

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi-minifi/pull/6.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6

commit 6695855449bd076ddbdd76676c435b2cf9ccf2c1
Author: Joseph Percivall
Date: 2016-04-08T17:17:33Z

    MINIFI-13 created a provenance reporting task to send provenance information via S2S

> Create a Reporting Task to Send Provenance data
> ---
>
> Key: MINIFI-13
> URL: https://issues.apache.org/jira/browse/MINIFI-13
> Project: Apache NiFi MiNiFi
> Issue Type: Sub-task
> Components: Data Format, Data Transmission
> Reporter: Joseph Percivall
> Assignee: Joseph Percivall
> Fix For: 0.0.1
>
>
> With initial effort to re-use as much of NiFi as possible it is not possible
> to easily create a ProvenanceReporter to add provenance events as attributes
> to FlowFiles as it would require changing the ProvenanceReporter interface.
> This will require utilizing a different extension point to transmit the
> provenance data back to a core NiFi instance.
> Probably the most efficient way to do this is to create a ReportingTask which
> reports the provenance events using the S2S protocol.
> In the future this will probably be retired as a reporting task as MiNiFi
> grows to rely less on NiFi but this Reporting task could also be contributed
> back to NiFi.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
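
For readers following the thread: the task is annotated as storing its last event id in local state so that it "knows where it left off", and the reviewed diff logs a message when the repository's current max id is lower than the stored id. A minimal standalone sketch of that resume decision might look like the following. It is an illustration only, not the code from the pull request: the class name EventCursor, the method firstEventIdToQuery, the use of a plain Map in place of NiFi's state API, and the choice to restart from id 0 after a repository reset are all assumptions. Only the "last_event_id" key and the "stored id + 1" step come from the quoted diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the pull request.
final class EventCursor {
    // Same key name as LAST_EVENT_ID_KEY in the quoted diff.
    static final String LAST_EVENT_ID_KEY = "last_event_id";

    /**
     * Decides which provenance event id to query first.
     *
     * @param state     stored key/value state (a plain Map stands in for NiFi's state API)
     * @param currMaxId the largest event id currently in the provenance repository
     */
    static long firstEventIdToQuery(final Map<String, String> state, final long currMaxId) {
        if (!state.containsKey(LAST_EVENT_ID_KEY)) {
            return 0L; // assumption: nothing stored yet, so start from the first id
        }
        final long lastEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY));
        if (currMaxId < lastEventId) {
            // The repository's max id is below what was stored, so its ids
            // must have restarted. Assumption: begin again from the first id.
            return 0L;
        }
        return lastEventId + 1; // normal case: resume right after the last reported event
    }

    public static void main(final String[] args) {
        final Map<String, String> state = new HashMap<>();
        System.out.println(firstEventIdToQuery(state, 10L));  // prints 0
        state.put(LAST_EVENT_ID_KEY, "41");
        System.out.println(firstEventIdToQuery(state, 100L)); // prints 42
        System.out.println(firstEventIdToQuery(state, 5L));   // prints 0
    }
}
```

The sketch compares the repository's max id against the stored last id, which is how the log message in the diff describes the condition. The exact comparison and the recovery step used in the pull request are not fully visible in this thread.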