Hi Robert,

I think you could try changing "s3://argo-artifacts/" to
"s3a://argo-artifacts/". `StreamingFileSink` currently supports only the
Hadoop-based S3 file system, not the Presto-based one. [1] The s3a:// scheme
is served by the flink-s3-fs-hadoop plugin, so that plugin needs to be
installed as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations
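
If the sink path comes from configuration rather than a literal, the scheme
change could be applied before the path is handed to `new Path(...)`. Below is
a minimal sketch using only the JDK; the class `S3SchemeCheck` and the method
`toHadoopS3` are hypothetical names for illustration, not part of Flink. It
only rewrites the plain "s3" scheme and leaves everything else untouched.

```java
import java.net.URI;

public class S3SchemeCheck {

    // Hypothetical helper (not a Flink API): rewrites an "s3://" path to
    // "s3a://" so that it resolves to the Hadoop-based S3 file system.
    // Paths with any other scheme, or with no scheme, are returned unchanged.
    static String toHadoopS3(String path) {
        // URI.create throws IllegalArgumentException for malformed input
        // (for example, a path containing spaces).
        String scheme = URI.create(path).getScheme();
        if ("s3".equals(scheme)) {
            return "s3a" + path.substring(scheme.length());
        }
        return path;
    }

    public static void main(String[] args) {
        // The bucket name is the one from the original question.
        System.out.println(toHadoopS3("s3://argo-artifacts/"));
    }
}
```

The returned string would then replace the literal in
`new Path("s3://argo-artifacts/")`; the rest of the sink builder stays as it is.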

Best,
Guowei


On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen <cinquate...@gmail.com> wrote:

> I’m using a local instance of MinIO on my Kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSink
> configuration:
>
>
>         final StreamingFileSink<Tuple2<String, Long>> sink =
>                 StreamingFileSink.forRowFormat(
>                         new Path("s3://argo-artifacts/"),
>                         new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>                         .withBucketAssigner(new KeyBucketAssigner())
>                         .withRollingPolicy(OnCheckpointRollingPolicy.build())
>                         .withOutputFileConfig(config)
>                         .build();
>
> Does anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
>     at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
>
> --
> Robert Cullen
> 240-475-4490
>
