Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2618#discussion_r83019134
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
    @@ -146,102 +152,57 @@ public void 
run(SourceFunction.SourceContext<FileInputSplit> context) throws Exc
                }
        }
     
    -   private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext<FileInputSplit> context) throws IOException, JobException {
    +   private void monitorDirAndForwardSplits(FileSystem fs, 
SourceContext<FileInputSplit> context) throws IOException {
                assert (Thread.holdsLock(checkpointLock));
     
    -           List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = 
getInputSplitSortedOnModTime(fs);
    -
    -           Iterator<Tuple2<Long, List<FileInputSplit>>> it = 
splitsByModTime.iterator();
    -           while (it.hasNext()) {
    -                   forwardSplits(it.next(), context);
    -                   it.remove();
    -           }
    -   }
    -
    -   private void forwardSplits(Tuple2<Long, List<FileInputSplit>> 
splitsToFwd, SourceContext<FileInputSplit> context) {
    -           assert (Thread.holdsLock(checkpointLock));
    -
    -           Long modTime = splitsToFwd.f0;
    -           List<FileInputSplit> splits = splitsToFwd.f1;
    -
    -           Iterator<FileInputSplit> it = splits.iterator();
    -           while (it.hasNext()) {
    -                   FileInputSplit split = it.next();
    -                   processSplit(split, context);
    -                   it.remove();
    -           }
    +           List<FileStatus> eligibleFiles = listEligibleFiles(fs);
    +           Map<Long, List<FileInputSplit>> splitsSortedByModTime = 
getInputSplitsSortedByModTime(eligibleFiles);
     
    -           // update the global modification time
    -           if (modTime >= globalModificationTime) {
    -                   globalModificationTime = modTime;
    -           }
    -   }
    -
    -   private void processSplit(FileInputSplit split, 
SourceContext<FileInputSplit> context) {
    -           LOG.info("Forwarding split: " + split);
    -           context.collect(split);
    -   }
    -
    -   private List<Tuple2<Long, List<FileInputSplit>>> 
getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
    -           List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
    -           if (eligibleFiles.isEmpty()) {
    -                   return new ArrayList<>();
    -           }
    -
    -           Map<Long, List<FileInputSplit>> splitsToForward = 
getInputSplits(eligibleFiles);
    -           List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward 
= new ArrayList<>();
    -
    -           for (Map.Entry<Long, List<FileInputSplit>> entry : 
splitsToForward.entrySet()) {
    -                   sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), 
entry.getValue()));
    -           }
    -
    -           Collections.sort(sortedSplitsToForward, new 
Comparator<Tuple2<Long, List<FileInputSplit>>>() {
    -                   @Override
    -                   public int compare(Tuple2<Long, List<FileInputSplit>> 
o1, Tuple2<Long, List<FileInputSplit>> o2) {
    -                           return (int) (o1.f0 - o2.f0);
    +           for (Map.Entry<Long, List<FileInputSplit>> splits: 
splitsSortedByModTime.entrySet()) {
    +                   long modificationTime = splits.getKey();
    +                   for (FileInputSplit split: splits.getValue()) {
    +                           LOG.info("Forwarding split: " + split);
    +                           context.collect(split);
                        }
    -           });
    -
    -           return sortedSplitsToForward;
    +                   // update the global modification time
    +                   globalModificationTime = 
Math.max(globalModificationTime, modificationTime);
    +           }
        }
     
        /**
    -    * Creates the input splits for the path to be forwarded to the 
downstream tasks of the
    -    * {@link ContinuousFileReaderOperator}. Those tasks are going to read 
their contents for further
    -    * processing. Splits belonging to files in the {@code eligibleFiles} 
list are the ones
    -    * that are shipped for further processing.
    +    * Creates the input splits to be forwarded to the downstream tasks of 
the
    +    * {@link ContinuousFileReaderOperator}. Splits are sorted <b>by 
modification time</b> before
    +    * being forwarded and only splits belonging to files in the {@code 
eligibleFiles}
    +    * list will be processed.
         * @param eligibleFiles The files to process.
         */
    -   private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> 
eligibleFiles) throws IOException {
    +   private Map<Long, List<FileInputSplit>> 
getInputSplitsSortedByModTime(List<FileStatus> eligibleFiles) throws 
IOException {
    +           Map<Long, List<FileInputSplit>> splitsByModTime = new 
TreeMap<>();
                if (eligibleFiles.isEmpty()) {
    -                   return new HashMap<>();
    +                   return splitsByModTime;
                }
     
    -           FileInputSplit[] inputSplits = 
format.createInputSplits(readerParallelism);
    -
    -           Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
    -           for (FileInputSplit split: inputSplits) {
    +           for (FileInputSplit split: 
format.createInputSplits(readerParallelism)) {
                        for (FileStatus file: eligibleFiles) {
    --- End diff --
    
    Wouldn't it be better to use a `Set` here? This would give us `O(1)` lookup 
time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to