[ https://issues.apache.org/jira/browse/FLINK-33536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786318#comment-17786318 ]
Samrat Deb commented on FLINK-33536: ------------------------------------ [~prabhujoseph], [~martijnvisser] I have tested the changes and raised a PR. Please review the changes whenever time permits. > Flink Table API CSV streaming sink fails with "IOException: Stream closed" > -------------------------------------------------------------------------- > > Key: FLINK-33536 > URL: https://issues.apache.org/jira/browse/FLINK-33536 > Project: Flink > Issue Type: Bug > Components: FileSystems, Table SQL / API > Affects Versions: 1.18.0 > Reporter: Prabhu Joseph > Priority: Major > Labels: pull-request-available > > Flink Table API CSV streaming sink fails with "IOException: Stream closed". > Prior to Flink 1.18, the CSV streaming sink used to fail with > "S3RecoverableFsDataOutputStream cannot sync state to S3", which was fixed by > [FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. That fix > seems incomplete: the sink now fails with this issue instead. > *Repro* > {code} > SET 'execution.runtime-mode' = 'streaming'; > create table dummy_table ( > id int, > data string > ) with ( > 'connector' = 'filesystem', > 'path' = 's3://prabhuflinks3/dummy_table/', > 'format' = 'csv' > ); > INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), > (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); > {code} > *Error* > {code} > Caused by: java.io.IOException: Stream closed. 
> at > org.apache.flink.core.fs.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:76) > at > org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:52) > at > org.apache.flink.core.fs.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104) > at > org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:209) > at > org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:177) > at > org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:75) > at > org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:65) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:379) > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:338) > at > org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput(AbstractStreamingWriter.java:155) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:149) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:149) > at > org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:156) > at > 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:619) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.lambda$completeProcessing$0(SourceStreamTask.java:367) > at > org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:126) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) > at > org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:750) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)