Remove polluting log message in ContinuousFileReaderOperator Before, when snapshotting, we printed a log message about the file input format not being checkpointable when the current split was "null". Now, we only print the message when appropriate.
This closes #2174 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4 Branch: refs/heads/master Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee Parents: a9733a9 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Thu Jun 30 11:46:52 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Jun 30 15:31:10 2016 +0200 ---------------------------------------------------------------------- .../source/ContinuousFileReaderOperator.java | 33 ++++++++++++++------ 1 file changed, 24 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 1c2da34..0daa7ad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -65,6 +65,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> { + private static final long serialVersionUID = 1L; + private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class); private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null); @@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A private FileInputFormat<OUT> format; private TypeSerializer<OUT> serializer; - private Object checkpointLock; + private transient Object checkpointLock; private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState; @@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A } if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) { - ((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState); + + @SuppressWarnings("unchecked") + CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat = + (CheckpointableInputFormat<FileInputSplit, S>) this.format; + + checkpointableFormat.reopen(currentSplit, restoredFormatState); } else { // this is the case of a non-checkpointable input format that will reprocess the last split. LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable."); @@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A this.pendingSplits.remove(); } - if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) { - S formatState = this.isSplitOpen ? - (S) ((CheckpointableInputFormat) format).getCurrentState() : - restoredFormatState; - return new Tuple3<>(snapshot, currentSplit, formatState); + if (this.currentSplit != null) { + if (this.format instanceof CheckpointableInputFormat) { + @SuppressWarnings("unchecked") + CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat = + (CheckpointableInputFormat<FileInputSplit, S>) this.format; + + S formatState = this.isSplitOpen ? 
+ checkpointableFormat.getCurrentState() : + restoredFormatState; + return new Tuple3<>(snapshot, currentSplit, formatState); + } else { + LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery."); + return new Tuple3<>(snapshot, currentSplit, null); + } } else { - LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery."); - return new Tuple3<>(snapshot, currentSplit, null); + return new Tuple3<>(snapshot, null, null); } }