Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"
Hi, Robert

It seems that your AccessKeyId is not valid. You can find more details about how to configure the s3 access key in [1].

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/

Best,
Guowei

On Thu, Apr 1, 2021 at 9:19 PM Robert Cullen wrote:
> Guowei,
>
> I changed to “s3a://argo-artifacts/“ but now I get this error. BTW: I'm
> using the example playground from here:
>
> [1] https://docs.ververica.com/getting_started/installation.html
>
> org.apache.flink.util.SerializedThrowable:
> 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate MultiPartUpload
> on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext:
> com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you
> provided does not exist in our records. (Service: Amazon S3; Status Code:
> 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3
> Extended Request ID:
> VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=;
> Proxy: null), S3 Extended Request ID:
> VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
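For reference, credentials for the s3a scheme are normally supplied in flink-conf.yaml rather than in code. A minimal sketch for a self-hosted MinIO setup; the endpoint and key values below are placeholders (not taken from this thread) and must match your MinIO deployment:

```yaml
# flink-conf.yaml — s3a credentials and MinIO endpoint (placeholder values)
s3.access-key: <your-minio-access-key>
s3.secret-key: <your-minio-secret-key>
# MinIO is S3-compatible but self-hosted, so point the client at its service
s3.endpoint: http://minio.default.svc:9000
# MinIO typically requires path-style access instead of virtual-hosted buckets
s3.path.style.access: true
```

An InvalidAccessKeyId (HTTP 403) from MinIO usually means these keys are missing, stale, or pointing at real AWS instead of the MinIO endpoint.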
Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"
Guowei,

I changed to “s3a://argo-artifacts/“ but now I get this error. BTW: I'm using the example playground from here:

[1] https://docs.ververica.com/getting_started/installation.html

org.apache.flink.util.SerializedThrowable: 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: initiate MultiPartUpload on 4bcbea1c-254c-4a7f-9348-14bf5f3e1f1a/cmdaa-0-0.ext: com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: RMD85E1G3WAK18VE; S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=; Proxy: null), S3 Extended Request ID: VTqAVlDCrM+mPrP1XetM7eM9dgfxBcOqu7qNLgsjla8QglvLMHLr5wuxca8yOstIx6AwHZcz/No=:InvalidAccessKeyId
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256) ~[?:?]
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231) ~[?:?]
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123) ~[?:?]
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198) ~[?:?]
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:62) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:253) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68) ~[?:?]
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:78) ~[?:?]
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:90) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34) ~[flink-table-blink_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:243) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator.processElement(StreamGroupedReduceOperator.java:67) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"
Hi, Robert

I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/". This is because `StreamingFileSink` currently only supports the Hadoop-based s3, not the Presto-based s3. [1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html#important-considerations

Best,
Guowei

On Thu, Apr 1, 2021 at 3:56 AM Robert Cullen wrote:
> I’m using a local instance of MinIO on my Kubernetes cluster for
> checkpoint/savepoint storage. I’m using this StreamingFileSink
> configuration (type parameters reconstructed as Tuple2<String, Long>;
> the original post was garbled by the archive):
>
>     final StreamingFileSink<Tuple2<String, Long>> sink =
>         StreamingFileSink.forRowFormat(
>                 new Path("s3://argo-artifacts/"),
>                 new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
>             .withBucketAssigner(new KeyBucketAssigner())
>             .withRollingPolicy(OnCheckpointRollingPolicy.build())
>             .withOutputFileConfig(config)
>             .build();
>
> Anyone know how to fix this exception?
>
> java.lang.UnsupportedOperationException: This s3 file system implementation
> does not support recoverable writers.
>     at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
>
> --
> Robert Cullen
> 240-475-4490
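To make the suggested change concrete, here is a small hypothetical helper (not from the thread) that rewrites a plain s3:// URI to the s3a:// scheme that the Hadoop-based filesystem registers, leaving other URIs untouched:

```java
public class S3SchemeFix {
    // StreamingFileSink needs a recoverable writer, which only the
    // Hadoop-based "s3a://" filesystem provides; rewrite a plain
    // "s3://" URI to "s3a://" and pass anything else through unchanged.
    public static String toS3a(String uri) {
        if (uri.startsWith("s3://")) {
            return "s3a://" + uri.substring("s3://".length());
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(toS3a("s3://argo-artifacts/"));
    }
}
```

In the sink above this would amount to `new Path(S3SchemeFix.toS3a("s3://argo-artifacts/"))`, or simply writing the literal as `"s3a://argo-artifacts/"`.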
s3 FileSystem Error "s3 file system implementation does not support recoverable writers"
I’m using a local instance of MinIO on my Kubernetes cluster for checkpoint/savepoint storage. I’m using this StreamingFileSink configuration (type parameters reconstructed as Tuple2<String, Long>; the archive garbled the generics):

    final StreamingFileSink<Tuple2<String, Long>> sink =
        StreamingFileSink.forRowFormat(
                new Path("s3://argo-artifacts/"),
                new SimpleStringEncoder<Tuple2<String, Long>>("UTF-8"))
            .withBucketAssigner(new KeyBucketAssigner())
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .withOutputFileConfig(config)
            .build();

Anyone know how to fix this exception?

java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
    at org.apache.flink.fs.s3.common.FlinkS3FileSystem.createRecoverableWriter(FlinkS3FileSystem.java:136) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:132) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:70) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.12-1.12.2-stream1.jar:1.12.2-stream1]

--
Robert Cullen
240-475-4490