GitHub user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2618#discussion_r83019134
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
---
@@ -146,102 +152,57 @@ public void
run(SourceFunction.SourceContext<FileInputSplit> context) throws Exc
}
}
- private void monitorDirAndForwardSplits(FileSystem fs,
SourceContext<FileInputSplit> context) throws IOException, JobException {
+ private void monitorDirAndForwardSplits(FileSystem fs,
SourceContext<FileInputSplit> context) throws IOException {
assert (Thread.holdsLock(checkpointLock));
- List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime =
getInputSplitSortedOnModTime(fs);
-
- Iterator<Tuple2<Long, List<FileInputSplit>>> it =
splitsByModTime.iterator();
- while (it.hasNext()) {
- forwardSplits(it.next(), context);
- it.remove();
- }
- }
-
- private void forwardSplits(Tuple2<Long, List<FileInputSplit>>
splitsToFwd, SourceContext<FileInputSplit> context) {
- assert (Thread.holdsLock(checkpointLock));
-
- Long modTime = splitsToFwd.f0;
- List<FileInputSplit> splits = splitsToFwd.f1;
-
- Iterator<FileInputSplit> it = splits.iterator();
- while (it.hasNext()) {
- FileInputSplit split = it.next();
- processSplit(split, context);
- it.remove();
- }
+ List<FileStatus> eligibleFiles = listEligibleFiles(fs);
+ Map<Long, List<FileInputSplit>> splitsSortedByModTime =
getInputSplitsSortedByModTime(eligibleFiles);
- // update the global modification time
- if (modTime >= globalModificationTime) {
- globalModificationTime = modTime;
- }
- }
-
- private void processSplit(FileInputSplit split,
SourceContext<FileInputSplit> context) {
- LOG.info("Forwarding split: " + split);
- context.collect(split);
- }
-
- private List<Tuple2<Long, List<FileInputSplit>>>
getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
- List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
- if (eligibleFiles.isEmpty()) {
- return new ArrayList<>();
- }
-
- Map<Long, List<FileInputSplit>> splitsToForward =
getInputSplits(eligibleFiles);
- List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward
= new ArrayList<>();
-
- for (Map.Entry<Long, List<FileInputSplit>> entry :
splitsToForward.entrySet()) {
- sortedSplitsToForward.add(new Tuple2<>(entry.getKey(),
entry.getValue()));
- }
-
- Collections.sort(sortedSplitsToForward, new
Comparator<Tuple2<Long, List<FileInputSplit>>>() {
- @Override
- public int compare(Tuple2<Long, List<FileInputSplit>>
o1, Tuple2<Long, List<FileInputSplit>> o2) {
- return (int) (o1.f0 - o2.f0);
+ for (Map.Entry<Long, List<FileInputSplit>> splits:
splitsSortedByModTime.entrySet()) {
+ long modificationTime = splits.getKey();
+ for (FileInputSplit split: splits.getValue()) {
+ LOG.info("Forwarding split: " + split);
+ context.collect(split);
}
- });
-
- return sortedSplitsToForward;
+ // update the global modification time
+ globalModificationTime =
Math.max(globalModificationTime, modificationTime);
+ }
}
/**
- * Creates the input splits for the path to be forwarded to the
downstream tasks of the
- * {@link ContinuousFileReaderOperator}. Those tasks are going to read
their contents for further
- * processing. Splits belonging to files in the {@code eligibleFiles}
list are the ones
- * that are shipped for further processing.
+ * Creates the input splits to be forwarded to the downstream tasks of
the
+ * {@link ContinuousFileReaderOperator}. Splits are sorted <b>by
modification time</b> before
+ * being forwarded and only splits belonging to files in the {@code
eligibleFiles}
+ * list will be processed.
* @param eligibleFiles The files to process.
*/
- private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus>
eligibleFiles) throws IOException {
+ private Map<Long, List<FileInputSplit>>
getInputSplitsSortedByModTime(List<FileStatus> eligibleFiles) throws
IOException {
+ Map<Long, List<FileInputSplit>> splitsByModTime = new
TreeMap<>();
if (eligibleFiles.isEmpty()) {
- return new HashMap<>();
+ return splitsByModTime;
}
- FileInputSplit[] inputSplits =
format.createInputSplits(readerParallelism);
-
- Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
- for (FileInputSplit split: inputSplits) {
+ for (FileInputSplit split:
format.createInputSplits(readerParallelism)) {
for (FileStatus file: eligibleFiles) {
--- End diff --
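Wouldn't it be better to use a `Set` here (e.g. of the eligible files' paths)? That
would give us `O(1)` lookup per split instead of scanning all of `eligibleFiles` for
every split.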
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---