This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4e65240ddb NIFI-11902: Fix ListHDFS closes FileSystem in first run
4e65240ddb is described below

commit 4e65240ddbb6e61cb3674b488251a6c3e675d67c
Author: Lehel Boer <lehelb@lehelb-MBP16.local>
AuthorDate: Thu Aug 3 00:07:38 2023 +0200

    NIFI-11902: Fix ListHDFS closes FileSystem in first run

    This closes #7565.

    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../apache/nifi/processors/hadoop/ListHDFS.java    | 98 +++++++++++-----------
 1 file changed, 48 insertions(+), 50 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index e706ab65f0..4bba8572ca 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -257,58 +257,56 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        try (final FileSystem hdfs = getFileSystem()) {
-            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-            final PathFilter pathFilter = createPathFilter(context);
-            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-            final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
-            final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
-
-            final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-            final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-            final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
-                    .session(session)
-                    .successRelationship(getSuccessRelationship())
-                    .fileStatusIterable(fileStatusIterable)
-                    .fileStatusManager(fileStatusManager)
-                    .pathFilter(pathFilter)
-                    .minimumAge(minimumAge)
-                    .maximumAge(maximumAge)
-                    .previousLatestTimestamp(latestTimestamp)
-                    .previousLatestFiles(latestFiles)
-                    .writerFactory(writerFactory)
-                    .hdfsPrefix(getAttributePrefix())
-                    .logger(getLogger())
-                    .build();
-
-            writer.write();
-
-            getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
-
-            if (writer.getListedFileCount() > 0) {
-                final Map<String, String> updatedState = new HashMap<>();
-                updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
-                final List<String> files = fileStatusManager.getCurrentLatestFiles();
-                for (int i = 0; i < files.size(); i++) {
-                    final String currentFilePath = files.get(i);
-                    updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
-                }
-                getLogger().debug("New state map: {}", updatedState);
-                updateState(session, updatedState);
-
-                getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
-            } else {
-                getLogger().debug("There is no data to list. Yielding.");
-                context.yield();
+        final FileSystem hdfs = getFileSystem();
+        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+        final PathFilter pathFilter = createPathFilter(context);
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        final FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
+        final Path rootPath = getNormalizedPath(context, DIRECTORY);
+        final FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
+
+        final Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
+        final Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
+
+        final HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder()
+                .session(session)
+                .successRelationship(getSuccessRelationship())
+                .fileStatusIterable(fileStatusIterable)
+                .fileStatusManager(fileStatusManager)
+                .pathFilter(pathFilter)
+                .minimumAge(minimumAge)
+                .maximumAge(maximumAge)
+                .previousLatestTimestamp(latestTimestamp)
+                .previousLatestFiles(latestFiles)
+                .writerFactory(writerFactory)
+                .hdfsPrefix(getAttributePrefix())
+                .logger(getLogger())
+                .build();
+
+        writer.write();
+
+        getLogger().debug("Found a total of {} files in HDFS, {} are listed", fileStatusIterable.getTotalFileCount(), writer.getListedFileCount());
+
+        if (writer.getListedFileCount() > 0) {
+            final Map<String, String> updatedState = new HashMap<>();
+            updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+            final List<String> files = fileStatusManager.getCurrentLatestFiles();
+            for (int i = 0; i < files.size(); i++) {
+                final String currentFilePath = files.get(i);
+                updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
             }
-        } catch (IOException e) {
-            throw new ProcessException("IO error occurred when closing HDFS file system", e);
+            getLogger().debug("New state map: {}", updatedState);
+            updateState(session, updatedState);
+
+            getLogger().info("Successfully created listing with {} new files from HDFS", writer.getListedFileCount());
+        } else {
+            getLogger().debug("There is no data to list. Yielding.");
+            context.yield();
         }
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
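[Editor's note, not part of the commit] The removed try-with-resources block closed the FileSystem reference that AbstractHadoopProcessor creates once at schedule time and hands back on every getFileSystem() call, so the first onTrigger run left all later runs with a closed file system. Below is a minimal sketch of that failure mode; the class and method names are hypothetical and only FileSystem.get(), listStatus(), and close() come from the real Hadoop API.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical stand-in for a processor that, like AbstractHadoopProcessor,
// creates its FileSystem once and reuses the same reference on every run.
public class ReusedFileSystemSketch {

    private final FileSystem fileSystem;

    public ReusedFileSystemSketch(final Configuration conf) throws IOException {
        // Created once, reused for the lifetime of the object.
        this.fileSystem = FileSystem.get(conf);
    }

    // Before the fix: try-with-resources calls close() on the shared
    // reference when the block exits, so only the first run can succeed.
    public void listBuggy(final Path dir) throws IOException {
        try (final FileSystem fs = fileSystem) {
            fs.listStatus(dir);
        } // fs.close() runs here and invalidates the shared instance
    }

    // After the fix: the shared reference is used but never closed;
    // its lifecycle stays with whoever created it.
    public void listFixed(final Path dir) throws IOException {
        fileSystem.listStatus(dir);
    }

    public static void main(final String[] args) throws IOException {
        final ReusedFileSystemSketch sketch = new ReusedFileSystemSketch(new Configuration());
        final Path dir = new Path("/tmp");
        sketch.listBuggy(dir); // first run: lists, then closes the FileSystem
        sketch.listFixed(dir); // second run: against HDFS this fails with
                               // "java.io.IOException: Filesystem closed"
    }
}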