I’m using a local instance of MinIO on my Kubernetes cluster for
checkpoint/savepoint storage. I’m using this StreamingFileSink
configuration:


    final StreamingFileSink<Tuple2<String, Long>> sink =
        StreamingFileSink.forRowFormat(
                new Path("s3://argo-artifacts/"),
                new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
            .withBucketAssigner(new KeyBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withOutputFileConfig(config)
            .build();

Does anyone know how to fix this exception?

java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
    at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
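
A note on the trace, offered as a possibility rather than a confirmed
diagnosis: the exception is thrown by FlinkS3FileSystem when the S3 plugin
behind the s3:// scheme has no recoverable-writer support. As far as I
understand the Flink S3 documentation, the Presto-based plugin
(flink-s3-fs-presto) does not provide one, while the Hadoop-based plugin
(flink-s3-fs-hadoop) does, and the s3a:// scheme selects the Hadoop plugin
when both are installed. Which plugins this cluster actually loads is an
assumption here. The helper below is hypothetical (S3SchemeHelper and toS3a
are illustrative names, not Flink API) and only sketches switching the sink
path to s3a://:

```java
/**
 * Hypothetical helper, not part of Flink: rewrites an s3:// location to
 * the s3a:// scheme so that the Hadoop-based S3 plugin is selected
 * (assuming flink-s3-fs-hadoop is installed in the plugins directory).
 */
final class S3SchemeHelper {

    private static final String S3_PREFIX = "s3://";
    private static final String S3A_PREFIX = "s3a://";

    private S3SchemeHelper() {}

    /** Returns the location with s3:// replaced by s3a://; other schemes are returned unchanged. */
    static String toS3a(String location) {
        if (location.startsWith(S3_PREFIX)) {
            return S3A_PREFIX + location.substring(S3_PREFIX.length());
        }
        // Already s3a://, s3p://, file://, ... : leave untouched.
        return location;
    }

    public static void main(String[] args) {
        System.out.println(toS3a("s3://argo-artifacts/"));
    }
}
```

In the sink above this would amount to passing
new Path(S3SchemeHelper.toS3a("s3://argo-artifacts/")) to forRowFormat, or
simply writing "s3a://argo-artifacts/" directly. Checkpoints could keep
using the Presto plugin via s3p:// if that plugin is also installed.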

-- 
Robert Cullen
240-475-4490
