This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 85b36cac609f752040bbe8b3a5a676a22988d8d6
Author: Mark Bathori <bathori.m...@gmail.com>
AuthorDate: Wed Feb 1 20:21:51 2023 +0100

    NIFI-11124: Add hadoop.file.url attribute to HDFS processors
    
    This closes #6916.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../processors/hadoop/AbstractHadoopProcessor.java |  1 +
 .../processors/hadoop/AbstractFetchHDFSRecord.java | 31 ++++++++++---------
 .../processors/hadoop/AbstractPutHDFSRecord.java   |  1 +
 .../apache/nifi/processors/hadoop/DeleteHDFS.java  |  2 ++
 .../apache/nifi/processors/hadoop/FetchHDFS.java   |  9 ++++--
 .../apache/nifi/processors/hadoop/MoveHDFS.java    |  6 +++-
 .../org/apache/nifi/processors/hadoop/PutHDFS.java |  2 ++
 .../apache/nifi/processors/hadoop/PutHDFSTest.java |  6 ++++
 .../nifi/processors/hadoop/TestDeleteHDFS.java     |  5 +++
 .../nifi/processors/hadoop/TestFetchHDFS.java      |  4 +++
 .../org/apache/nifi/processors/orc/PutORC.java     |  1 +
 .../org/apache/nifi/processors/orc/PutORCTest.java |  3 ++
 .../nifi/processors/parquet/FetchParquet.java      |  3 +-
 .../apache/nifi/processors/parquet/PutParquet.java |  1 +
 .../nifi/processors/parquet/FetchParquetTest.java  |  2 ++
 .../nifi/processors/parquet/PutParquetTest.java    | 36 ++++++++++++----------
 16 files changed, 77 insertions(+), 36 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index f7c082bf12..a967b9037a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -158,6 +158,7 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor implemen
 
 
     public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = 
"absolute.hdfs.path";
+    public static final String HADOOP_FILE_URL_ATTRIBUTE = "hadoop.file.url";
 
     protected static final String TARGET_HDFS_DIR_CREATED_ATTRIBUTE = 
"target.dir.created";
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 33e762fa31..03770ac56f 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -16,21 +16,6 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,6 +42,21 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * Base processor for reading a data from HDFS that can be fetched into 
records.
  */
@@ -234,6 +234,7 @@ public abstract class AbstractFetchHDFSRecord extends 
AbstractHadoopProcessor {
 
 
                 final Path qualifiedPath = 
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                successFlowFile = session.putAttribute(successFlowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                 getLogger().info("Successfully received content from {} for {} 
in {} milliseconds", new Object[] {qualifiedPath, successFlowFile, 
stopWatch.getDuration()});
                 session.getProvenanceReporter().fetch(successFlowFile, 
qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                 session.transfer(successFlowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index c2fb3bd725..5aef6b4621 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -378,6 +378,7 @@ public abstract class AbstractPutHDFSRecord extends 
AbstractHadoopProcessor {
 
                 // Send a provenance event and transfer to success
                 final Path qualifiedPath = 
destFile.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                putFlowFile = session.putAttribute(putFlowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                 session.getProvenanceReporter().send(putFlowFile, 
qualifiedPath.toString());
                 session.transfer(putFlowFile, REL_SUCCESS);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index 2c9285fab5..6b676f9493 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -72,6 +72,7 @@ import java.util.regex.Pattern;
                 + "If multiple files are deleted, then only the last filename 
is set."),
         @WritesAttribute(attribute="hdfs.path", description="HDFS Path 
specified in the delete request. "
                 + "If multiple paths are deleted, then only the last path is 
set."),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file to be deleted."),
         @WritesAttribute(attribute="hdfs.error.message", description="HDFS 
error message related to the hdfs.error.code")
 })
 @SeeAlso({ListHDFS.class, PutHDFS.class})
@@ -176,6 +177,7 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
                             fileSystem.delete(path, isRecursive(context, 
session));
                             getLogger().debug("For flowfile {} Deleted file at 
path {} with name {}", new Object[]{originalFlowFile, 
path.getParent().toString(), path.getName()});
                             final Path qualifiedPath = 
path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+                            flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                             
session.getProvenanceReporter().invokeRemoteProcess(flowFile, 
qualifiedPath.toString());
                         } catch (IOException ioe) {
                             // One possible scenario is that the IOException 
is permissions based, however it would be impractical to check every possible
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index e4acaac109..2a4986cadd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.Restriction;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -61,8 +62,11 @@ import java.util.concurrent.TimeUnit;
 @Tags({"hadoop", "hcfs", "hdfs", "get", "ingest", "fetch", "source"})
 @CapabilityDescription("Retrieves a file from HDFS. The content of the 
incoming FlowFile is replaced by the content of the file in HDFS. "
         + "The file in HDFS is left intact without any changes being made to 
it.")
-@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile 
is routed to 'failure', this attribute is added indicating why the file could "
-        + "not be fetched from HDFS")
+@WritesAttributes({
+    @WritesAttribute(attribute="hdfs.failure.reason", description="When a 
FlowFile is routed to 'failure', this attribute is added indicating why the 
file could "
+        + "not be fetched from HDFS"),
+    @WritesAttribute(attribute = "hadoop.file.url", description = "The hadoop 
url for the file is stored in this attribute.")
+})
 @SeeAlso({ListHDFS.class, GetHDFS.class, PutHDFS.class})
 @Restricted(restrictions = {
     @Restriction(
@@ -173,6 +177,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
 
                     stopWatch.stop();
                     getLogger().info("Successfully received content from {} 
for {} in {}", new Object[] {qualifiedPath, flowFile, stopWatch.getDuration()});
+                    flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                     session.getProvenanceReporter().fetch(flowFile, 
qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                     session.transfer(flowFile, getSuccessRelationship());
                 } catch (final FileNotFoundException | AccessControlException 
e) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index a01d236f51..d7a3d13045 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -75,7 +75,9 @@ import java.util.regex.Pattern;
 @ReadsAttribute(attribute = "filename", description = "The name of the file 
written to HDFS comes from the value of this attribute.")
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The name of 
the file written to HDFS is stored in this attribute."),
-        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute.")})
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute.")
+})
 @SeeAlso({PutHDFS.class, GetHDFS.class})
 @Restricted(restrictions = {
     @Restriction(
@@ -426,6 +428,8 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                         final String hdfsPath = newFile.getParent().toString();
                         flowFile = session.putAttribute(flowFile, 
CoreAttributes.FILENAME.key(), newFilename);
                         flowFile = session.putAttribute(flowFile, 
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
+                        final Path qualifiedPath = 
newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                        flowFile = session.putAttribute(flowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                         final String transitUri = hdfs.getUri() + 
StringUtils.prependIfMissing(outputPath, "/");
                         session.getProvenanceReporter().send(flowFile, 
transitUri);
                         session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index eb7dfb56bb..91e91ff7b1 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -89,6 +89,7 @@ import java.util.stream.Stream;
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The name of 
the file written to HDFS is stored in this attribute."),
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file on HDFS is stored in this attribute."),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute."),
         @WritesAttribute(attribute = "target.dir.created", description = "The 
result(true/false) indicates if the folder is created by the processor.")
 })
 @SeeAlso(GetHDFS.class)
@@ -456,6 +457,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     putFlowFile = session.putAttribute(putFlowFile, 
ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
                     putFlowFile = session.putAttribute(putFlowFile, 
TARGET_HDFS_DIR_CREATED_ATTRIBUTE, String.valueOf(targetDirCreated));
                     final Path qualifiedPath = 
copyFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
+                    putFlowFile = session.putAttribute(putFlowFile, 
HADOOP_FILE_URL_ATTRIBUTE, qualifiedPath.toString());
                     session.getProvenanceReporter().send(putFlowFile, 
qualifiedPath.toString());
 
                     session.transfer(putFlowFile, getSuccessRelationship());
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index ff603352ee..ab33df270e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -233,6 +233,7 @@ public class PutHDFSTest {
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but 
with a local filesystem, just assert the filename.
         assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + 
FILE_NAME));
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY
 + "/" + FILE_NAME));
 
         verify(spyFileSystem, times(1)).rename(any(Path.class), 
any(Path.class));
     }
@@ -267,6 +268,8 @@ public class PutHDFSTest {
         assertEquals(FILE_NAME, 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals(TARGET_DIRECTORY, 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
         assertEquals("true", 
flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but 
with a local filesystem, just assert the filename.
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY
 + "/" + FILE_NAME));
 
         verify(spyFileSystem, Mockito.never()).rename(any(Path.class), 
any(Path.class));
     }
@@ -304,6 +307,7 @@ public class PutHDFSTest {
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but 
with a local filesystem, just assert the filename.
         
assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/test-classes/randombytes-1"));
     }
 
     @Test
@@ -330,6 +334,7 @@ public class PutHDFSTest {
         assertTrue(mockFileSystem.exists(new 
Path("target/test-classes/randombytes-1.gz")));
         assertEquals("randombytes-1.gz", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/test-classes", 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith(TARGET_DIRECTORY
 + "/" + FILE_NAME + ".gz"));
     }
 
     @Test
@@ -423,6 +428,7 @@ public class PutHDFSTest {
         assertTrue(mockFileSystem.exists(new 
Path("target/data_test/randombytes-1")));
         assertEquals("randombytes-1", 
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
         assertEquals("target/data_test", 
flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        
assertTrue(flowFile.getAttribute(PutHDFS.HADOOP_FILE_URL_ATTRIBUTE).endsWith("target/data_test/"
 + FILE_NAME));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index e3472bf331..6b64cca767 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,6 +78,10 @@ public class TestDeleteHDFS {
         assertEquals(1, provenanceEvents.size());
         assertEquals(ProvenanceEventType.REMOTE_INVOCATION, 
provenanceEvents.get(0).getEventType());
         assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", 
provenanceEvents.get(0).getTransitUri());
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals("hdfs://0.example.com:8020/some/path/to/file.txt", 
flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE));
+
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 4d79c309a7..43f0c3ce6d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -71,6 +72,9 @@ public class TestFetchHDFS {
         assertEquals(ProvenanceEventType.FETCH, fetchEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but 
with a local filesystem, just assert the filename.
         assertTrue(fetchEvent.getTransitUri().endsWith(file));
+
+        MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(file));
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
index f691038c58..4e5511ac7d 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
@@ -60,6 +60,7 @@ import java.util.List;
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The name of 
the file is stored in this attribute."),
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file is stored in this attribute."),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute."),
         @WritesAttribute(attribute = "record.count", description = "The number 
of records written to the ORC file"),
         @WritesAttribute(attribute = "hive.ddl", description = "Creates a 
partial Hive DDL statement for creating an external table in Hive from the 
destination folder. "
                 + "This can be used in ReplaceText for setting the content to 
the DDL. To make it valid DDL, add \"LOCATION '<path_to_orc_file_in_hdfs>'\", 
where "
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
index 38338d7c9c..b6390f9fb5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -80,6 +80,7 @@ import java.util.Map;
 import java.util.TimeZone;
 import java.util.function.BiFunction;
 
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -191,6 +192,7 @@ public class PutORCTest {
         mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
                 "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, 
`favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
+        
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
 + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
@@ -260,6 +262,7 @@ public class PutORCTest {
         // DDL will be created with field names normalized (lowercased, e.g.) 
for Hive by default
         mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
                 "CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, 
`timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED 
AS ORC");
+        
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
 + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 17e99c6132..6d289da8ba 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -48,7 +48,8 @@ import java.io.IOException;
 @WritesAttributes({
         @WritesAttribute(attribute="fetch.failure.reason", description="When a 
FlowFile is routed to 'failure', this attribute is added " +
                 "indicating why the file could not be fetched from the given 
filesystem."),
-        @WritesAttribute(attribute = "record.count", description = "The number 
of records in the resulting flow file")
+        @WritesAttribute(attribute = "record.count", description = "The number 
of records in the resulting flow file"),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute.")
 })
 @SeeAlso({PutParquet.class})
 @Restricted(restrictions = {
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 2d656f2b10..eab970bd0d 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -66,6 +66,7 @@ import static 
org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
 @WritesAttributes({
         @WritesAttribute(attribute = "filename", description = "The name of 
the file is stored in this attribute."),
         @WritesAttribute(attribute = "absolute.hdfs.path", description = "The 
absolute path to the file is stored in this attribute."),
+        @WritesAttribute(attribute = "hadoop.file.url", description = "The 
hadoop url for the file is stored in this attribute."),
         @WritesAttribute(attribute = "record.count", description = "The number 
of records written to the Parquet file")
 })
 @Restricted(restrictions = {
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 3bb5475bd9..df412c8fe2 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -131,6 +132,7 @@ public class FetchParquetTest {
         final MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
         flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, 
String.valueOf(USERS));
         flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), 
"text/plain");
+        
assertTrue(flowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY 
+ "/" + parquetFile.getName()));
 
         // the mock record writer will write the header for each record so 
replace those to get down to just the records
         String flowFileContent = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8);
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 302e36a5d9..befc898d4d 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -16,22 +16,6 @@
  */
 package org.apache.nifi.processors.parquet;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
@@ -42,10 +26,10 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.exception.FailureException;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
-import org.apache.nifi.parquet.utils.ParquetUtils;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -70,6 +54,23 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.hadoop.AbstractHadoopProcessor.HADOOP_FILE_URL_ATTRIBUTE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
 @DisabledOnOs(OS.WINDOWS)
 public class PutParquetTest {
 
@@ -135,6 +136,7 @@ public class PutParquetTest {
         
mockFlowFile.assertAttributeEquals(PutParquet.ABSOLUTE_HDFS_PATH_ATTRIBUTE, 
avroParquetFile.getParent().toString());
         mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), 
filename);
         mockFlowFile.assertAttributeEquals(PutParquet.RECORD_COUNT_ATTR, 
"100");
+        
assertTrue(mockFlowFile.getAttribute(HADOOP_FILE_URL_ATTRIBUTE).endsWith(DIRECTORY
 + "/" + filename));
 
         // verify we generated a provenance event
         final List<ProvenanceEventRecord> provEvents = 
testRunner.getProvenanceEvents();

Reply via email to