This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 9b15bbf6b9 NIFI-12054: PutIceberg should produce a provenance send event
9b15bbf6b9 is described below

commit 9b15bbf6b9005f9a3e1aece0932cdf6e517086e5
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Thu Sep 14 14:01:36 2023 +0200

    NIFI-12054: PutIceberg should produce a provenance send event
    
    This closes #7690.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../org/apache/nifi/processors/iceberg/PutIceberg.java   |  3 +++
 .../iceberg/TestPutIcebergWithHiveCatalog.java           | 16 ++++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index 360ea17f1b..02bd0b074f 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -234,6 +234,7 @@ public class PutIceberg extends AbstractIcebergProcessor {
 
     @Override
     public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException {
+        final long startNanos = System.nanoTime();
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final String fileFormat = context.getProperty(FILE_FORMAT).getValue();
         final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue();
@@ -281,6 +282,8 @@ public class PutIceberg extends AbstractIcebergProcessor {
         }
 
         flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount));
+        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        session.getProvenanceReporter().send(flowFile, table.location(), transferMillis);
         session.transfer(flowFile, REL_SUCCESS);
     }
 
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index c672d90e8b..bc159ef470 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -32,6 +32,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
 import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory;
 import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService;
 import org.apache.nifi.processors.iceberg.util.IcebergTestUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.RecordField;
@@ -60,6 +62,8 @@ import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles;
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;
 
 @DisabledOnOs(WINDOWS)
@@ -174,6 +178,7 @@ public class TestPutIcebergWithHiveCatalog {
         validateNumberOfDataFiles(tableLocation, 3);
         validatePartitionFolders(tableLocation, Arrays.asList(
                 "department_bucket=0", "department_bucket=1", 
"department_bucket=2"));
+        assertProvenanceEvents();
     }
 
     @ParameterizedTest
@@ -211,6 +216,7 @@ public class TestPutIcebergWithHiveCatalog {
         validateNumberOfDataFiles(tableLocation, 3);
         validatePartitionFolders(tableLocation, Arrays.asList(
                 "department=Finance", "department=Marketing", 
"department=Sales"));
+        assertProvenanceEvents();
     }
 
     @ParameterizedTest
@@ -253,6 +259,7 @@ public class TestPutIcebergWithHiveCatalog {
                 "name=Joana/department=Sales/",
                 "name=John/department=Finance/"
         ));
+        assertProvenanceEvents();
     }
 
     @ParameterizedTest
@@ -287,5 +294,14 @@ public class TestPutIcebergWithHiveCatalog {
         Assertions.assertEquals("4", 
flowFile.getAttribute(ICEBERG_RECORD_COUNT));
         validateData(table, expectedRecords, 0);
         validateNumberOfDataFiles(new URI(table.location()).getPath(), 1);
+        assertProvenanceEvents();
+    }
+
+    private void assertProvenanceEvents() {
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
+        assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME));
     }
 }
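
For reference, the change above follows the general NiFi idiom for reporting outbound transfers: time the write to the external system, then report a SEND provenance event against the destination URI. A minimal, self-contained sketch of that idiom follows; the class and method names are illustrative only, while the ProcessSession/ProvenanceReporter calls match the NiFi API used in the commit.

import java.util.concurrent.TimeUnit;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;

class ProvenanceSendSketch {

    void writeAndReport(final ProcessSession session, final FlowFile flowFile, final String transitUri) {
        // Capture the start time before writing to the external system.
        final long startNanos = System.nanoTime();

        // ... write the FlowFile's records to the destination here ...

        // Report a SEND provenance event with the destination URI and the
        // elapsed transfer time, mirroring the PutIceberg change above.
        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        session.getProvenanceReporter().send(flowFile, transitUri, transferMillis);
    }
}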
