This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4e65240ddb NIFI-11902: Fix ListHDFS closing FileSystem on first run
4e65240ddb is described below

commit 4e65240ddbb6e61cb3674b488251a6c3e675d67c
Author: Lehel Boer <lehelb@lehelb-MBP16.local>
AuthorDate: Thu Aug 3 00:07:38 2023 +0200

    NIFI-11902: Fix ListHDFS closing FileSystem on first run
    
    This closes #7565.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../apache/nifi/processors/hadoop/ListHDFS.java    | 98 +++++++++++-----------
 1 file changed, 48 insertions(+), 50 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index e706ab65f0..4bba8572ca 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -257,58 +257,56 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        try (final FileSystem hdfs = getFileSystem()) {
-            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-            final PathFilter pathFilter = createPathFilter(context);
-            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-            final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
-            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
-
-            final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-            final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-            final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
-                    .session(session)
-                    .successRelationship(getSuccessRelationship())
-                    .fileStatusIterable(fileStatusIterable)
-                    .fileStatusManager(fileStatusManager)
-                    .pathFilter(pathFilter)
-                    .minimumAge(minimumAge)
-                    .maximumAge(maximumAge)
-                    .previousLatestTimestamp(latestTimestamp)
-                    .previousLatestFiles(latestFiles)
-                    .writerFactory(writerFactory)
-                    .hdfsPrefix(getAttributePrefix())
-                    .logger(getLogger())
-                    .build();
-
-            writer.write();
-
-            getLogger().debug("Found a total of {} files in HDFS, {} are 
listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
-
-            if (writer.getListedFileCount() > 0) {
-                final Map<String, String> updatedState = new HashMap<>();
-                updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
-                final List<String> files = fileStatusManager.getCurrentLatestFiles();
-                for (int i = 0; i < files.size(); i++) {
-                    final String currentFilePath = files.get(i);
-                    updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
-                }
-                getLogger().debug("New state map: {}", updatedState);
-                updateState(session, updatedState);
-
-                getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
-            } else {
-                getLogger().debug("There is no data to list. Yielding.");
-                context.yield();
+        final FileSystem hdfs = getFileSystem();
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        final PathFilter pathFilter = createPathFilter(context);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
+        final Path rootPath = getNormalizedPath(context, DIRECTORY);
+        final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
+
+        final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+        final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+
+        final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
+                .session(session)
+                .successRelationship(getSuccessRelationship())
+                .fileStatusIterable(fileStatusIterable)
+                .fileStatusManager(fileStatusManager)
+                .pathFilter(pathFilter)
+                .minimumAge(minimumAge)
+                .maximumAge(maximumAge)
+                .previousLatestTimestamp(latestTimestamp)
+                .previousLatestFiles(latestFiles)
+                .writerFactory(writerFactory)
+                .hdfsPrefix(getAttributePrefix())
+                .logger(getLogger())
+                .build();
+
+        writer.write();
+
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", 
fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
+
+        if (writer.getListedFileCount() > 0) {
+            final Map<String, String> updatedState = new HashMap<>();
+            updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+            final List<String> files = fileStatusManager.getCurrentLatestFiles();
+            for (int i = 0; i < files.size(); i++) {
+                final String currentFilePath = files.get(i);
+                updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
             }
-        } catch (IOException e) {
-            throw new ProcessException("IO error occurred when closing HDFS file system", e);
+            getLogger().debug("New state map: {}", updatedState);
+            updateState(session, updatedState);
+
+            getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
         }
+
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
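For context on the change above: the FileSystem returned by getFileSystem() is a long-lived instance that AbstractHadoopProcessor reuses across onTrigger invocations, and Hadoop's own FileSystem cache shares instances between callers as well. The old try-with-resources block therefore closed a shared object at the end of the processor's first run, and every subsequent run received an already-closed FileSystem. The sketch below illustrates the failure mode with the plain Hadoop client API; it is not NiFi code, and the namenode address is a placeholder that assumes a reachable HDFS:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileSystemCacheDemo {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            // Placeholder cluster address, for illustration only.
            final URI uri = URI.create("hdfs://namenode.example.com:8020");

            // FileSystem.get() consults an internal cache keyed by scheme,
            // authority, and user, so repeated calls return the same object.
            try (FileSystem fs = FileSystem.get(uri, conf)) {
                fs.listStatus(new Path("/")); // first "run": works
            } // try-with-resources closes the shared, cached instance here

            // Second "run": the cache hands back the same, now-closed object,
            // and the call fails with java.io.IOException: Filesystem closed.
            final FileSystem sameInstance = FileSystem.get(uri, conf);
            sameInstance.listStatus(new Path("/"));
        }
    }

Hence the fix drops the try-with-resources (and the catch block that only wrapped the close) and keeps the FileSystem open across onTrigger invocations; releasing it is expected to be handled by the processor's shutdown lifecycle rather than by each listing run.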
