Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-24 Thread Robert Metzger
Thanks for opening the ticket. I've asked a committer who knows the
streaming sink well to take a look.

On Fri, Apr 24, 2020 at 6:47 AM Lu Niu  wrote:

> Hi, Robert
>
> BTW, I did some field study and I think it's possible to support the
> streaming sink with the presto s3 filesystem. That would let users go
> through the presto s3 fs for all access to S3. I created this jira ticket:
> https://issues.apache.org/jira/browse/FLINK-17364 . What do you think?
>
> Best
> Lu
>
> On Tue, Apr 21, 2020 at 1:46 PM Lu Niu  wrote:
>
>> Cool, thanks!
>>
>> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger 
>> wrote:
>>
>>> I'm not aware of anything. I think the presto s3 file system is
>>> generally the recommended S3 FS implementation.
>>>
>>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>>>
 Thank you both. Given the debug overhead, I might just try out the presto
 s3 file system then. Besides the presto s3 file system not supporting the
 streaming sink, is there anything else I need to keep in mind? Thanks!

 Best
 Lu

 On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
 wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging at DEBUG level already helps?
> If you are up for an adventure, you could also consider adding some
> debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with the S3 file system; maybe others in the Flink
>> community can help you with this, or you could also reach out to the S3
>> team/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. Yeah, I also found those references. However,
>>> as I mentioned in the original post, there is enough capacity on all
>>> disks. Also, when I switch to the presto file system, the problem goes
>>> away. Wondering whether others have encountered a similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, it seems the problem is
 "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
 Could not find any valid local directory for s3ablock-0001-". I googled the
 exception and found a related page [1]; could you please make sure there is
 enough space on the local disk?

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:

> Hi, Flink users
>
> Did anyone encounter such an error? The error comes from
> S3AFileSystem, but there is no capacity issue on any disk. We are using
> Hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: 
> java.io.IOException: Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for 
> state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-23 Thread Lu Niu
Hi, Robert

BTW, I did some field study and I think it's possible to support the
streaming sink with the presto s3 filesystem. That would let users go
through the presto s3 fs for all access to S3. I created this jira ticket:
https://issues.apache.org/jira/browse/FLINK-17364 . What do you think?

Best
Lu

On Tue, Apr 21, 2020 at 1:46 PM Lu Niu  wrote:

> Cool, thanks!
>
> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger 
> wrote:
>
>> I'm not aware of anything. I think the presto s3 file system is generally
>> the recommended S3 FS implementation.
>>
>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>>
>>> Thank you both. Given the debug overhead, I might just try out the presto
>>> s3 file system then. Besides the presto s3 file system not supporting the
>>> streaming sink, is there anything else I need to keep in mind? Thanks!
>>>
>>> Best
>>> Lu
>>>
>>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
>>> wrote:
>>>
 Hey,
 Others have experienced this as well, yes:
 https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
 I have also notified the Hadoop project about this issue:
 https://issues.apache.org/jira/browse/HADOOP-15915

 I agree with Congxian: You could try reaching out to the Hadoop user@
 list for additional help. Maybe logging at DEBUG level already helps?
 If you are up for an adventure, you could also consider adding some
 debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
 version.

 Best,
 Robert


 On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
 wrote:

> Hi LU
>
> I'm not familiar with the S3 file system; maybe others in the Flink
> community can help you with this, or you could also reach out to the S3
> team/community for help.
>
> Best,
> Congxian
>
>
> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:
>
>> Hi, Congxiao
>>
>> Thanks for replying. Yeah, I also found those references. However, as
>> I mentioned in the original post, there is enough capacity on all disks.
>> Also, when I switch to the presto file system, the problem goes away.
>> Wondering whether others have encountered a similar issue.
>>
>> Best
>> Lu
>>
>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> From the stack, it seems the problem is
>>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>> Could not find any valid local directory for s3ablock-0001-". I googled
>>> the exception and found a related page [1]; could you please make sure
>>> there is enough space on the local disk?
>>>
>>> [1]
>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>>>
 Hi, Flink users

 Did anyone encounter such an error? The error comes from
 S3AFileSystem, but there is no capacity issue on any disk. We are using
 Hadoop 2.7.1.
 ```

 Caused by: java.util.concurrent.ExecutionException: 
 java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
 Caused by: java.io.IOException: Could not open output stream for state 
 backend
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-21 Thread Lu Niu
Cool, thanks!

On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger  wrote:

> I'm not aware of anything. I think the presto s3 file system is generally
> the recommended S3 FS implementation.
>
> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:
>
>> Thank you both. Given the debug overhead, I might just try out the presto
>> s3 file system then. Besides the presto s3 file system not supporting the
>> streaming sink, is there anything else I need to keep in mind? Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
>> wrote:
>>
>>> Hey,
>>> Others have experienced this as well, yes:
>>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
>>> I have also notified the Hadoop project about this issue:
>>> https://issues.apache.org/jira/browse/HADOOP-15915
>>>
>>> I agree with Congxian: You could try reaching out to the Hadoop user@
>>> list for additional help. Maybe logging at DEBUG level already helps?
>>> If you are up for an adventure, you could also consider adding some
>>> debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
>>> version.
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
>>> wrote:
>>>
 Hi LU

 I'm not familiar with the S3 file system; maybe others in the Flink
 community can help you with this, or you could also reach out to the S3
 team/community for help.

 Best,
 Congxian


 On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:

> Hi, Congxiao
>
> Thanks for replying. Yeah, I also found those references. However, as
> I mentioned in the original post, there is enough capacity on all disks.
> Also, when I switch to the presto file system, the problem goes away.
> Wondering whether others have encountered a similar issue.
>
> Best
> Lu
>
> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
> wrote:
>
>> Hi
>> From the stack, it seems the problem is
>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>> Could not find any valid local directory for s3ablock-0001-". I googled
>> the exception and found a related page [1]; could you please make sure
>> there is enough space on the local disk?
>>
>> [1]
>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>> Best,
>> Congxian
>>
>>
>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>>
>>> Hi, Flink users
>>>
>>> Did anyone encounter such an error? The error comes from S3AFileSystem,
>>> but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>>> ```
>>>
>>> Caused by: java.util.concurrent.ExecutionException: 
>>> java.io.IOException: Could not open output stream for state backend
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>> ... 3 more
>>> Caused by: java.io.IOException: Could not open output stream for state 
>>> backend
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>> at 
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at 
>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>> at 
>>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>> at 
>>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-21 Thread Robert Metzger
I'm not aware of anything. I think the presto s3 file system is generally
the recommended S3 FS implementation.

On Mon, Apr 13, 2020 at 11:46 PM Lu Niu  wrote:

> Thank you both. Given the debug overhead, I might just try out the presto
> s3 file system then. Besides the presto s3 file system not supporting the
> streaming sink, is there anything else I need to keep in mind? Thanks!
>
> Best
> Lu
>
> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger 
> wrote:
>
>> Hey,
>> Others have experienced this as well, yes:
>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
>> I have also notified the Hadoop project about this issue:
>> https://issues.apache.org/jira/browse/HADOOP-15915
>>
>> I agree with Congxian: You could try reaching out to the Hadoop user@
>> list for additional help. Maybe logging at DEBUG level already helps?
>> If you are up for an adventure, you could also consider adding some
>> debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
>> version.
>>
>> Best,
>> Robert
>>
>>
>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
>> wrote:
>>
>>> Hi LU
>>>
>>> I'm not familiar with the S3 file system; maybe others in the Flink
>>> community can help you with this, or you could also reach out to the S3
>>> team/community for help.
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:
>>>
 Hi, Congxiao

 Thanks for replying. Yeah, I also found those references. However, as I
 mentioned in the original post, there is enough capacity on all disks.
 Also, when I switch to the presto file system, the problem goes away.
 Wondering whether others have encountered a similar issue.

 Best
 Lu

 On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
 wrote:

> Hi
> From the stack, it seems the problem is
> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
> Could not find any valid local directory for s3ablock-0001-". I googled
> the exception and found a related page [1]; could you please make sure
> there is enough space on the local disk?
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
>
>
> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>
>> Hi, Flink users
>>
>> Did anyone encounter such an error? The error comes from S3AFileSystem,
>> but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>> ```
>>
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not open output stream for state backend
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>  ... 3 more
>> Caused by: java.io.IOException: Could not open output stream for state 
>> backend
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>  at 
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>  at 
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>  at 
>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>  at 
>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-13 Thread Lu Niu
Thank you both. Given the debug overhead, I might just try out the presto s3
file system then. Besides the presto s3 file system not supporting the
streaming sink, is there anything else I need to keep in mind? Thanks!

Best
Lu

On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger  wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging at DEBUG level already helps?
> If you are up for an adventure, you could also consider adding some
> debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with the S3 file system; maybe others in the Flink
>> community can help you with this, or you could also reach out to the S3
>> team/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. Yeah, I also found those references. However, as I
>>> mentioned in the original post, there is enough capacity on all disks.
>>> Also, when I switch to the presto file system, the problem goes away.
>>> Wondering whether others have encountered a similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, it seems the problem is
 "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
 Could not find any valid local directory for s3ablock-0001-". I googled
 the exception and found a related page [1]; could you please make sure
 there is enough space on the local disk?

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:

> Hi, Flink users
>
> Did anyone encounter such an error? The error comes from S3AFileSystem,
> but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-09 Thread Robert Metzger
Hey,
Others have experienced this as well, yes:
https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
I have also notified the Hadoop project about this issue:
https://issues.apache.org/jira/browse/HADOOP-15915

I agree with Congxian: You could try reaching out to the Hadoop user@ list
for additional help. Maybe logging at DEBUG level already helps?
If you are up for an adventure, you could also consider adding some
debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
version.
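
If you try that route, here is a minimal sketch of the idea, assuming you
patch hadoop-common's org.apache.hadoop.util.DiskChecker and rebuild; the
surrounding method names follow Hadoop 3.x and may differ in the version
you build:

```
// Hypothetical debug hook in org.apache.hadoop.util.DiskChecker: print the
// directory's state before the existing checks run, then delegate to the
// unchanged logic. Deploy the rebuilt hadoop-common jar with Flink.
public static void checkDir(File dir) throws DiskErrorException {
  System.err.println("DiskChecker: " + dir
      + " exists=" + dir.exists()
      + " isDirectory=" + dir.isDirectory()
      + " canWrite=" + dir.canWrite()
      + " usableSpaceBytes=" + dir.getUsableSpace());
  checkDirInternal(dir); // original Hadoop 3.x logic
}
```

That should reveal which of the usual rejection reasons (missing directory,
no write permission, or no space) applies when DiskErrorException fires.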

Best,
Robert


On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu  wrote:

> Hi LU
>
> I'm not familiar with the S3 file system; maybe others in the Flink
> community can help you with this, or you could also reach out to the S3
> team/community for help.
>
> Best,
> Congxian
>
>
> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:
>
>> Hi, Congxiao
>>
>> Thanks for replying. Yeah, I also found those references. However, as I
>> mentioned in the original post, there is enough capacity on all disks.
>> Also, when I switch to the presto file system, the problem goes away.
>> Wondering whether others have encountered a similar issue.
>>
>> Best
>> Lu
>>
>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> From the stack, it seems the problem is
>>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>> Could not find any valid local directory for s3ablock-0001-". I googled
>>> the exception and found a related page [1]; could you please make sure
>>> there is enough space on the local disk?
>>>
>>> [1]
>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>>> Best,
>>> Congxian
>>>
>>>
>>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>>>
 Hi, Flink users

 Did anyone encounter such an error? The error comes from S3AFileSystem,
 but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
 ```

 Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
 Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
 Caused by: java.io.IOException: Could not open output stream for state 
 backend
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
 org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at 
 java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
 org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
 java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at 
 java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
at 
 org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
  

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-08 Thread Congxian Qiu
Hi LU

I'm not familiar with the S3 file system; maybe others in the Flink
community can help you with this, or you could also reach out to the S3
team/community for help.

Best,
Congxian


On Wed, Apr 8, 2020 at 11:05 AM Lu Niu  wrote:

> Hi, Congxiao
>
> Thanks for replying. Yeah, I also found those references. However, as I
> mentioned in the original post, there is enough capacity on all disks.
> Also, when I switch to the presto file system, the problem goes away.
> Wondering whether others have encountered a similar issue.
>
> Best
> Lu
>
> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
> wrote:
>
>> Hi
>> From the stack, it seems the problem is
>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>> Could not find any valid local directory for s3ablock-0001-". I googled
>> the exception and found a related page [1]; could you please make sure
>> there is enough space on the local disk?
>>
>> [1]
>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>> Best,
>> Congxian
>>
>>
>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>>
>>> Hi, Flink users
>>>
>>> Did anyone encounter such an error? The error comes from S3AFileSystem,
>>> but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>>> ```
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>>> Could not open output stream for state backend
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>> at 
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>> ... 3 more
>>> Caused by: java.io.IOException: Could not open output stream for state 
>>> backend
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>> at 
>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>> at 
>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>> at 
>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>> at 
>>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>> at 
>>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>> at 
>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>> at 
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>> at 
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at 
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>> ... 5 more
>>> Caused by: 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>>  Could not find any valid local directory for s3ablock-0001-
>>> at 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>> at 
>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>> at 
>>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Lu Niu
Hi, Congxiao

Thanks for replying. Yeah, I also found those references. However, as I
mentioned in the original post, there is enough capacity on all disks.
Also, when I switch to the presto file system, the problem goes away.
Wondering whether others have encountered a similar issue.
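
For reference, a minimal sketch of that switch (bucket and path are
placeholders; with Flink's flink-s3-fs-presto jar under
plugins/s3-fs-presto/, the s3p:// scheme is served by the presto
filesystem while s3a keeps going through the Hadoop one):

```
// Hypothetical job setup: point RocksDB incremental checkpoints at the
// presto S3 filesystem via s3p://, so checkpoint uploads bypass s3a's
// disk-buffered block writer (the source of the DiskChecker error).
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToPrestoS3 {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // true = incremental checkpoints, matching the RocksDB stack above
    env.setStateBackend(
        new RocksDBStateBackend("s3p://my-bucket/checkpoints", true));
    // ... define the job and call env.execute() as before
  }
}
```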

Best
Lu

On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu  wrote:

> Hi
> From the stack, it seems the problem is
> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
> Could not find any valid local directory for s3ablock-0001-". I googled
> the exception and found a related page [1]; could you please make sure
> there is enough space on the local disk?
>
> [1]
> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
> Best,
> Congxian
>
>
> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:
>
>> Hi, Flink users
>>
>> Did anyone encounter such an error? The error comes from S3AFileSystem,
>> but there is no capacity issue on any disk. We are using Hadoop 2.7.1.
>> ```
>>
>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
>> Could not open output stream for state backend
>>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>  at 
>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>  ... 3 more
>> Caused by: java.io.IOException: Could not open output stream for state 
>> backend
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>  at 
>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>  at 
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>  at 
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>  at 
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>  at 
>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>  at 
>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>  at 
>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>  at 
>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>  at 
>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>  at 
>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>  ... 5 more
>> Caused by: 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>  Could not find any valid local directory for s3ablock-0001-
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>>  at 
>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>>  at 
>> 

Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-07 Thread Congxian Qiu
Hi
From the stack, it seems the problem is
"org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
Could not find any valid local directory for s3ablock-0001-". I googled the
exception and found a related page [1]; could you please make sure there is
enough space on the local disk?

[1]
https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
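
If it helps, the directories s3a buffers blocks in can be checked with a
small standalone program like the sketch below. The paths are assumptions:
fs.s3a.buffer.dir defaults to ${hadoop.tmp.dir}/s3a, which itself defaults
to /tmp/hadoop-${user.name}; substitute whatever your cluster configures.

```
// Hypothetical standalone check: print existence, writability, and usable
// space for candidate s3a buffer directories. DiskChecker also rejects a
// directory that is missing or not writable, so a full disk is not the
// only way to hit DiskErrorException.
import java.io.File;

public class BufferDirCheck {
  public static void main(String[] args) {
    String[] dirs = {
        "/tmp/hadoop-" + System.getProperty("user.name") + "/s3a",
        "/tmp"
    };
    for (String d : dirs) {
      File f = new File(d);
      System.out.printf("%s exists=%b dir=%b writable=%b usableGB=%.1f%n",
          d, f.exists(), f.isDirectory(), f.canWrite(),
          f.getUsableSpace() / 1e9);
    }
  }
}
```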
Best,
Congxian


On Wed, Apr 8, 2020 at 8:41 AM Lu Niu  wrote:

> Hi, Flink users
>
> Did anyone encounter such an error? The error comes from S3AFileSystem, but
> there is no capacity issue on any disk. We are using Hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>   ... 5 more
> Caused by: 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>  Could not find any valid local directory for s3ablock-0001-
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
>   at 
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
>   at 
>