[ 
https://issues.apache.org/jira/browse/MINIFI-13?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237563#comment-15237563
 ] 

ASF GitHub Bot commented on MINIFI-13:
--------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi/pull/6#discussion_r59414741
  
    --- Diff: 
minifi-nar-bundles/minifi-provenance-reporting-bundle/minifi-provenance-reporting-task/src/main/test/java/org/apache/nifi/minifi/provenance/reporting/TestProvenanceReportingTask.java
 ---
    @@ -0,0 +1,218 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.minifi.provenance.reporting;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.provenance.ProvenanceEventBuilder;
    +import org.apache.nifi.provenance.ProvenanceEventRecord;
    +import org.apache.nifi.provenance.ProvenanceEventType;
    +import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    +import org.apache.nifi.remote.Transaction;
    +import org.apache.nifi.remote.TransferDirection;
    +import org.apache.nifi.remote.client.SiteToSiteClient;
    +import org.apache.nifi.reporting.EventAccess;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.reporting.ReportingContext;
    +import org.apache.nifi.reporting.ReportingInitializationContext;
    +import org.apache.nifi.state.MockStateManager;
    +import org.apache.nifi.util.MockPropertyValue;
    +import org.junit.Assert;
    +import org.junit.Test;
    +import org.mockito.Mockito;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +public class TestProvenanceReportingTask {
    +
    +    @Test
    +    public void testSerializedForm() throws IOException, 
InitializationException {
    +        final String uuid = "10000000-0000-0000-0000-000000000000";
    +        final Map<String, String> attributes = new HashMap<>();
    +        attributes.put("abc", "xyz");
    +        attributes.put("xyz", "abc");
    +        attributes.put("filename", "file-" + uuid);
    +
    +        final Map<String, String> prevAttrs = new HashMap<>();
    +        attributes.put("filename", "1234.xyz");
    +
    +        final Set<String> lineageIdentifiers = new HashSet<>();
    +        lineageIdentifiers.add("123");
    +        lineageIdentifiers.add("321");
    +
    +        final ProvenanceEventBuilder builder = new 
StandardProvenanceEventRecord.Builder();
    +        builder.setEventTime(System.currentTimeMillis());
    +        builder.setEventType(ProvenanceEventType.RECEIVE);
    +        builder.setTransitUri("nifi://unit-test");
    +        attributes.put("uuid", uuid);
    +        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
    +        builder.setAttributes(prevAttrs, attributes);
    +        builder.setComponentId("1234");
    +        builder.setComponentType("dummy processor");
    +        builder.setLineageIdentifiers(lineageIdentifiers);
    +        final ProvenanceEventRecord event = builder.build();
    +
    +        final List<byte[]> dataSent = new ArrayList<>();
    +        final ProvenanceReportingTask task = new ProvenanceReportingTask() 
{
    +            @SuppressWarnings("unchecked")
    +            @Override
    +            protected SiteToSiteClient getClient() {
    +                final SiteToSiteClient client = 
Mockito.mock(SiteToSiteClient.class);
    +                final Transaction transaction = 
Mockito.mock(Transaction.class);
    +
    +                try {
    +                    Mockito.doAnswer(new Answer<Object>() {
    +                        @Override
    +                        public Object answer(final InvocationOnMock 
invocation) throws Throwable {
    +                            final byte[] data = 
invocation.getArgumentAt(0, byte[].class);
    +                            dataSent.add(data);
    +                            return null;
    +                        }
    +                    }).when(transaction).send(Mockito.any(byte[].class), 
Mockito.any(Map.class));
    +
    +                    
Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
    +                } catch (final Exception e) {
    +                    e.printStackTrace();
    +                    Assert.fail(e.toString());
    +                }
    +
    +                return client;
    +            }
    +        };
    +
    +        final List<ProvenanceEventRecord> events = new ArrayList<>();
    +        events.add(event);
    +
    +        final Map<PropertyDescriptor, String> properties = new HashMap<>();
    +        for (final PropertyDescriptor descriptor : 
task.getSupportedPropertyDescriptors()) {
    +            properties.put(descriptor, descriptor.getDefaultValue());
    +        }
    +        properties.put(ProvenanceReportingTask.BATCH_SIZE, "1000");
    +
    +        final ReportingContext context = 
Mockito.mock(ReportingContext.class);
    +        Mockito.when(context.getStateManager())
    +                .thenReturn(new MockStateManager(task));
    +        Mockito.doAnswer(new Answer<PropertyValue>() {
    +            @Override
    +            public PropertyValue answer(final InvocationOnMock invocation) 
throws Throwable {
    +                final PropertyDescriptor descriptor = 
invocation.getArgumentAt(0, PropertyDescriptor.class);
    +                return new MockPropertyValue(properties.get(descriptor), 
null);
    +            }
    +        
}).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
    +
    +        final EventAccess eventAccess = Mockito.mock(EventAccess.class);
    +        Mockito.doAnswer(new Answer<List<ProvenanceEventRecord>>() {
    +            @Override
    +            public List<ProvenanceEventRecord> answer(final 
InvocationOnMock invocation) throws Throwable {
    +                final long startId = invocation.getArgumentAt(0, 
long.class);
    +                final int maxRecords = invocation.getArgumentAt(1, 
int.class);
    +
    +                final List<ProvenanceEventRecord> eventsToReturn = new 
ArrayList<>();
    +                for (int i = (int) Math.max(0, startId); i < (int) 
(startId + maxRecords) && i < events.size(); i++) {
    +                    eventsToReturn.add(events.get(i));
    +                }
    +                return eventsToReturn;
    +            }
    +        }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), 
Mockito.anyInt());
    +
    +        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
    +
    +        final ComponentLog logger = Mockito.mock(ComponentLog.class);
    +        final ReportingInitializationContext initContext = 
Mockito.mock(ReportingInitializationContext.class);
    +        
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
    +        Mockito.when(initContext.getLogger()).thenReturn(logger);
    +
    +
    +        task.initialize(initContext);
    +        task.onTrigger(context);
    +
    +        assertEquals(1, dataSent.size());
    +        final String msg = new String(dataSent.get(0), 
StandardCharsets.UTF_8);
    +        System.out.println(msg);
    --- End diff --
    
    Either remove or introduce a logger for this instead of system out so we 
can control these items with the appropriate logging configs.


> Create a Reporting Task to Send Provenance data
> -----------------------------------------------
>
>                 Key: MINIFI-13
>                 URL: https://issues.apache.org/jira/browse/MINIFI-13
>             Project: Apache NiFi MiNiFi
>          Issue Type: Sub-task
>          Components: Data Format, Data Transmission
>            Reporter: Joseph Percivall
>            Assignee: Joseph Percivall
>             Fix For: 0.0.1
>
>
> With initial effort to re-use as much of NiFi as possible it is not possible 
> to easily create a ProvenanceReporter to add provenance events as attributes 
> to FlowFiles as it would require changing the ProvenancenReporter interface. 
> This will require utilizing a different extension point to transmit the 
> provenance data back to a core NiFi instance. 
> Probably the most efficient way to do this is to create a ReportingTask which 
> reports the provenance events using the S2S protocol. 
> In the future this will probably be retired as a reporting task as MiNiFi 
> grows to rely less on NiFi but this Reporting task could also be contributed 
> back to NiFi.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to