[FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd273a8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd273a8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd273a8f Branch: refs/heads/master Commit: bd273a8f435b222eb67840fb39b854ec9ef8602f Parents: 5709bf6 Author: kl0u <kklou...@gmail.com> Authored: Fri Jun 24 15:01:44 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Jun 30 14:46:26 2016 +0200 ---------------------------------------------------------------------- .../hdfstests/ContinuousFileMonitoringTest.java | 6 +- .../ContinuousFileMonitoringFunction.java | 105 +++++++++---------- .../source/ContinuousFileReaderOperator.java | 27 +++-- .../source/InputFormatSourceFunction.java | 6 +- ...ontinuousFileProcessingCheckpointITCase.java | 2 + 5 files changed, 78 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java index 87567e3..def9378 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java @@ -166,9 +166,9 @@ public class ContinuousFileMonitoringTest { content.add(element.getValue() +"\n"); } - Assert.assertEquals(actualFileContents.size(), expectedFileContents.size()); + Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); for (Integer fileIdx: expectedFileContents.keySet()) { - Assert.assertTrue(actualFileContents.keySet().contains(fileIdx)); + Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); List<String> cntnt = actualFileContents.get(fileIdx); Collections.sort(cntnt, new Comparator<String>() { @@ -182,7 +182,7 @@ public class ContinuousFileMonitoringTest { for (String line: cntnt) { cntntStr.append(line); } - Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx)); + Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); } for(org.apache.hadoop.fs.Path file: filesCreated) { http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index b97c274..8ff4a2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.FileStatus; @@ -53,7 +52,7 @@ import java.util.Map; */ @Internal public class ContinuousFileMonitoringFunction<OUT> - extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> { + extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> { private static final long serialVersionUID = 1L; @@ -80,14 +79,12 @@ public class ContinuousFileMonitoringFunction<OUT> /** Which new data to process (see {@link FileProcessingMode}. */ private final FileProcessingMode watchType; - private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime; - - private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd; - - private long globalModificationTime; + private Long globalModificationTime; private FilePathFilter pathFilter; + private transient Object checkpointLock; + private volatile boolean isRunning = true; public ContinuousFileMonitoringFunction( @@ -113,7 +110,7 @@ public class ContinuousFileMonitoringFunction<OUT> @SuppressWarnings("unchecked") public void open(Configuration parameters) throws Exception { LOG.info("Opening File Monitoring Source."); - + super.open(parameters); format.configure(parameters); } @@ -122,17 +119,28 @@ public class ContinuousFileMonitoringFunction<OUT> public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception { FileSystem fileSystem = FileSystem.get(new URI(path)); + checkpointLock = context.getCheckpointLock(); switch (watchType) { case PROCESS_CONTINUOUSLY: while (isRunning) { - monitorDirAndForwardSplits(fileSystem, context); + synchronized (checkpointLock) { + monitorDirAndForwardSplits(fileSystem, context); + } Thread.sleep(interval); } - isRunning = false; + + // here we do not need to set the running to false and the + // globalModificationTime to Long.MAX_VALUE because to arrive here, + // either close() or cancel() have already been called, so this + // is already done. + break; case PROCESS_ONCE: - monitorDirAndForwardSplits(fileSystem, context); - isRunning = false; + synchronized (checkpointLock) { + monitorDirAndForwardSplits(fileSystem, context); + globalModificationTime = Long.MAX_VALUE; + isRunning = false; + } break; default: isRunning = false; @@ -141,41 +149,22 @@ public class ContinuousFileMonitoringFunction<OUT> } private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException { - final Object lock = context.getCheckpointLock(); + assert (Thread.holdsLock(checkpointLock)); - // it may be non-null in the case of a recovery after a failure. - if (currentSplitsToFwd != null) { - synchronized (lock) { - forwardSplits(currentSplitsToFwd, context); - } - } - currentSplitsToFwd = null; - - // it may be non-null in the case of a recovery after a failure. - if (splitsToFwdOrderedAscByModTime == null) { - splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs); - } - - Iterator<Tuple2<Long, List<FileInputSplit>>> it = - splitsToFwdOrderedAscByModTime.iterator(); + List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs); + Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator(); while (it.hasNext()) { - synchronized (lock) { - currentSplitsToFwd = it.next(); - it.remove(); - forwardSplits(currentSplitsToFwd, context); - } + forwardSplits(it.next(), context); + it.remove(); } - - // set them to null to distinguish from a restore. - splitsToFwdOrderedAscByModTime = null; - currentSplitsToFwd = null; } private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) { - currentSplitsToFwd = splitsToFwd; - Long modTime = currentSplitsToFwd.f0; - List<FileInputSplit> splits = currentSplitsToFwd.f1; + assert (Thread.holdsLock(checkpointLock)); + + Long modTime = splitsToFwd.f0; + List<FileInputSplit> splits = splitsToFwd.f1; Iterator<FileInputSplit> it = splits.iterator(); while (it.hasNext()) { @@ -284,6 +273,7 @@ public class ContinuousFileMonitoringFunction<OUT> * is the time of the most recent modification found in any of the already processed files. */ private boolean shouldIgnore(Path filePath, long modificationTime) { + assert (Thread.holdsLock(checkpointLock)); boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime); if (shouldIgnore) { LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime); @@ -294,35 +284,36 @@ public class ContinuousFileMonitoringFunction<OUT> @Override public void close() throws Exception { super.close(); - isRunning = false; + synchronized (checkpointLock) { + globalModificationTime = Long.MAX_VALUE; + isRunning = false; + } LOG.info("Closed File Monitoring Source."); } @Override public void cancel() { - isRunning = false; + if (checkpointLock != null) { + // this is to cover the case where cancel() is called before the run() + synchronized (checkpointLock) { + globalModificationTime = Long.MAX_VALUE; + isRunning = false; + } + } else { + globalModificationTime = Long.MAX_VALUE; + isRunning = false; + } } // --------------------- Checkpointing -------------------------- @Override - public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState( - long checkpointId, long checkpointTimestamp) throws Exception { - - if (!isRunning) { - LOG.debug("snapshotState() called on closed source"); - return null; - } - return new Tuple3<>(splitsToFwdOrderedAscByModTime, - currentSplitsToFwd, globalModificationTime); + public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + return globalModificationTime; } @Override - public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, - Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception { - - this.splitsToFwdOrderedAscByModTime = state.f0; - this.currentSplitsToFwd = state.f1; - this.globalModificationTime = state.f2; + public void restoreState(Long state) throws Exception { + this.globalModificationTime = state; } } http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 455c753..1c2da34 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -104,7 +104,14 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A this.collector = new TimestampedCollector<>(output); this.checkpointLock = getContainingTask().getCheckpointLock(); + Preconditions.checkState(reader == null, "The reader is already initialized."); + this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState); + + // the readerState is needed for the initialization of the reader + // when recovering from a failure. So after the initialization, + // we can set it to null. + this.readerState = null; this.reader.start(); } @@ -191,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private volatile boolean isSplitOpen = false; - SplitReader(FileInputFormat<OT> format, + private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, TimestampedCollector<OT> collector, Object checkpointLock, @@ -212,18 +219,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = restoredState.f2; for (FileInputSplit split : pending) { + Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); pendingSplits.add(split); } this.currentSplit = current; this.restoredFormatState = formatState; } - ContinuousFileReaderOperator.this.readerState = null; } - void addSplit(FileInputSplit split) { + private void addSplit(FileInputSplit split) { Preconditions.checkNotNull(split); synchronized (checkpointLock) { + Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + "."); this.pendingSplits.add(split); } } @@ -323,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } } - Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException { + private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException { List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size()); for (FileInputSplit split: this.pendingSplits) { snapshot.add(split); @@ -334,9 +342,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A this.pendingSplits.remove(); } - if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { - S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState(); - return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState); + if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) { + S formatState = this.isSplitOpen ? + (S) ((CheckpointableInputFormat) format).getCurrentState() : + restoredFormatState; + return new Tuple3<>(snapshot, currentSplit, formatState); } else { LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery."); return new Tuple3<>(snapshot, currentSplit, null); @@ -405,6 +415,9 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A S formatState = (S) ois.readObject(); // set the whole reader state for the open() to find. + Preconditions.checkState(this.readerState == null, + "The reader state has already been initialized."); + this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); div.close(); } http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java index f35cbba..e3e5c54 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java @@ -85,7 +85,11 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O while (isRunning && !format.reachedEnd()) { nextElement = format.nextRecord(nextElement); - ctx.collect(nextElement); + if (nextElement != null) { + ctx.collect(nextElement); + } else { + break; + } } format.close(); completedSplitsCounter.inc(); http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index 4c0f648..d540a92 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -219,6 +219,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran stream.write(line.getBytes()); } stream.close(); + + Assert.assertTrue("Result file present", !fs.exists(file)); fs.rename(tmp, file); Assert.assertTrue("No result file present", fs.exists(file)); return new Tuple2<>(file, str.toString());