@Svend - that seems to have done the trick: adding the bucket itself as a resource got Flink to write to the configured S3 bucket.
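A minimal sketch of why the extra resource entry matters, assuming that s3:ListBucket is evaluated against the bucket ARN while the Get*/Put*/Delete* actions are evaluated against object ARNs. The bucket name `example-flink-dev` and the helpers `make_policy` and `is_allowed` are hypothetical, and the check is a rough Allow-only simplification, not how IAM actually evaluates policies:

```python
from fnmatch import fnmatchcase

# Assumption: of the actions in this thread, only s3:ListBucket is evaluated
# against the bucket ARN; Get*/Put*/Delete* are treated as object-level here.
BUCKET_LEVEL_ACTIONS = {"s3:ListBucket"}

BUCKET = "example-flink-dev"  # hypothetical bucket name

ACTIONS = ["s3:ListBucket", "s3:Get*", "s3:Put*", "s3:Delete*"]


def make_policy(resources):
    """Build a single-statement Allow policy like the one in the thread."""
    return {
        "Version": "2012-10-17",
        "Statement": [{"Action": ACTIONS, "Resource": resources, "Effect": "Allow"}],
    }


# Policy from the original post: only objects under the bucket are listed.
ORIGINAL_POLICY = make_policy([f"arn:aws:s3:::{BUCKET}/*"])
# Policy suggested in the thread: the bucket itself is listed as well.
FIXED_POLICY = make_policy([f"arn:aws:s3:::{BUCKET}", f"arn:aws:s3:::{BUCKET}/*"])


def is_allowed(policy, action, bucket, key=""):
    """Rough Allow-only check; ignores Deny statements, conditions and principals."""
    if action in BUCKET_LEVEL_ACTIONS:
        arn = f"arn:aws:s3:::{bucket}"
    else:
        arn = f"arn:aws:s3:::{bucket}/{key}"
    for statement in policy["Statement"]:
        if statement["Effect"] != "Allow":
            continue
        action_ok = any(fnmatchcase(action, pattern) for pattern in statement["Action"])
        resource_ok = any(fnmatchcase(arn, pattern) for pattern in statement["Resource"])
        if action_ok and resource_ok:
            return True
    return False


if __name__ == "__main__":
    key = "recovery/default/blob"
    for name, policy in [("original", ORIGINAL_POLICY), ("fixed", FIXED_POLICY)]:
        print(name,
              "ListBucket:", is_allowed(policy, "s3:ListBucket", BUCKET),
              "PutObject:", is_allowed(policy, "s3:PutObject", BUCKET, key))
```

With only the "/*" resource, the ListBucket statement matches nothing, because the bucket ARN has no trailing path; the object-level actions are matched by either policy.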
@Gil - we manage our Kubernetes cluster on AWS with kops, but we do assign the IAM roles through the deployment annotations. It seems Presto is able to use the s3:// scheme in our case.

Thanks both!

On Thu, 26 Aug 2021 at 17:59, Gil De Grove <gil.degr...@euranova.eu> wrote:

> Hi Jonas,
>
> Just wondering, are you trying to deploy via IAM service account
> annotations in an AWS EKS cluster?
>
> We noticed that when using presto, the IAM service account was using the
> EC2 metadata API inside AWS. However, when using an EKS service account,
> the API used is the web token auth.
>
> Not sure if the solution we found is the appropriate one, but switching to
> s3a instead of presto, and forcing the AWS defaultProviderChain, did the
> trick.
>
> Maybe you could try that.
>
> Regards,
> Gil
>
> On Thu, Aug 26, 2021, 18:45 Svend <stream...@svend.xyz> wrote:
>
>> Hi Jonas,
>>
>> Just a thought, could you try this policy? If I recall correctly, I think
>> you need ListBucket on the bucket itself, whereas the others can have a
>> path prefix like the "/*" you added.
>>
>> "
>> {
>>   "Version": "2012-10-17",
>>   "Statement": [
>>     {
>>       "Action": [
>>         "s3:ListBucket",
>>         "s3:Get*",
>>         "s3:Put*",
>>         "s3:Delete*"
>>       ],
>>       "Resource": [
>>         "arn:aws:s3:::<company>-flink-dev",
>>         "arn:aws:s3:::<company>-flink-dev/*"
>>       ],
>>       "Effect": "Allow"
>>     }
>>   ]
>> }
>> "
>>
>> Svend
>>
>>
>> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>>
>> Hey Matthias,
>>
>> Yes, I have followed the documentation on the link you provided, and
>> decided to go for the recommended approach of using IAM roles.
>> I got the hive.s3.use-instance-credentials configuration parameter from
>> [1] (first bullet), since I am using the flink-s3-fs-presto plugin, which
>> says:
>>
>> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
>> is based on code from the Presto project <https://prestodb.io/>. You can
>> configure it using the same configuration keys as the Presto file system
>> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
>> by adding the configurations to your flink-conf.yaml. The Presto S3
>> implementation is the recommended file system for checkpointing to S3....
>>
>> It's possible I am misunderstanding it?
>>
>> Best,
>> Jonas
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>>
>> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matth...@ververica.com> wrote:
>>
>> Hi Jonas,
>> have you included the S3 credentials in the Flink config file as
>> described in [1]? I'm not sure that hive.s3.use-instance-credentials
>> is a valid configuration parameter.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>>
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:
>>
>> Hey,
>>
>> I am setting up HA on a standalone Kubernetes Flink application job
>> cluster.
>> Flink (1.12.5) is used and I am using S3 as the storage backend.
>>
>> * The JobManager fails shortly after it starts with the following errors
>> (apologies in advance for the length), and I can't understand what's
>> going on.
>> * At first I thought it might be due to missing Delete privileges on the
>> IAM role and updated that, but the problem persists.
>> * The S3 bucket configured s3://<company>/recovery is empty.
>>
>> configmap.yaml
>> flink-conf.yaml: |+
>>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>   jobmanager.rpc.port: 6123
>>   jobmanager.memory.process.size: 1600m
>>   taskmanager.numberOfTaskSlots: 2
>>   taskmanager.rpc.port: 6122
>>   taskmanager.memory.process.size: 1728m
>>   blob.server.port: 6124
>>   queryable-state.proxy.ports: 6125
>>   parallelism.default: 2
>>   scheduler-mode: reactive
>>   execution.checkpointing.interval: 10s
>>   restart-strategy: fixed-delay
>>   restart-strategy.fixed-delay.attempts: 10
>>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>   kubernetes.cluster-id: {{ $fullName }}
>>   high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>>   hive.s3.use-instance-credentials: true
>>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>
>> role.yaml
>> kind: Role
>> apiVersion: rbac.authorization.k8s.io/v1
>> metadata:
>>   name: {{ $fullName }}
>>   namespace: {{ $fullName }}
>>   labels:
>>     app: {{ $appName }}
>>     chart: {{ template "thoros.chart" . }}
>>     release: {{ .Release.Name }}
>>     heritage: {{ .Release.Service }}
>>
>> rules:
>> - apiGroups: [""]
>>   resources: ["configmaps"]
>>   verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>
>> aws IAM policy
>> {
>>   "Version": "2012-10-17",
>>   "Statement": [
>>     {
>>       "Action": [
>>         "s3:ListBucket",
>>         "s3:Get*",
>>         "s3:Put*",
>>         "s3:Delete*"
>>       ],
>>       "Resource": [
>>         "arn:aws:s3:::<company>-flink-dev/*"
>>       ],
>>       "Effect": "Allow"
>>     }
>>   ]
>> }
>>
>> *Error-log:*
>> 2021-08-26 13:08:43,439 INFO org.apache.beam.runners.flink.FlinkRunner [] - Executing pipeline using FlinkRunner.
>> 2021-08-26 13:08:43,444 WARN org.apache.beam.runners.flink.FlinkRunner [] - For maximum performance you should set the 'fasterCopy' option. See more at https://issues.apache.org/jira/browse/BEAM-11146
>> 2021-08-26 13:08:43,451 INFO org.apache.beam.runners.flink.FlinkRunner [] - Translating pipeline to Flink program.
>> 2021-08-26 13:08:43,456 INFO org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found unbounded PCollection. Switching to streaming execution.
>> 2021-08-26 13:08:43,461 INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating a Streaming Environment.
>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, thoros-jobmanager
>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
>> 2021-08-26 13:08:43,462 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
>> 2021-08-26 13:08:43,463 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
>> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: queryable-state.proxy.ports, 6125
>> 2021-08-26 13:08:43,464 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 2
>> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: scheduler-mode, reactive
>> 2021-08-26 13:08:43,465 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.checkpointing.interval, 10s
>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy, fixed-delay
>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: restart-strategy.fixed-delay.attempts, 10
>> 2021-08-26 13:08:43,466 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.cluster-id, thoros
>> 2021-08-26 13:08:43,467 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.storageDir, s3://<company>-flink-dev/recovery
>> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: hive.s3.use-instance-credentials, true
>> 2021-08-26 13:08:43,468 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.namespace, thoros
>> 2021-08-26 13:08:45,232 INFO org.apache.beam.runners.flink.FlinkRunner [] - Starting execution of Flink program.
>> 2021-08-26 13:08:45,444 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 00000000000000000000000000000000 is submitted.
>> 2021-08-26 13:08:45,454 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=00000000000000000000000000000000.
>> 2021-08-26 13:08:45,486 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:45,498 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 00000000000000000000000000000000 (main0-flink-0826130845-6f3e805f).
>> 2021-08-26 13:08:46,152 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Removed job graph 00000000000000000000000000000000 from KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>> 2021-08-26 13:08:46,169 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up the high availability data for job 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,213 INFO org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished cleaning up the high availability data for job 00000000000000000000000000000000.
>> 2021-08-26 13:08:46,231 WARN org.apache.flink.runtime.blob.FileSystemBlobStore [] - Failed to delete blob at s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>> 2021-08-26 13:08:46,239 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Failed to submit job 00000000000000000000000000000000.
>> java.lang.RuntimeException: java.lang.Exception: Could not open output stream for state backend
>>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670) ~[?:1.8.0_302]
>>     at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646) ~[?:1.8.0_302]
>>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_302]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>> Caused by: java.lang.Exception: Could not open output stream for state backend
>>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     ... 27 more
>> Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: V0BWCA4RDVE0EVK8; S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=; Proxy: null), S3 Extended Request ID: yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o= (Path: s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573) ~[?:?]
>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560) ~[?:?]
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311) ~[?:?]
>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356) ~[?:?]
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93) ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>     ... 27 more
>> --
>> *Kind regards*
>> *Jonas Eyob*

--
*Kind regards*
*Jonas Eyob*
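
A note on the 403 in the trace: the failing frames are FileSystem.exists -> getFileStatus -> getS3ObjectMetadata, i.e. a metadata lookup on a key that did not exist yet (the bucket was empty). As far as I know, S3 answers a HEAD request for a missing key with 404 only when the caller has s3:ListBucket on the bucket, and with 403 otherwise, which would explain why the lookup failed before anything was written. A minimal sketch of that rule (the function `head_object_status` is hypothetical and only models the status code, it does not call S3):

```python
def head_object_status(key_exists: bool, can_get_object: bool, can_list_bucket: bool) -> int:
    """Simplified model of the HTTP status S3 returns for a HEAD request on an object."""
    if key_exists:
        # An existing object is readable only with object-level read permission.
        return 200 if can_get_object else 403
    # For a missing key, S3 reveals "not found" only to callers that may list the bucket.
    return 404 if can_list_bucket else 403


if __name__ == "__main__":
    # Original policy: object-level access, but ListBucket never matched the bucket ARN.
    print("original policy, missing key:", head_object_status(False, True, False))
    # Policy with the bucket ARN added: the missing key is reported as absent.
    print("fixed policy, missing key:", head_object_status(False, True, True))
```

Under this reading, the existence check before create() likely received 403 instead of 404 with the original policy, and the file system treated that as an unrecoverable error rather than as "file not found".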