Thanks for opening the ticket. I've asked a committer who knows the
streaming sink well to take a look at it.

On Fri, Apr 24, 2020 at 6:47 AM Lu Niu <qqib...@gmail.com> wrote:

> Hi, Robert
>
> BTW, I did some investigation and I think it's possible to support the
> streaming sink using the Presto S3 filesystem. That would let users use the
> Presto S3 FS for all access to S3. I created this Jira ticket:
> https://issues.apache.org/jira/browse/FLINK-17364 . What do you think?
>
> Best
> Lu
>
> On Tue, Apr 21, 2020 at 1:46 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Cool, thanks!
>>
>> On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> I'm not aware of anything. I think the presto s3 file system is
>>> generally the recommended S3 FS implementation.
>>>
>>> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Thank you both. Given the debugging overhead, I might just try out the
>>>> Presto S3 file system then. Besides the fact that the Presto S3 file
>>>> system doesn't support the streaming sink, is there anything else I need
>>>> to keep in mind? Thanks!
>>>>
>>>> Best
>>>> Lu
>>>>
>>>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>> Others have experienced this as well, yes:
>>>>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3E (DiskErrorException)
>>>>> I have also notified the Hadoop project about this issue:
>>>>> https://issues.apache.org/jira/browse/HADOOP-15915
>>>>>
>>>>> I agree with Congxian: You could try reaching out to the Hadoop user@
>>>>> list for additional help. Maybe logging on DEBUG level helps already?
>>>>> If you are up for an adventure, you could also consider adding some
>>>>> debugging code to Hadoop's DiskChecker and compiling a custom Hadoop
>>>>> version.
>>>>>
>>>>> Best,
>>>>> Robert
>>>>>
>>>>>
>>>>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu <qcx978132...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Lu
>>>>>>
>>>>>> I'm not familiar with the S3 file system. Maybe others in the Flink
>>>>>> community can help you in this case, or you could also reach out to the
>>>>>> S3 teams/community for help.
>>>>>>
>>>>>> Best,
>>>>>> Congxian
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 8, 2020 at 11:05 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, Congxian
>>>>>>>
>>>>>>> Thanks for replying. Yeah, I also found those references. However,
>>>>>>> as I mentioned in the original post, there is enough capacity on all
>>>>>>> disks. Also, when I switch to the Presto file system, the problem goes
>>>>>>> away. I'm wondering whether others have encountered a similar issue.
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu <qcx978132...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>> From the stack trace, it seems the problem is
>>>>>>>> "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
>>>>>>>> Could not find any valid local directory for s3ablock-0001-".
>>>>>>>> I googled the exception and found a related page [1]. Could you
>>>>>>>> please make sure there is enough space on the local disk?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt-xxxx-xxxx-m-x-file-out
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Congxian
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 8, 2020 at 8:41 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, flink users
>>>>>>>>>
>>>>>>>>> Has anyone encountered this error? It comes from S3AFileSystem,
>>>>>>>>> but there is no capacity issue on any disk. We are using
>>>>>>>>> Hadoop 2.7.1.
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
>>>>>>>>>       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>>>>>>       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>>>>>>       at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>>>>>>>>>       at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>>>>>>>>>       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>>>>>>>>>       ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Could not open output stream for state backend
>>>>>>>>>       at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>>>>>>>>>       at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>>>>>>>>>       at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>>>>>>>>>       at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>>>>>>>>>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>>       at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>>>>>>       at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>>>>>>>>>       at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
>>>>>>>>>       at org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
>>>>>>>>>       at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>>>>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>       at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
>>>>>>>>>       ... 5 more
>>>>>>>>> Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149)
>>>>>>>>>       at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038)
>>>>>>>>>       at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141)
>>>>>>>>>       at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
>>>>>>>>>       at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
>>>>>>>>>       at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
>>>>>>>>>       at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
>>>>>>>>>       ... 22 more
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Best
>>>>>>>>> Lu
>>>>>>>>>
>>>>>>>>
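
For anyone hitting the same DiskErrorException, here is a minimal sketch in
Python of a pre-flight check on the local directories that S3A might use for
block buffering. It is not Hadoop's DiskChecker logic, only an approximation
of three conditions worth ruling out (directory missing, not writable, too
little free space). The function name check_buffer_dirs, the candidate paths,
and the required_bytes threshold are all hypothetical. In Hadoop the buffer
location is controlled by fs.s3a.buffer.dir, so substitute whatever that
resolves to in your setup.

```python
import os
import shutil
import tempfile


def check_buffer_dirs(dirs, required_bytes=0):
    """Map each candidate local buffer directory to 'ok' or a problem string.

    Hypothetical helper: it only approximates conditions that could make a
    directory unusable for temporary block files.
    """
    report = {}
    for d in dirs:
        if not os.path.isdir(d):
            report[d] = "missing or not a directory"
        elif not os.access(d, os.W_OK | os.X_OK):
            report[d] = "not writable by this user"
        elif shutil.disk_usage(d).free < required_bytes:
            report[d] = "not enough free space"
        else:
            report[d] = "ok"
    return report


if __name__ == "__main__":
    # Hypothetical candidates; replace with the directories your job uses.
    candidates = [tempfile.gettempdir(), "/path/that/does/not/exist"]
    for path, status in check_buffer_dirs(candidates).items():
        print(f"{path}: {status}")
```

Running this as the same OS user as the TaskManager process matters, since
free space alone does not tell you whether that user can write to the
directory.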
