turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226887367


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/writer/HdfsObjectWriter.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop.util.writer;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.util.List;
+
+/**
+ * Interface for common management of writing to records and to flowfiles.
+ */
+public interface HdfsObjectWriter {
+
+    void write();
+
+    long getListedFileCount();
+
+    default boolean determineListable(final FileStatus status, final long minimumAge, final long maximumAge, final PathFilter filter,
+                                      final long latestModificationTime, final List<String> latestModifiedStatuses) {
+        final boolean isCopyInProgress = status.getPath().getName().endsWith("_COPYING_");
+        final boolean isFilterAccepted = filter.accept(status.getPath());
+        if (isCopyInProgress || !isFilterAccepted) {
+            return false;
+        }
+        // If the file was created during the processor's last iteration we have to check if it was already listed
+        if (status.getModificationTime() == latestModificationTime) {
+            return !latestModifiedStatuses.contains(status.getPath().toString());
+        }
+
+        final long fileAge = System.currentTimeMillis() - status.getModificationTime();

Review Comment:
   I see the logic was copied from the original version, but calling `System.currentTimeMillis()` separately for each file can cause files to be skipped, so this is also a bug that should be fixed.
   
   Example (minimum age = 5 s):
   - T0: file1 is created
   - T0+1: file2 is created
   - T0+4999: file1 is checked, but it is not old enough (4999 - 0 = 4999 ms), so it is skipped in this iteration
   - T0+5001: file2 is checked and it is old enough (5001 - 1 = 5000 ms), so it passes the test and is listed
   
   The problem is that listing file2 sets the latest modification time to T0+1, so file1 (modified at T0, earlier than that timestamp) will not be listed in the next iteration either.
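   A small simulation can make the scenario concrete. It is a simplified model that does not use the Hadoop API: `FakeFile`, `isListable` and `listOnce` are hypothetical names, and the checks after the `fileAge` line (minimum-age check, then "newer than the latest listed timestamp") are assumed, since the diff is cut off there. Each file is checked against its own clock reading, as happens with one `System.currentTimeMillis()` call per file.

```java
import java.util.ArrayList;
import java.util.List;

public class PerFileClockDemo {

    // Hypothetical stand-in for an HDFS FileStatus: only a name and a modification time (ms).
    record FakeFile(String name, long modificationTime) {}

    static final long MIN_AGE_MS = 5_000;

    // Mirrors the checks visible in the diff; the minimum-age and "newer than the
    // latest listed timestamp" checks at the end are ASSUMED (the diff is truncated there).
    static boolean isListable(FakeFile f, long now, long latestModificationTime, List<String> latestListedNames) {
        if (f.modificationTime() == latestModificationTime) {
            return !latestListedNames.contains(f.name());
        }
        long fileAge = now - f.modificationTime();
        if (fileAge < MIN_AGE_MS) {
            return false;
        }
        return f.modificationTime() > latestModificationTime;
    }

    // One iteration where files.get(i) is checked against its own clock reading clock[i],
    // as if System.currentTimeMillis() were called separately for each file.
    static List<FakeFile> listOnce(List<FakeFile> files, long[] clock,
                                   long latestModificationTime, List<String> latestListedNames) {
        List<FakeFile> listed = new ArrayList<>();
        for (int i = 0; i < files.size(); i++) {
            if (isListable(files.get(i), clock[i], latestModificationTime, latestListedNames)) {
                listed.add(files.get(i));
            }
        }
        return listed;
    }

    public static void main(String[] args) {
        // T0 = 0: file1 is created at T0, file2 at T0+1.
        List<FakeFile> files = List.of(new FakeFile("file1", 0), new FakeFile("file2", 1));

        // Iteration 1: file1 is checked at T0+4999 (too young), file2 at T0+5001 (old enough).
        // Long.MIN_VALUE is a hypothetical "nothing listed yet" marker.
        List<FakeFile> first = listOnce(files, new long[] {4_999, 5_001}, Long.MIN_VALUE, List.of());
        System.out.println("iteration 1: " + first);

        // The latest listed modification time becomes file2's (T0+1).
        long latest = first.stream().mapToLong(FakeFile::modificationTime).max().orElseThrow();
        List<String> latestNames = first.stream()
                .filter(f -> f.modificationTime() == latest)
                .map(FakeFile::name)
                .toList();

        // Iteration 2, long after the minimum age: file1 is older than 'latest', so it is skipped again.
        List<FakeFile> second = listOnce(files, new long[] {10_000, 10_000}, latest, latestNames);
        System.out.println("iteration 2: " + second);
    }
}
```

   Under these assumptions, the first iteration lists only file2 and the second lists nothing, so file1 is not picked up in either iteration.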
   
   Using a "global" current time, determined once before any file is checked, would solve this issue.
   In this example, if the base timestamp is T0+4998, neither file is old enough, so both will be processed in the next iteration.
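   A minimal sketch of the suggested fix, in a simplified model without the Hadoop API: `FakeFile`, `isListable`, `listOnce` and `listNow` are hypothetical names, and the minimum-age and "newer than the latest listed timestamp" checks are assumed, since the diff is truncated after the `fileAge` line. The clock is read once per iteration and the same `now` is passed to every per-file check.

```java
import java.util.ArrayList;
import java.util.List;

public class GlobalClockDemo {

    // Hypothetical stand-in for an HDFS FileStatus: only a name and a modification time (ms).
    record FakeFile(String name, long modificationTime) {}

    static final long MIN_AGE_MS = 5_000;

    // Per-file checks with 'now' supplied by the caller instead of being read here.
    // The minimum-age and "newer than latest" checks at the end are ASSUMED.
    static boolean isListable(FakeFile f, long now, long latestModificationTime, List<String> latestListedNames) {
        if (f.modificationTime() == latestModificationTime) {
            return !latestListedNames.contains(f.name());
        }
        long fileAge = now - f.modificationTime();
        if (fileAge < MIN_AGE_MS) {
            return false;
        }
        return f.modificationTime() > latestModificationTime;
    }

    // Every file in this iteration is measured against the same base timestamp.
    static List<FakeFile> listOnce(List<FakeFile> files, long now,
                                   long latestModificationTime, List<String> latestListedNames) {
        List<FakeFile> listed = new ArrayList<>();
        for (FakeFile f : files) {
            if (isListable(f, now, latestModificationTime, latestListedNames)) {
                listed.add(f);
            }
        }
        return listed;
    }

    // The clock is read once, before any file is checked.
    static List<FakeFile> listNow(List<FakeFile> files, long latestModificationTime, List<String> latestListedNames) {
        final long now = System.currentTimeMillis();
        return listOnce(files, now, latestModificationTime, latestListedNames);
    }

    public static void main(String[] args) {
        // T0 = 0: file1 is created at T0, file2 at T0+1.
        List<FakeFile> files = List.of(new FakeFile("file1", 0), new FakeFile("file2", 1));

        // Base timestamp T0+4998: neither file is old enough (ages 4998 and 4997 ms).
        // Long.MIN_VALUE is a hypothetical "nothing listed yet" marker.
        System.out.println("iteration 1: " + listOnce(files, 4_998, Long.MIN_VALUE, List.of()));

        // Nothing was listed, so the state is unchanged; a later iteration lists both files.
        System.out.println("iteration 2: " + listOnce(files, 10_000, Long.MIN_VALUE, List.of()));

        // With the real clock, both files (modified at epoch ms 0 and 1) are far past the minimum age.
        System.out.println("real clock: " + listNow(files, Long.MIN_VALUE, List.of()));
    }
}
```

   Because all files are measured against the same `now`, an older file is always at least as old as a newer one, so a newer file can no longer pass the minimum-age check while an older one fails it.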



-- 
This is an automated message from the Apache Git Service.
