Remove polluting log message in ContinuousFileReaderOperator

Before, when snapshotting, we printed a log message about the file
input format not being checkpointable when the current split was
"null". Now, we only print the message when when appropriate.

This closes #2174


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c6b17b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c6b17b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c6b17b4

Branch: refs/heads/master
Commit: 6c6b17b4d47d281b0e5dcf4413fd1ad53ce49eee
Parents: a9733a9
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Thu Jun 30 11:46:52 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Jun 30 15:31:10 2016 +0200

----------------------------------------------------------------------
 .../source/ContinuousFileReaderOperator.java    | 33 ++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c6b17b4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 1c2da34..0daa7ad 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -65,6 +65,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends 
AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<FileInputSplit, OUT>, 
OutputTypeConfigurable<OUT> {
 
+       private static final long serialVersionUID = 1L;
+
        private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
        private static final FileInputSplit EOS = new FileInputSplit(-1, null, 
-1, -1, null);
@@ -75,7 +77,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        private FileInputFormat<OUT> format;
        private TypeSerializer<OUT> serializer;
 
-       private Object checkpointLock;
+       private transient Object checkpointLock;
 
        private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
@@ -259,7 +261,12 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                                        }
 
                                                        if (this.format 
instanceof CheckpointableInputFormat && restoredFormatState != null) {
-                                                               
((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+
+                                                               
@SuppressWarnings("unchecked")
+                                                               
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+                                                                               
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+                                                               
checkpointableFormat.reopen(currentSplit, restoredFormatState);
                                                        } else {
                                                                // this is the 
case of a non-checkpointable input format that will reprocess the last split.
                                                                
LOG.info("Format " + this.format.getClass().getName() + " used is not 
checkpointable.");
@@ -342,14 +349,22 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                                this.pendingSplits.remove();
                        }
 
-                       if (this.format instanceof CheckpointableInputFormat && 
this.currentSplit != null) {
-                               S formatState = this.isSplitOpen ?
-                                       (S) ((CheckpointableInputFormat) 
format).getCurrentState() :
-                                       restoredFormatState;
-                               return new Tuple3<>(snapshot, currentSplit, 
formatState);
+                       if (this.currentSplit != null) {
+                               if (this.format instanceof 
CheckpointableInputFormat) {
+                                       @SuppressWarnings("unchecked")
+                                       
CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
+                                                       
(CheckpointableInputFormat<FileInputSplit, S>) this.format;
+
+                                       S formatState = this.isSplitOpen ?
+                                                       
checkpointableFormat.getCurrentState() :
+                                                       restoredFormatState;
+                                       return new Tuple3<>(snapshot, 
currentSplit, formatState);
+                               } else {
+                                       LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
+                                       return new Tuple3<>(snapshot, 
currentSplit, null);
+                               }
                        } else {
-                               LOG.info("The format used is not 
checkpointable. The current input split will be restarted upon recovery.");
-                               return new Tuple3<>(snapshot, currentSplit, 
null);
+                               return new Tuple3<>(snapshot, null, null);
                        }
                }
 

Reply via email to