This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new ea4c2055d6 NIFI-12054: PutIceberg should produce a provenance send event ea4c2055d6 is described below commit ea4c2055d6d884a13e23304fff2f72309587e785 Author: Mark Bathori <bathori.m...@gmail.com> AuthorDate: Thu Sep 14 14:01:36 2023 +0200 NIFI-12054: PutIceberg should produce a provenance send event This closes #7690. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../org/apache/nifi/processors/iceberg/PutIceberg.java | 3 +++ .../iceberg/TestPutIcebergWithHiveCatalog.java | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java index 360ea17f1b..02bd0b074f 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java @@ -234,6 +234,7 @@ public class PutIceberg extends AbstractIcebergProcessor { @Override public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException { + final long startNanos = System.nanoTime(); final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final String fileFormat = context.getProperty(FILE_FORMAT).getValue(); final String maximumFileSize = context.getProperty(MAXIMUM_FILE_SIZE).evaluateAttributeExpressions(flowFile).getValue(); @@ -281,6 +282,8 @@ public class PutIceberg extends AbstractIcebergProcessor { } flowFile = session.putAttribute(flowFile, ICEBERG_RECORD_COUNT, String.valueOf(recordCount)); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, table.location(), transferMillis); session.transfer(flowFile, REL_SUCCESS); } diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java index c672d90e8b..bc159ef470 100644 --- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java +++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java @@ -32,6 +32,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore; import org.apache.nifi.processors.iceberg.catalog.IcebergCatalogFactory; import org.apache.nifi.processors.iceberg.catalog.TestHiveCatalogService; import org.apache.nifi.processors.iceberg.util.IcebergTestUtils; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.RecordField; @@ -60,6 +62,8 @@ import static org.apache.nifi.processors.iceberg.PutIceberg.ICEBERG_RECORD_COUNT import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateData; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateNumberOfDataFiles; import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.condition.OS.WINDOWS; @DisabledOnOs(WINDOWS) @@ -174,6 +178,7 @@ public class TestPutIcebergWithHiveCatalog { validateNumberOfDataFiles(tableLocation, 3); validatePartitionFolders(tableLocation, Arrays.asList( "department_bucket=0", "department_bucket=1", "department_bucket=2")); + assertProvenanceEvents(); } @ParameterizedTest @@ -211,6 +216,7 @@ public class TestPutIcebergWithHiveCatalog { validateNumberOfDataFiles(tableLocation, 3); validatePartitionFolders(tableLocation, Arrays.asList( "department=Finance", "department=Marketing", "department=Sales")); + assertProvenanceEvents(); } @ParameterizedTest @@ -253,6 +259,7 @@ public class TestPutIcebergWithHiveCatalog { "name=Joana/department=Sales/", "name=John/department=Finance/" )); + assertProvenanceEvents(); } @ParameterizedTest @@ -287,5 +294,14 @@ public class TestPutIcebergWithHiveCatalog { Assertions.assertEquals("4", flowFile.getAttribute(ICEBERG_RECORD_COUNT)); validateData(table, expectedRecords, 0); validateNumberOfDataFiles(new URI(table.location()).getPath(), 1); + assertProvenanceEvents(); + } + + private void assertProvenanceEvents() { + final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); + final ProvenanceEventRecord sendEvent = provenanceEvents.get(0); + assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType()); + assertTrue(sendEvent.getTransitUri().endsWith(CATALOG_NAME + ".db/" + TABLE_NAME)); } }