[FLINK-4075] Fix unstable ContinuousFileProcessingCheckpointITCase

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bd273a8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bd273a8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bd273a8f

Branch: refs/heads/master
Commit: bd273a8f435b222eb67840fb39b854ec9ef8602f
Parents: 5709bf6
Author: kl0u <kklou...@gmail.com>
Authored: Fri Jun 24 15:01:44 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Jun 30 14:46:26 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |   6 +-
 .../ContinuousFileMonitoringFunction.java       | 105 +++++++++----------
 .../source/ContinuousFileReaderOperator.java    |  27 +++--
 .../source/InputFormatSourceFunction.java       |   6 +-
 ...ontinuousFileProcessingCheckpointITCase.java |   2 +
 5 files changed, 78 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 87567e3..def9378 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -166,9 +166,9 @@ public class ContinuousFileMonitoringTest {
                        content.add(element.getValue() +"\n");
                }
 
-               Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
+               Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
                for (Integer fileIdx: expectedFileContents.keySet()) {
-                       Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+                       Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
 
                        List<String> cntnt = actualFileContents.get(fileIdx);
                        Collections.sort(cntnt, new Comparator<String>() {
@@ -182,7 +182,7 @@ public class ContinuousFileMonitoringTest {
                        for (String line: cntnt) {
                                cntntStr.append(line);
                        }
-                       Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
+                       Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
                }
 
                for(org.apache.hadoop.fs.Path file: filesCreated) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index b97c274..8ff4a2a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
@@ -53,7 +52,7 @@ import java.util.Map;
  */
 @Internal
 public class ContinuousFileMonitoringFunction<OUT>
-       extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
+       extends RichSourceFunction<FileInputSplit> implements Checkpointed<Long> {
 
        private static final long serialVersionUID = 1L;
 
@@ -80,14 +79,12 @@ public class ContinuousFileMonitoringFunction<OUT>
        /** Which new data to process (see {@link FileProcessingMode}. */
        private final FileProcessingMode watchType;
 
-       private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
-
-       private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
-
-       private long globalModificationTime;
+       private Long globalModificationTime;
 
        private FilePathFilter pathFilter;
 
+       private transient Object checkpointLock;
+
        private volatile boolean isRunning = true;
 
        public ContinuousFileMonitoringFunction(
@@ -113,7 +110,7 @@ public class ContinuousFileMonitoringFunction<OUT>
        @SuppressWarnings("unchecked")
        public void open(Configuration parameters) throws Exception {
                LOG.info("Opening File Monitoring Source.");
-               
+
                super.open(parameters);
                format.configure(parameters);
        }
@@ -122,17 +119,28 @@ public class ContinuousFileMonitoringFunction<OUT>
        public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
                FileSystem fileSystem = FileSystem.get(new URI(path));
 
+               checkpointLock = context.getCheckpointLock();
                switch (watchType) {
                        case PROCESS_CONTINUOUSLY:
                                while (isRunning) {
-                                       monitorDirAndForwardSplits(fileSystem, context);
+                                       synchronized (checkpointLock) {
+                                               monitorDirAndForwardSplits(fileSystem, context);
+                                       }
                                        Thread.sleep(interval);
                                }
-                               isRunning = false;
+
+                               // here we do not need to set the running to false and the
+                               // globalModificationTime to Long.MAX_VALUE because to arrive here,
+                               // either close() or cancel() have already been called, so this
+                               // is already done.
+
                                break;
                        case PROCESS_ONCE:
-                               monitorDirAndForwardSplits(fileSystem, context);
-                               isRunning = false;
+                               synchronized (checkpointLock) {
+                                       monitorDirAndForwardSplits(fileSystem, context);
+                                       globalModificationTime = Long.MAX_VALUE;
+                                       isRunning = false;
+                               }
                                break;
                        default:
                                isRunning = false;
@@ -141,41 +149,22 @@ public class ContinuousFileMonitoringFunction<OUT>
        }
 
        private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
-               final Object lock = context.getCheckpointLock();
+               assert (Thread.holdsLock(checkpointLock));
 
-               // it may be non-null in the case of a recovery after a failure.
-               if (currentSplitsToFwd != null) {
-                       synchronized (lock) {
-                               forwardSplits(currentSplitsToFwd, context);
-                       }
-               }
-               currentSplitsToFwd = null;
-
-               // it may be non-null in the case of a recovery after a failure.
-               if (splitsToFwdOrderedAscByModTime == null) {
-                       splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
-               }
-
-               Iterator<Tuple2<Long, List<FileInputSplit>>> it =
-                       splitsToFwdOrderedAscByModTime.iterator();
+               List<Tuple2<Long, List<FileInputSplit>>> splitsByModTime = getInputSplitSortedOnModTime(fs);
 
+               Iterator<Tuple2<Long, List<FileInputSplit>>> it = splitsByModTime.iterator();
                while (it.hasNext()) {
-                       synchronized (lock) {
-                               currentSplitsToFwd = it.next();
-                               it.remove();
-                               forwardSplits(currentSplitsToFwd, context);
-                       }
+                       forwardSplits(it.next(), context);
+                       it.remove();
                }
-
-               // set them to null to distinguish from a restore.
-               splitsToFwdOrderedAscByModTime = null;
-               currentSplitsToFwd = null;
        }
 
        private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
-               currentSplitsToFwd = splitsToFwd;
-               Long modTime = currentSplitsToFwd.f0;
-               List<FileInputSplit> splits = currentSplitsToFwd.f1;
+               assert (Thread.holdsLock(checkpointLock));
+
+               Long modTime = splitsToFwd.f0;
+               List<FileInputSplit> splits = splitsToFwd.f1;
 
                Iterator<FileInputSplit> it = splits.iterator();
                while (it.hasNext()) {
@@ -284,6 +273,7 @@ public class ContinuousFileMonitoringFunction<OUT>
         * is the time of the most recent modification found in any of the already processed files.
         */
        private boolean shouldIgnore(Path filePath, long modificationTime) {
+               assert (Thread.holdsLock(checkpointLock));
                boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
                if (shouldIgnore) {
                        LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
@@ -294,35 +284,36 @@ public class ContinuousFileMonitoringFunction<OUT>
        @Override
        public void close() throws Exception {
                super.close();
-               isRunning = false;
+               synchronized (checkpointLock) {
+                       globalModificationTime = Long.MAX_VALUE;
+                       isRunning = false;
+               }
                LOG.info("Closed File Monitoring Source.");
        }
 
        @Override
        public void cancel() {
-               isRunning = false;
+               if (checkpointLock != null) {
+                       // this is to cover the case where cancel() is called before the run()
+                       synchronized (checkpointLock) {
+                               globalModificationTime = Long.MAX_VALUE;
+                               isRunning = false;
+                       }
+               } else {
+                       globalModificationTime = Long.MAX_VALUE;
+                       isRunning = false;
+               }
        }
 
        //      ---------------------                   Checkpointing                   --------------------------
 
        @Override
-       public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
-               long checkpointId, long checkpointTimestamp) throws Exception {
-
-               if (!isRunning) {
-                       LOG.debug("snapshotState() called on closed source");
-                       return null;
-               }
-               return new Tuple3<>(splitsToFwdOrderedAscByModTime,
-                       currentSplitsToFwd, globalModificationTime);
+       public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+               return globalModificationTime;
        }
 
        @Override
-       public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
-               Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
-
-               this.splitsToFwdOrderedAscByModTime = state.f0;
-               this.currentSplitsToFwd = state.f1;
-               this.globalModificationTime = state.f2;
+       public void restoreState(Long state) throws Exception {
+               this.globalModificationTime = state;
        }
 }

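Note on the pattern above: the monitoring function now keeps only a single Long (globalModificationTime) as checkpointed state and mutates or forwards it only while holding the source context's checkpoint lock, so a snapshot can never observe a half-updated value. A minimal stand-alone sketch of the same idiom follows; the MaxTimestampSource class and its behaviour are illustrative assumptions, not part of this commit.

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical source that, like the monitoring function above, checkpoints a
// single Long and only touches it under the checkpoint lock.
public class MaxTimestampSource extends RichSourceFunction<Long> implements Checkpointed<Long> {

        private static final long serialVersionUID = 1L;

        private long maxSeenTimestamp = Long.MIN_VALUE;

        private transient Object checkpointLock;

        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
                checkpointLock = ctx.getCheckpointLock();
                while (isRunning) {
                        long candidate = System.currentTimeMillis();
                        // state update and emission happen atomically w.r.t. snapshots
                        synchronized (checkpointLock) {
                                if (candidate > maxSeenTimestamp) {
                                        maxSeenTimestamp = candidate;
                                        ctx.collect(candidate);
                                }
                        }
                        Thread.sleep(100);
                }
        }

        @Override
        public void cancel() {
                // cancel() may arrive before run(), so the lock can still be null here
                isRunning = false;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
                // invoked by the runtime while it holds the checkpoint lock
                return maxSeenTimestamp;
        }

        @Override
        public void restoreState(Long state) {
                this.maxSeenTimestamp = state;
        }
}
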
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 455c753..1c2da34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -104,7 +104,14 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                this.collector = new TimestampedCollector<>(output);
                this.checkpointLock = getContainingTask().getCheckpointLock();
 
+               Preconditions.checkState(reader == null, "The reader is already initialized.");
+
                this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+
+               // the readerState is needed for the initialization of the reader
+               // when recovering from a failure. So after the initialization,
+               // we can set it to null.
+               this.readerState = null;
                this.reader.start();
        }
 
@@ -191,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
                private volatile boolean isSplitOpen = false;
 
-               SplitReader(FileInputFormat<OT> format,
+               private SplitReader(FileInputFormat<OT> format,
                                        TypeSerializer<OT> serializer,
                                        TimestampedCollector<OT> collector,
                                        Object checkpointLock,
@@ -212,18 +219,19 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                                S formatState = restoredState.f2;
 
                                for (FileInputSplit split : pending) {
+                                       Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
                                        pendingSplits.add(split);
                                }
 
                                this.currentSplit = current;
                                this.restoredFormatState = formatState;
                        }
-                       ContinuousFileReaderOperator.this.readerState = null;
                }
 
-               void addSplit(FileInputSplit split) {
+               private void addSplit(FileInputSplit split) {
                        Preconditions.checkNotNull(split);
                        synchronized (checkpointLock) {
+                               Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
                                this.pendingSplits.add(split);
                        }
                }
@@ -323,7 +331,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                        }
                }
 
-               Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+               private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
                        List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
                        for (FileInputSplit split: this.pendingSplits) {
                                snapshot.add(split);
@@ -334,9 +342,11 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                                this.pendingSplits.remove();
                        }
 
-                       if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
-                               S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
-                               return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+                       if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
+                               S formatState = this.isSplitOpen ?
+                                       (S) ((CheckpointableInputFormat) format).getCurrentState() :
+                                       restoredFormatState;
+                               return new Tuple3<>(snapshot, currentSplit, formatState);
                        } else {
                                LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
                                return new Tuple3<>(snapshot, currentSplit, null);
@@ -405,6 +415,9 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
                S formatState = (S) ois.readObject();
 
                // set the whole reader state for the open() to find.
+               Preconditions.checkState(this.readerState == null,
+                       "The reader state has already been initialized.");
+
                this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
                div.close();
        }

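The behavioural change in getReaderState() above is that a snapshot taken right after a restore, before the current split has been re-opened, forwards the previously restored format state instead of querying the not-yet-opened format. A stand-alone sketch of that decision follows, assuming the same raw CheckpointableInputFormat cast the operator uses; the SplitStateSnapshotter helper and its names are hypothetical, not part of this commit.

import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

// Hypothetical helper mirroring the snapshot decision above: prefer the live
// format state while the split is open, fall back to the restored state when
// it is not, and report nothing for non-checkpointable formats.
final class SplitStateSnapshotter<S extends Serializable> {

        @SuppressWarnings("unchecked")
        S snapshotCurrentSplitState(
                        FileInputFormat<?> format,
                        FileInputSplit currentSplit,
                        boolean isSplitOpen,
                        S restoredFormatState) throws IOException {

                if (!(format instanceof CheckpointableInputFormat) || currentSplit == null) {
                        // nothing to checkpoint; the split is simply re-read on recovery
                        return null;
                }

                if (isSplitOpen) {
                        // the split is open, so the format holds the authoritative position
                        return (S) ((CheckpointableInputFormat) format).getCurrentState();
                }

                // restored but not yet re-opened: keep forwarding the restored state
                return restoredFormatState;
        }
}
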
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index f35cbba..e3e5c54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -85,7 +85,11 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O
                                
                                while (isRunning && !format.reachedEnd()) {
                                        nextElement = format.nextRecord(nextElement);
-                                       ctx.collect(nextElement);
+                                       if (nextElement != null) {
+                                               ctx.collect(nextElement);
+                                       } else {
+                                               break;
+                                       }
                                }
                                format.close();
                                completedSplitsCounter.inc();

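The InputFormatSourceFunction change guards against formats whose nextRecord() can return null before reachedEnd() reports true; emitting that null would fail downstream. A minimal sketch of the same defensive read loop follows; SplitDrainer, its Emitter interface, and drainSplit are illustrative names, not Flink API.

import java.io.IOException;

import org.apache.flink.api.common.io.InputFormat;

// Hypothetical utility showing the defensive read pattern above:
// stop on reachedEnd() OR on a null record, and never emit null.
final class SplitDrainer {

        interface Emitter<T> {
                void emit(T record);
        }

        static <T> void drainSplit(InputFormat<T, ?> format, T reuse, Emitter<T> out) throws IOException {
                T record = reuse;
                while (!format.reachedEnd()) {
                        record = format.nextRecord(record);
                        if (record == null) {
                                // the format signalled end-of-data via null before reachedEnd()
                                break;
                        }
                        out.emit(record);
                }
                format.close();
        }
}
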
http://git-wip-us.apache.org/repos/asf/flink/blob/bd273a8f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index 4c0f648..d540a92 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -219,6 +219,8 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
                        stream.write(line.getBytes());
                }
                stream.close();
+
+               Assert.assertTrue("Result file present", !fs.exists(file));
                fs.rename(tmp, file);
                Assert.assertTrue("No result file present", fs.exists(file));
                return new Tuple2<>(file, str.toString());
