Hi Jonas,


Just wondering, are you trying to deploy via IAM service account
annotations in an AWS EKS cluster?

We noticed that when using Presto, the IAM service account was resolved via
the EC2 metadata API inside AWS. However, when using an EKS service account,
the API used is the web identity token auth.

Not sure if the solution we found is the appropriate one, but switching to
s3a instead of Presto, and forcing the AWS default credentials provider
chain, did the trick.

Maybe you could try that.
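
For reference, this is roughly what we ended up with in flink-conf.yaml (a
sketch, not a definitive config: the bucket name is a placeholder, and it
assumes the flink-s3-fs-hadoop plugin is enabled under plugins/ instead of
the Presto one):

high-availability.storageDir: s3a://<your-bucket>/recovery
fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain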

Regards,
Gil

On Thu, Aug 26, 2021, 18:45 Svend <stream...@svend.xyz> wrote:

> Hi Jonas,
>
> Just a thought: could you try this policy? If I recall correctly, you need
> ListBucket on the bucket itself, whereas the other actions can have a path
> prefix like the "/*" you added.
>
> "
> {
>     "Version": "2012-10-17",
>     "Statement": [
>         {
>             "Action": [
>                 "s3:ListBucket",
>                 "s3:Get*",
>                 "s3:Put*",
>                 "s3:Delete*"
>             ],
>             "Resource": [
>                 "arn:aws:s3:::<company>-flink-dev",
>                 "arn:aws:s3:::<company>-flink-dev/*"
>             ],
>             "Effect": "Allow"
>         }
>     ]
> }
> "
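>
> As a quick sanity check, assuming you can get a shell in the pod and the
> AWS CLI is available there (both assumptions on my side), something like:
>
> aws sts get-caller-identity
> aws s3 ls s3://<company>-flink-dev/recovery/
>
> should show the role actually being assumed and list the prefix without a
> 403.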
>
> Svend
>
>
> On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
>
> Hey Matthias,
>
> Yes, I have followed the documentation at the link you provided, and
> decided to go with the recommended approach of using IAM roles.
> I got the hive.s3.use-instance-credentials configuration parameter from
> [1] (first bullet), since I am using the flink-s3-fs-presto plugin, which
> says:
>
> ..flink-s3-fs-presto, registered under the scheme *s3://* and *s3p://*,
> is based on code from the Presto project <https://prestodb.io/>. You can
> configure it using the same configuration keys as the Presto file system
> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>,
> by adding the configurations to your flink-conf.yaml. The Presto S3
> implementation is the recommended file system for checkpointing to S3....
>
> Is it possible I am misunderstanding it?
>
> Best,
> Jonas
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
>
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matth...@ververica.com>
> wrote:
>
> Hi Jonas,
> have you included the S3 credentials in the Flink config file as described
> in [1]? I'm not sure that hive.s3.use-instance-credentials is a valid
> configuration parameter.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
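>
> For reference, the static-credentials variant described in those docs goes
> into flink-conf.yaml like this (placeholder values; not needed if you stay
> with IAM roles):
>
> s3.access-key: <your-access-key>
> s3.secret-key: <your-secret-key>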
>
> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:
>
> Hey,
>
> I am setting up HA on a standalone Kubernetes Flink application job
> cluster. I am using Flink 1.12.5 with S3 as the storage backend.
>
> * The JobManager fails shortly after starting with the following errors
> (apologies in advance for the length), and I can't understand what's going
> on.
> * At first I thought it might be due to missing Delete privileges on the
> IAM role and updated that, but the problem persists.
> * The configured S3 bucket, s3://<company>/recovery, is empty.
>
> configmap.yaml
> flink-conf.yaml: |+
>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>   jobmanager.rpc.port: 6123
>   jobmanager.memory.process.size: 1600m
>   taskmanager.numberOfTaskSlots: 2
>   taskmanager.rpc.port: 6122
>   taskmanager.memory.process.size: 1728m
>   blob.server.port: 6124
>   queryable-state.proxy.ports: 6125
>   parallelism.default: 2
>   scheduler-mode: reactive
>   execution.checkpointing.interval: 10s
>   restart-strategy: fixed-delay
>   restart-strategy.fixed-delay.attempts: 10
>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>   kubernetes.cluster-id: {{ $fullName }}
>   high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>   hive.s3.use-instance-credentials: true
>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>
> role.yaml
> kind: Role
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
>   name: {{ $fullName }}
>   namespace: {{ $fullName }}
>   labels:
>     app: {{ $appName }}
>     chart: {{ template "thoros.chart" . }}
>     release: {{ .Release.Name }}
>     heritage: {{ .Release.Service }}
>
> rules:
>   - apiGroups: [""]
>     resources: ["configmaps"]
>     verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>
> aws IAM policy
> {
>     "Version": "2012-10-17",
>     "Statement": [
>         {
>             "Action": [
>                 "s3:ListBucket",
>                 "s3:Get*",
>                 "s3:Put*",
>                 "s3:Delete*"
>             ],
>             "Resource": [
>                 "arn:aws:s3:::<company>-flink-dev/*"
>             ],
>             "Effect": "Allow"
>         }
>     ]
> }
>
> *Error log:*
> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Executing pipeline using FlinkRunner.
> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner
>                  [] - For maximum performance you should set the
> 'fasterCopy' option. See more at
> https://issues.apache.org/jira/browse/BEAM-11146
> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Translating pipeline to Flink program.
> 2021-08-26 13:08:43,456 INFO
>  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found
> unbounded PCollection. Switching to streaming execution.
> 2021-08-26 13:08:43,461 INFO
>  org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating
> a Streaming Environment.
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.address, thoros-jobmanager
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2021-08-26 13:08:43,462 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 2
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.rpc.port, 6122
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2021-08-26 13:08:43,463 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: blob.server.port, 6124
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: queryable-state.proxy.ports, 6125
> 2021-08-26 13:08:43,464 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: parallelism.default, 2
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: scheduler-mode, reactive
> 2021-08-26 13:08:43,465 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: execution.checkpointing.interval, 10s
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy, fixed-delay
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: restart-strategy.fixed-delay.attempts, 10
> 2021-08-26 13:08:43,466 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability,
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.cluster-id, thoros
> 2021-08-26 13:08:43,467 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: high-availability.storageDir,
> s3://<company>-flink-dev/recovery
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: hive.s3.use-instance-credentials, true
> 2021-08-26 13:08:43,468 INFO
>  org.apache.flink.configuration.GlobalConfiguration           [] - Loading
> configuration property: kubernetes.namespace, thoros
> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner
>                  [] - Starting execution of Flink program.
> 2021-08-26 13:08:45,444 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Job 00000000000000000000000000000000 is submitted.
> 2021-08-26 13:08:45,454 INFO
>  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
> [] - Submitting Job with JobId=00000000000000000000000000000000.
> 2021-08-26 13:08:45,486 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received
> JobGraph submission 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:45,498 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] -
> Submitting job 00000000000000000000000000000000
> (main0-flink-0826130845-6f3e805f).
> 2021-08-26 13:08:46,152 INFO
>  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed
> job graph 00000000000000000000000000000000 from
> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
> 2021-08-26 13:08:46,169 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,213 INFO
>  org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data for job
> 00000000000000000000000000000000.
> 2021-08-26 13:08:46,231 WARN
>  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed
> to delete blob at
> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
> 2021-08-26 13:08:46,239 ERROR
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to
> submit job 00000000000000000000000000000000.
> java.lang.RuntimeException: java.lang.Exception: Could not open output
> stream for state backend
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> ~[?:1.8.0_302]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> ~[?:1.8.0_302]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> Caused by: java.lang.Exception: Could not open output stream for state
> backend
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> Caused by:
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> V0BWCA4RDVE0EVK8; S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
> Proxy: null), S3 Extended Request ID:
> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
> (Path:
> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
> ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
> ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
> ~[?:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
> ... 27 more
> --
> *Kind regards*
> *Jonas Eyob*
>
>
>
> --
> *Kind regards*
> *Jonas Eyob*
>
>
>
