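Hi Robert,

I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/". The reason is that `StreamingFileSink` currently supports only the Hadoop-based S3 file system, not the Presto-based one. [1] The "does not support recoverable writers" exception is most likely the Presto-based implementation being picked up for the plain s3:// scheme.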
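As a minimal sketch of the change: the `S3Schemes` helper below is hypothetical (it is not part of Flink), and it assumes the flink-s3-fs-hadoop plugin is installed so that s3a:// resolves to the Hadoop-based file system. It only rewrites the scheme of the sink path before that path is handed to the sink builder.

```java
/** Hypothetical helper, not part of Flink: rewrites an "s3://" path to "s3a://". */
final class S3Schemes {
    private static final String PLAIN_S3 = "s3://";
    private static final String HADOOP_S3 = "s3a://";

    private S3Schemes() {}

    /**
     * Returns the path with its "s3://" prefix replaced by "s3a://".
     * Paths with any other scheme (s3a://, s3p://, file://, ...) are
     * returned unchanged, apart from trimming surrounding whitespace.
     */
    static String toS3a(String path) {
        String trimmed = path.trim();
        if (trimmed.startsWith(PLAIN_S3)) {
            return HADOOP_S3 + trimmed.substring(PLAIN_S3.length());
        }
        return trimmed;
    }
}
```

In the sink from your mail, this would amount to passing `new Path(S3Schemes.toS3a("s3://argo-artifacts/"))` to `StreamingFileSink.forRowFormat(...)`; writing `new Path("s3a://argo-artifacts/")` directly has the same effect.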
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations

Best,
Guowei

On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen <cinquate...@gmail.com> wrote:

> I’m using a local instance of MinIO on my Kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSink
> configuration:
>
> final StreamingFileSink<Tuple2<String, Long>> sink =
>     StreamingFileSink.forRowFormat(
>             new Path("s3://argo-artifacts/"),
>             new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>         .withBucketAssigner(new KeyBucketAssigner())
>         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>         .withOutputFileConfig(config)
>         .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
>     at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490