Hi Jonas,

Just a thought: could you try this policy? If I recall correctly, s3:ListBucket has to be granted on the bucket itself, whereas the other actions can be granted on a path pattern like the "/*" you added.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "s3:ListBucket",
        "s3:Get*",
        "s3:Put*",
        "s3:Delete*"
      ],
      "Resource": [
        "arn:aws:s3:::<company>-flink-dev",
        "arn:aws:s3:::<company>-flink-dev/*"
      ],
      "Effect": "Allow"
    }
  ]
}

Svend

On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
>
> Yes, I have followed the documentation on the link you provided - and decided
> to go for the recommended approach of using IAM roles.
> The hive.s3.use-instance-credentials configuration parameter I got from [1]
> (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
>
> ...`flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is
> based on code from the Presto project <https://prestodb.io/>. You can
> configure it using the same configuration keys as the Presto file system
> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
> by adding the configurations to your `flink-conf.yaml`. The Presto S3
> implementation is the recommended file system for checkpointing to S3....
>
> Is it possible that I am misunderstanding it?
>
> Best,
> Jonas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matth...@ververica.com> wrote:
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file as
>> described in [1]? I'm not sure about this hive.s3.use-instance-credentials
>> being a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:
>>> Hey,
>>>
>>> I am setting up HA on a standalone Kubernetes Flink application job
>>> cluster.
>>> Flink (1.12.5) is used and I am using S3 as the storage backend.
>>>
>>> * The JobManager fails shortly after it starts with the following errors
>>> (apologies in advance for the length), and I can't understand what's going
>>> on.
>>> * First I thought it might be due to missing Delete privileges of the IAM
>>> role and updated that, but the problem persists.
>>> * The S3 bucket configured s3://<company>/recovery is empty.
>>>
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>>   jobmanager.rpc.port: 6123
>>>   jobmanager.memory.process.size: 1600m
>>>   taskmanager.numberOfTaskSlots: 2
>>>   taskmanager.rpc.port: 6122
>>>   taskmanager.memory.process.size: 1728m
>>>   blob.server.port: 6124
>>>   queryable-state.proxy.ports: 6125
>>>   parallelism.default: 2
>>>   scheduler-mode: reactive
>>>   execution.checkpointing.interval: 10s
>>>   restart-strategy: fixed-delay
>>>   restart-strategy.fixed-delay.attempts: 10
>>>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>   kubernetes.cluster-id: {{ $fullName }}
>>>   high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>>>   hive.s3.use-instance-credentials: true
>>>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>>
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>>   name: {{ $fullName }}
>>>   namespace: {{ $fullName }}
>>>   labels:
>>>     app: {{ $appName }}
>>>     chart: {{ template "thoros.chart" . }}
>>>     release: {{ .Release.Name }}
>>>     heritage: {{ .Release.Service }}
>>>
>>> rules:
>>> - apiGroups: [""]
>>>   resources: ["configmaps"]
>>>   verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>>
>>> aws IAM policy
>>> {
>>>   "Version": "2012-10-17",
>>>   "Statement": [
>>>     {
>>>       "Action": [
>>>         "s3:ListBucket",
>>>         "s3:Get*",
>>>         "s3:Put*",
>>>         "s3:Delete*"
>>>       ],
>>>       "Resource": [
>>>         "arn:aws:s3:::<company>-flink-dev/*"
>>>       ],
>>>       "Effect": "Allow"
>>>     }
>>>   ]
>>> }
>>>
>>> *Error-log:*
>>> 2021-08-26 13:08:43,439 INFO org.apache.beam.runners.flink.FlinkRunner [] - Executing pipeline using FlinkRunner.
>>> 2021-08-26 13:08:43,444 WARN org.apache.beam.runners.flink.FlinkRunner [] - For maximum performance you should set the 'fasterCopy' option. See more at https://issues.apache.org/jira/browse/BEAM-11146
>>> 2021-08-26 13:08:43,451 INFO org.apache.beam.runners.flink.FlinkRunner [] - Translating pipeline to Flink program.
>>> 2021-08-26 13:08:43,456 INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found unbounded PCollection. Switching to streaming execution.
>>> 2021-08-26 13:08:43,461 INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating a Streaming Environment.
>>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, thoros-jobmanager
>>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
>>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
>>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
>>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
>>> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: queryable-state.proxy.ports, 6125
>>> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 2
>>> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: scheduler-mode, reactive
>>> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.interval, 10s
>>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy, fixed-delay
>>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 10
>>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.cluster-id, thoros
>>> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.storageDir, s3://<company>-flink-dev/recovery
>>> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: hive.s3.use-instance-credentials, true
>>> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.namespace, thoros
>>> 2021-08-26 13:08:45,232 INFO org.apache.beam.runners.flink.FlinkRunner [] - Starting execution of Flink program.
>>> 2021-08-26 13:08:45,444 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
>>> 2021-08-26 13:08:45,454 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
>>> 2021-08-26 13:08:45,486 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:45,498 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:46,152 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>>> 2021-08-26 13:08:46,169 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,213 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,231 WARN org.apache.flink.runtime.blob.FileSystemBlobStore [] - Failed to delete blob at s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>>> 2021-08-26 13:08:46,239 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to submit job 00000000000000000000000000000000.
>>> java.lang.RuntimeException: java.lang.Exception: Could not open output stream for state backend
>>>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_302]
>>>     at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_302]
>>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> Caused by: java.lang.Exception: Could not open output stream for state backend
>>>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     ... 27 more
>>> Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: V0BWCA4RDVE0EVK8; S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=; Proxy: null), S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o= (Path: s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573) ~[?:?]
>>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560) ~[?:?]
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311) ~[?:?]
>>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356) ~[?:?]
>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
>>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>>     ... 27 more
>>> --
>>> *Kind regards*
>>> *Jonas Eyob*
>
>
> --
> *Kind regards*
> *Jonas Eyob*