[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395362#comment-16395362 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user ChengzhiZhao commented on the issue: https://github.com/apache/flink/pull/5521

@StephanEwen, thanks for your feedback. For our use case we'd like to take the bad file out and continue processing, instead of having to fix the bad file before the job can continue. I think this is job/application specific, so we shouldn't set it globally; that gives people flexibility. If that is the decision here, we can do it as a connector parameter rather than a config option that affects all jobs. Please advise.

> Improve the failure behavior of the FileInputFormat for bad files
> -----------------------------------------------------------------
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Chengzhi Zhao
> Priority: Major
>
> We have an S3 path that Flink monitors for new files:
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1)
> {code}
> I am using both internal and external checkpointing. Say a bad file (for example, one with a different schema) is dropped into this folder; Flink will retry several times. I want to take those bad files out and let the process continue. However, since the file path persists in the checkpoint, when I try to resume from the external checkpoint it throws the following error because the file can no longer be found:
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No such file or directory: s3a://myfile
> {code}
> As [~fhue...@gmail.com] suggested, we could check whether a path exists before trying to read a file, and ignore the input split instead of throwing an exception and causing a failure.
> Also, I am thinking about adding an error output for bad files as an option for users: if any bad files exist, we could move them to a separate path for further analysis.
> Not sure how people feel about it, but I'd like to contribute if people think this is an improvement.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
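The per-job option discussed above could be surfaced as a small connector-level switch. The following is a hypothetical sketch only; the enum, method, and class names are invented for illustration and are not part of Flink's API:

```java
import java.io.FileNotFoundException;
import java.io.UncheckedIOException;

// Hypothetical sketch of a per-format option, as discussed in the thread:
// the caller chooses whether a missing/bad split fails the job or is skipped.
// All names here are invented for illustration; this is not Flink API.
public class SplitFailurePolicyDemo {

    enum FailurePolicy { FAIL, SKIP }

    // Decide what to do with a split whose backing file no longer exists.
    static boolean shouldProcess(boolean splitExists, FailurePolicy policy) {
        if (splitExists) {
            return true;
        }
        if (policy == FailurePolicy.SKIP) {
            return false; // ignore the vanished split, keep the job running
        }
        throw new UncheckedIOException(
                new FileNotFoundException("Input split no longer exists"));
    }

    public static void main(String[] args) {
        // With SKIP, a missing split is simply not processed.
        System.out.println(shouldProcess(false, FailurePolicy.SKIP));
    }
}
```

With `FAIL` the job surfaces the exception and goes through the usual restart/recovery path; with `SKIP` the reader moves on to the next split.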
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395111#comment-16395111 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5521

The approach here does not work (there is also no test, otherwise that would have been caught). Flink does not magically load configs like Hadoop; Flink passes configs explicitly. Aside from that, we first should actually decide whether this should be
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393002#comment-16393002 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user ChengzhiZhao commented on the issue: https://github.com/apache/flink/pull/5521

@StephanEwen @kl0u Thanks for your feedback; I will add an option for users to choose.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392906#comment-16392906 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5521

@ChengzhiZhao I think that we should handle the "bad" splits better. @StephanEwen You are right that we should give the user the option to fail the job or to ignore them.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392902#comment-16392902 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5521#discussion_r173454031

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -819,6 +819,10 @@ public void open(FileInputSplit fileSplit) throws IOException {
 			this.stream = isot.waitForCompletion();
 			this.stream = decorateInputStream(this.stream, fileSplit);
 		}
+		catch (FileNotFoundException e) {
+			throw (FileNotFoundException)(new FileNotFoundException("Input split " + fileSplit.getPath() +
--- End diff --

I don't understand why "skip and continue" is in this message. Not all users of the `FileInputFormat` skip and continue. The interpretation of the exception should not be assumed when creating the exception.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392901#comment-16392901 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5521#discussion_r173453379

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -340,6 +341,10 @@ public void run() {
 					}
 				}
+			} catch (FileNotFoundException e) {
+				if (LOG.isDebugEnabled()) {
--- End diff --

This needs to be more prominently logged than `debug`. Should be at least `info` or `warn`. Please also use the placeholder syntax from log4j, to make the code simpler.
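The "placeholder syntax" Stephan refers to is parameterized logging (`LOG.warn("... {} ...", value, exception)`), which defers string building to the logging framework. Since the logging jar is not assumed here, the tiny stand-in below only demonstrates the `{}` substitution the placeholder performs; `fmt` is an invented helper for illustration, not a Flink, SLF4J, or Log4j API:

```java
public class PlaceholderDemo {
    // Mimics the "{}" placeholder substitution of SLF4J/Log4j-style loggers.
    // Real code would write: LOG.warn("Input split {} does not exist.", path, e);
    // Invented helper: replaces each "{}" with the argument's string form.
    static String fmt(String template, Object arg) {
        // String.replace is literal (no regex), so paths with special
        // characters are inserted verbatim.
        return template.replace("{}", String.valueOf(arg));
    }

    public static void main(String[] args) {
        System.out.println(fmt("Input split {} does not exist.", "s3a://myfile"));
        // prints: Input split s3a://myfile does not exist.
    }
}
```

Besides being shorter, the placeholder form avoids building the message string at all when the log level is disabled, which is why it also removes the need for the `LOG.isDebugEnabled()` guard.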
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374359#comment-16374359 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user ChengzhiZhao commented on the issue: https://github.com/apache/flink/pull/5521

@steveloughran Updated.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371250#comment-16371250 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/flink/pull/5521#discussion_r169604725

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -819,6 +819,10 @@ public void open(FileInputSplit fileSplit) throws IOException {
 			this.stream = isot.waitForCompletion();
 			this.stream = decorateInputStream(this.stream, fileSplit);
 		}
+		catch (FileNotFoundException e) {
+			throw new FileNotFoundException("Input split " + fileSplit.getPath() +
+				" doesn't exist, skip and continue: " + e.getMessage());
+		}
--- End diff --

As this exception doesn't have a constructor which takes a nested exception, can you use `initCause()` to patch it:

```java
throw (FileNotFoundException)(new FileNotFoundException(...)).initCause(e)
```
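The `initCause()` pattern Steve suggests can be checked in isolation. A minimal sketch, with the `wrap` helper and path strings invented for illustration:

```java
import java.io.FileNotFoundException;

public class InitCauseDemo {
    // FileNotFoundException has no (String, Throwable) constructor, so the
    // original exception is attached via initCause(); the cast is needed
    // because initCause() returns Throwable.
    static FileNotFoundException wrap(String path, FileNotFoundException cause) {
        return (FileNotFoundException) new FileNotFoundException(
                "Input split " + path + " doesn't exist: " + cause.getMessage())
                .initCause(cause);
    }

    public static void main(String[] args) {
        FileNotFoundException original = new FileNotFoundException("s3a://myfile");
        FileNotFoundException wrapped = wrap("s3a://myfile", original);
        // The original exception (and its stack trace) survives as the cause.
        System.out.println(wrapped.getCause() == original); // prints "true"
    }
}
```

`initCause()` may be called at most once per exception, which is why the pattern works here: the freshly constructed exception has no cause yet.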
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16370347#comment-16370347 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user ChengzhiZhao commented on the issue: https://github.com/apache/flink/pull/5521

Thanks @steveloughran for your feedback; I updated based on your suggestions.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369902#comment-16369902 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/flink/pull/5521#discussion_r169275364

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -706,6 +700,9 @@ public void open(FileInputSplit fileSplit) throws IOException {
 			this.stream = isot.waitForCompletion();
 			this.stream = decorateInputStream(this.stream, fileSplit);
 		}
+		catch (FileNotFoundException e) {
+			throw new FileNotFoundException("Input split " + fileSplit.getPath() + " doesn't exist, skip and continue");
+		}
--- End diff --

I would recommend including the text and stack of the caught exception, for a better stack trace. FNFEs can get raised in odd circumstances in S3; seeing the full stack is what you need when fielding support calls. e.g.

```java
throw new FileNotFoundException("Input split " + fileSplit.getPath() + " doesn't exist, skip and continue: " + e, e);
```
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369324#comment-16369324 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user steveloughran commented on the issue: https://github.com/apache/flink/pull/5521

This is very inefficient against an object store, potentially adding a few hundred millis and $0.01 per file. I would simply catch FileNotFoundExceptions raised in the open() call and treat them specially. See [How long does FileSystem.exists() take against S3?](http://steveloughran.blogspot.co.uk/2016/12/how-long-does-filesystemexists-take.html)
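The open-and-catch pattern Steve describes, reduced to plain JDK I/O as a sketch (real Flink code would go through its `FileSystem` abstraction; `FileInputStream` and the helper name stand in here for illustration):

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;

public class OpenOrSkip {
    // Instead of probing exists() first (an extra round trip per file on an
    // object store), attempt the open directly and treat FileNotFoundException
    // as "this split vanished, skip it". Filesystems are expected to throw
    // FNFE for a path that doesn't resolve to a file.
    static InputStream openOrNull(String path) {
        try {
            return new FileInputStream(path);
        } catch (FileNotFoundException e) {
            return null; // caller skips this split and keeps the job running
        }
    }

    public static void main(String[] args) {
        // A path that does not exist: open fails, so the split is skipped.
        System.out.println(openOrNull("/no/such/dir/missing.avro") == null); // prints "true"
    }
}
```

This does the same work as an exists-then-open sequence but in a single filesystem operation, and it has no time-of-check/time-of-use window between the probe and the open.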
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16369322#comment-16369322 ]

ASF GitHub Bot commented on FLINK-8599:
---
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/flink/pull/5521#discussion_r169132404

--- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -691,6 +691,12 @@ public void open(FileInputSplit fileSplit) throws IOException {
 			LOG.debug("Opening input split " + fileSplit.getPath() + " [" + this.splitStart + "," + this.splitLength + "]");
 		}
+		if (!exists(fileSplit.getPath())) {
--- End diff --

You are doubling the number of checks for file existence here, which, when working with S3, implies three more HTTP requests, which takes time and costs money. Better to do the open() call and catch FileNotFoundException, which all filesystems are required to throw if they are given a path which doesn't resolve to a file.
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368678#comment-16368678 ] ASF GitHub Bot commented on FLINK-8599: --- GitHub user ChengzhiZhao opened a pull request: https://github.com/apache/flink/pull/5521 [FLINK-8599] Improve the failure behavior of the FileInputFormat for … ## What is the purpose of the change This pull request is intent to improve the failure behavior of the ContinuousFileReader, currently if a bad file (for example, a different schema been dropped in this folder) came to the path and flink will do several retries. However, since the file path persist in the checkpoint, when people tried to resume from external checkpoint, it threw the following error on no file been found and the process cannot move forward. `java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No such file or directory: s3a://myfile` The change is to check if the path exist before open the file, if error occurs and bad file removed, flink should resume the process and continue. ## Brief change log - *Add a file exist check before open the file * ## Verifying this change - *Manually verified the change by introduce a bad file while continuously monitoring the folder, after remove the bad file, the process continued.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengzhiZhao/flink Improve_failure_behavior_FileInputFormat

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5521.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5521

commit 6fa8ef212c536acee56b2e9831ec92d1059449ff
Author: Chengzhi Zhao
Date: 2018-02-18T18:23:32Z

    [FLINK-8599] Improve the failure behavior of the FileInputFormat for bad files

> Improve the failure behavior of the FileInputFormat for bad files
> -----------------------------------------------------------------
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Chengzhi Zhao
> Priority: Major
>
> We have an S3 path that Flink monitors for new files:
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1)
> {code}
> I am doing both internal and external checkpointing. Say a bad file (for example, a file with a different schema dropped into this folder) arrives in the path; Flink will retry several times. I want to take those bad files out and let the process continue. However, since the file path is persisted in the checkpoint, when I try to resume from the external checkpoint it throws the following error because the file can no longer be found:
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No such file or directory: s3a://myfile
> {code}
> As [~fhue...@gmail.com] suggested, we could check whether a path exists before trying to read a file, and ignore the input split instead of throwing an exception and causing a failure.
>
> Also, I am thinking about adding an error output for bad files as an option for users. If any bad files exist, we could move them to a separate path and do further analysis.
>
> Not sure how people feel about it, but I'd like to contribute if people think this would be an improvement.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
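The fix described in the PR (checking that a split's file still exists before opening it) can be sketched roughly as follows. This is a hypothetical, simplified illustration using plain `java.nio` rather than Flink's `FileSystem` API; the class and method names are invented here and are not the actual patch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SafeSplitOpener {

    /** Returns true if the split's backing file still exists and is readable. */
    public static boolean splitIsReadable(String filePath) {
        Path path = Paths.get(filePath);
        return Files.exists(path) && Files.isReadable(path);
    }

    /** Opens the split only if its file still exists; otherwise skips it. */
    public static void openSplit(String filePath) throws IOException {
        if (!splitIsReadable(filePath)) {
            // The file was removed (e.g. a bad file taken out of the monitored
            // directory) between checkpointing and restore: skip the split
            // instead of throwing an IOException and failing the job.
            System.out.println("Ignoring input split for missing file: " + filePath);
            return;
        }
        // ... open an input stream and read the split here ...
    }
}
```

With a check like this, a restore from an external checkpoint that references a since-removed file simply skips that split instead of failing the whole job with `No such file or directory`.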
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368614#comment-16368614 ] ASF GitHub Bot commented on FLINK-8599: --- Github user ChengzhiZhao closed the pull request at: https://github.com/apache/flink/pull/5520
[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files
[ https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368547#comment-16368547 ] ASF GitHub Bot commented on FLINK-8599: --- GitHub user ChengzhiZhao opened a pull request: https://github.com/apache/flink/pull/5520 [FLINK-8599] Improve the failure behavior of the FileInputFormat for bad files

## What is the purpose of the change

This pull request intends to improve the failure behavior of the FileInputFormat. Currently, if a bad file (for example, a file with a different schema dropped into the folder) arrives in the monitored path, Flink retries several times. Since the file path is persisted in the checkpoint, resuming from an external checkpoint after the bad file has been removed throws the following error, and the process cannot move forward:

`java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No such file or directory: s3a://myfile`

The change is to check that the path exists before opening the file, so that if the bad file has been removed, Flink can resume the process and continue.

## Brief change log

- Add a file-existence check before opening the file.

## Verifying this change

- Manually verified the change by introducing a bad file while continuously monitoring the folder; after removing the bad file, the process continued.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengzhiZhao/flink Improve_failure_behavior_ContinuousFileReaderOperator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5520.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5520

commit e1605306d5f4a7c7c52eb1e1f3d213ea9872c71b
Author: Chengzhi Zhao
Date: 2018-02-16T16:03:01Z

    [FLINK-8599] Improve the failure behavior of the ContinuousFileReaderOperator

commit d8074e5141e0e6dc9f66e1fab8275cec3803904f
Author: Chengzhi Zhao
Date: 2018-02-17T21:04:59Z

    [FLINK-8599] break while loop

commit 27c031466e1ea2d31cace5cd6a7f33f0d4c896c6
Author: Chengzhi Zhao
Date: 2018-02-18T03:14:05Z

    [FLINK-8599] Check file exists in FileInputFormat

commit a087587d12c5118f7ba8e55692dae6f3e95ef50b
Author: Chengzhi Zhao
Date: 2018-02-18T04:13:50Z

    [FLINK-8599] Add debug
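The "error output" idea raised in the issue description (moving bad files to a separate path for later analysis so the pipeline can keep running) could look roughly like this. A hypothetical sketch using plain `java.nio`; the `BadFileQuarantine` class and the directory layout are invented for illustration and are not part of the actual PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BadFileQuarantine {

    /**
     * Moves a file that failed to parse into a separate error directory,
     * so the main pipeline can continue and the bad file can be inspected
     * later. Returns the file's new location in the error directory.
     */
    public static Path quarantine(Path badFile, Path errorDir) throws IOException {
        // Create the error directory on first use.
        Files.createDirectories(errorDir);
        Path target = errorDir.resolve(badFile.getFileName());
        // Replace any stale copy from a previous failed attempt.
        return Files.move(badFile, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Combined with the existence check in the PR, this would let an operator pull a bad file out of the monitored path into a quarantine directory and have the job skip its split on the next restore, rather than failing repeatedly.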