[ https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237563#comment-15237563 ]
ASF GitHub Bot commented on MINIFI-13: -------------------------------------- Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/6#discussion_r59414741 --- Diff: minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.minifi.provenance.reporting; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestProvenanceReportingTask { + + @Test + public void testSerializedForm() throws IOException, InitializationException { + final String uuid = "10000000-0000-0000-0000-000000000000"; + final Map<String, String> attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final Map<String, String> prevAttrs = new HashMap<>(); + attributes.put("filename", "1234.xyz"); + + final Set<String> lineageIdentifiers = new HashSet<>(); + lineageIdentifiers.add("123"); + lineageIdentifiers.add("321"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setAttributes(prevAttrs, attributes); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + builder.setLineageIdentifiers(lineageIdentifiers); + final ProvenanceEventRecord event = builder.build(); + + final List<byte[]> dataSent = new ArrayList<>(); + final ProvenanceReportingTask task = new ProvenanceReportingTask() { + @SuppressWarnings("unchecked") + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + }; + + final List<ProvenanceEventRecord> events = new ArrayList<>(); + events.add(event); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000"); + + final ReportingContext context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getStateManager()) + .thenReturn(new MockStateManager(task)); + Mockito.doAnswer(new Answer<PropertyValue>() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor), null); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() { + @Override + public List<ProvenanceEventRecord> answer(final InvocationOnMock invocation) throws Throwable { + final long startId = invocation.getArgumentAt(0, long.class); + final int maxRecords = invocation.getArgumentAt(1, int.class); + + final List<ProvenanceEventRecord> eventsToReturn = new ArrayList<>(); + for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && i < events.size(); i++) { + eventsToReturn.add(events.get(i)); + } + return eventsToReturn; + } + }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); + + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + + task.initialize(initContext); + task.onTrigger(context); + + assertEquals(1, dataSent.size()); + final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8); + System.out.println(msg); --- End diff -- Either remove or introduce a logger for this instead of system out so we can control these items with the appropriate logging configs. > Create a Reporting Task to Send Provenance data > ----------------------------------------------- > > Key: MINIFI-13 > URL: https://issues.apache.org/jira/browse/MINIFI-13 > Project: Apache NiFi MiNiFi > Issue Type: Sub-task > Components: Data Format, Data Transmission > Reporter: Joseph Percivall > Assignee: Joseph Percivall > Fix For: 0.0.1 > > > With initial effort to re-use as much of NiFi as possible it is not possible > to easily create a ProvenanceReporter to add provenance events as attributes > to FlowFiles as it would require changing the ProvenancenReporter interface. > This will require utilizing a different extension point to transmit the > provenance data back to a core NiFi instance. > Probably the most efficient way to do this is to create a ReportingTask which > reports the provenance events using the S2S protocol. > In the future this will probably be retired as a reporting task as MiNiFi > grows to rely less on NiFi but this Reporting task could also be contributed > back to NiFi. -- This message was sent by Atlassian JIRA (v6.3.4#6332)