[ https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785634#comment-17785634 ]

Samrat Deb commented on FLINK-33536:
------------------------------------

Hello [~prabhujoseph] and [~martijnvisser],

I was able to reproduce the issue and have been investigating FLINK-28513. I'm
not yet sure what was missed in the original fix. I'll continue debugging to
identify the root cause and will share my findings along with any necessary
fixes.


> Flink Table API CSV streaming sink fails with "IOException: Stream closed"
> --------------------------------------------------------------------------
>
>                 Key: FLINK-33536
>                 URL: https://issues.apache.org/jira/browse/FLINK-33536
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems, Table SQL / API
>    Affects Versions: 1.18.0
>            Reporter: Prabhu Joseph
>            Priority: Major
>
> The Flink Table API CSV streaming sink fails with "IOException: Stream closed".
> Prior to Flink 1.18, the CSV streaming sink failed with
> "S3RecoverableFsDataOutputStream cannot sync state to S3", which was fixed by
> [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. That fix
> appears to be incomplete: the sink now fails with the error below instead.
> *Repro*
> {code}
> SET 'execution.runtime-mode' = 'streaming';
> create table dummy_table (
>   id int,
>   data string
> ) with (
>   'connector' = 'filesystem',
>   'path' = 's3://prabhuflinks3/dummy_table/',
>   'format' = 'csv'
> );
> INSERT INTO dummy_table SELECT * FROM (VALUES
>   (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'),
>   (3, 'World'), (4, 'ADD'), (5, 'LINE'));
> {code}
> *Error*
> {code}
> Caused by: java.io.IOException: Stream closed.
>       at org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76)
>       at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52)
>       at org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)
>       at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209)
>       at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379)
>       at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338)
>       at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149)
>       at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367)
>       at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>       at java.lang.Thread.run(Thread.java:750)
> {code}
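
The top frames of the trace show buffered bytes being flushed (RefCountedBufferingFileStream.flush) into a file stream that is already closed (RefCountedFileWithStream.requireOpened), from inside S3RecoverableFsDataOutputStream.closeAndUploadPart. The following is a minimal, self-contained Java sketch of that failure pattern only. GuardedStream, BufferingStream and the early close() call are hypothetical stand-ins, not Flink's actual classes, and the sketch does not claim to identify what actually closes the stream in Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical classes loosely modeled on the top frames of the trace;
// NOT Flink's RefCountedFileWithStream / RefCountedBufferingFileStream.
public class StreamClosedSketch {

    /** Underlying stream that rejects writes once closed (cf. requireOpened()). */
    static final class GuardedStream extends OutputStream {
        private final OutputStream target;
        private boolean closed;

        GuardedStream(OutputStream target) {
            this.target = target;
        }

        private void requireOpened() throws IOException {
            if (closed) {
                throw new IOException("Stream closed.");
            }
        }

        @Override
        public void write(int b) throws IOException {
            requireOpened();
            target.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            requireOpened();
            target.write(b, off, len);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            target.close();
        }
    }

    /** Keeps bytes in memory and only pushes them to the underlying stream on flush(). */
    static final class BufferingStream extends OutputStream {
        private final GuardedStream file;
        private final byte[] buffer = new byte[64];
        private int pos;

        BufferingStream(GuardedStream file) {
            this.file = file;
        }

        @Override
        public void write(int b) throws IOException {
            if (pos == buffer.length) {
                flush();
            }
            buffer[pos++] = (byte) b;
        }

        @Override
        public void flush() throws IOException {
            if (pos > 0) {
                file.write(buffer, 0, pos); // throws if `file` was already closed
                pos = 0;
            }
        }
    }

    /** Returns the message raised when buffered bytes are flushed after an early close. */
    static String reproduce() {
        GuardedStream file = new GuardedStream(new ByteArrayOutputStream());
        BufferingStream buffered = new BufferingStream(file);
        try {
            buffered.write('x'); // byte sits in the buffer, not yet in `file`
            file.close();        // hypothetical premature close of the underlying file
            buffered.flush();    // a later "close for commit" path flushes the buffer
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

In this sketch the error only surfaces when there are still buffered bytes at flush time, which is consistent with the trace failing in flush() rather than in close() itself; whether the same ordering applies inside S3RecoverableFsDataOutputStream is an assumption to be confirmed while debugging.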



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
