Hi Jonas,

Just a thought: could you try this policy? If I recall correctly, I think 
s3:ListBucket needs to be granted on the bucket ARN itself, whereas the 
object-level actions can use a wildcard like the "/*" you added.

"
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:ListBucket",
                "s3:Get*",
                "s3:Put*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::<company>-flink-dev",
                "arn:aws:s3:::<company>-flink-dev/*"
            ],
            "Effect": "Allow"
        }
    ]
}
"

Svend


On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
> 
> Yes, I have followed the documentation at the link you provided, and decided 
> to go for the recommended approach of using IAM roles.
> I got the hive.s3.use-instance-credentials configuration parameter from [1] 
> (first bullet), since I am using the flink-s3-fs-presto plugin, which says:
> 
> ...`flink-s3-fs-presto`, registered under the schemes *s3://* and *s3p://*, 
> is based on code from the Presto project <https://prestodb.io/>. You can 
> configure it using the same configuration keys as the Presto file system 
> <https://prestodb.io/docs/0.187/connector/hive.html#amazon-s3-configuration>, 
> by adding the configurations to your `flink-conf.yaml`. The Presto S3 
> implementation is the recommended file system for checkpointing to S3...
> 
> It's possible I am misunderstanding it?
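> 
> For completeness, this is roughly how I enabled the plugin in the image 
> (jar name assumed from the standard 1.12.5 distribution layout):
> 
> mkdir -p ./plugins/s3-fs-presto
> cp ./opt/flink-s3-fs-presto-1.12.5.jar ./plugins/s3-fs-presto/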
> 
> Best,
> Jonas
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> 
> On Thu, 26 Aug 2021 at 16:32, Matthias Pohl <matth...@ververica.com> wrote:
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file as described 
>> in [1]? I'm not sure that hive.s3.use-instance-credentials is a valid 
>> configuration parameter.
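>> 
>> For reference, the access-key setup from [1] would look roughly like this 
>> in flink-conf.yaml (only needed if you're not relying on IAM roles):
>> 
>> s3.access-key: your-access-key
>> s3.secret-key: your-secret-key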
>> 
>> Best,
>> Matthias
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> 
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob <jonas.e...@gmail.com> wrote:
>>> Hey,
>>> 
>>> I am setting up HA for a standalone Kubernetes Flink application job 
>>> cluster, running Flink 1.12.5 with S3 as the storage backend.
>>> 
>>> * The JobManager fails shortly after starting with the following errors 
>>> (apologies in advance for the length), and I can't understand what's 
>>> going on.
>>> * At first I thought it might be due to missing Delete privileges on the 
>>> IAM role and updated that, but the problem persists.
>>> * The configured S3 bucket, s3://<company>/recovery, is empty.
>>> 
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>>   jobmanager.rpc.port: 6123
>>>   jobmanager.memory.process.size: 1600m
>>>   taskmanager.numberOfTaskSlots: 2
>>>   taskmanager.rpc.port: 6122
>>>   taskmanager.memory.process.size: 1728m
>>>   blob.server.port: 6124
>>>   queryable-state.proxy.ports: 6125
>>>   parallelism.default: 2
>>>   scheduler-mode: reactive
>>>   execution.checkpointing.interval: 10s
>>>   restart-strategy: fixed-delay
>>>   restart-strategy.fixed-delay.attempts: 10
>>>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>   kubernetes.cluster-id: {{ $fullName }}
>>>   high-availability.storageDir: s3://<company>-flink-{{ .Values.environment }}/recovery
>>>   hive.s3.use-instance-credentials: true
>>>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>>> 
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>>   name: {{ $fullName }}
>>>   namespace: {{ $fullName }}
>>>   labels:
>>>     app: {{ $appName }}
>>>     chart: {{ template "thoros.chart" . }}
>>>     release: {{ .Release.Name }}
>>>     heritage: {{ .Release.Service }}
>>> 
>>> rules:
>>>   - apiGroups: [""]
>>>     resources: ["configmaps"]
>>>     verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>> 
>>> aws IAM policy
>>> {
>>>     "Version": "2012-10-17",
>>>     "Statement": [
>>>         {
>>>             "Action": [
>>>                 "s3:ListBucket",
>>>                 "s3:Get*",
>>>                 "s3:Put*",
>>>                 "s3:Delete*"
>>>             ],
>>>             "Resource": [
>>>                 "arn:aws:s3:::<company>-flink-dev/*"
>>>             ],
>>>             "Effect": "Allow"
>>>         }
>>>     ]
>>> }
>>> 
>>> *Error log:*
>>> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner     
>>>                [] - Executing pipeline using FlinkRunner.
>>> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner     
>>>                [] - For maximum performance you should set the 'fasterCopy' 
>>> option. See more at https://issues.apache.org/jira/browse/BEAM-11146
>>> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner     
>>>                [] - Translating pipeline to Flink program.
>>> 2021-08-26 13:08:43,456 INFO  
>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found 
>>> unbounded PCollection. Switching to streaming execution.
>>> 2021-08-26 13:08:43,461 INFO  
>>> org.apache.beam.runners.flink.FlinkExecutionEnvironments     [] - Creating 
>>> a Streaming Environment.
>>> 2021-08-26 13:08:43,462 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: jobmanager.rpc.address, thoros-jobmanager
>>> 2021-08-26 13:08:43,462 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: jobmanager.rpc.port, 6123
>>> 2021-08-26 13:08:43,462 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: jobmanager.memory.process.size, 1600m
>>> 2021-08-26 13:08:43,463 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: taskmanager.numberOfTaskSlots, 2
>>> 2021-08-26 13:08:43,463 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: taskmanager.rpc.port, 6122
>>> 2021-08-26 13:08:43,463 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: taskmanager.memory.process.size, 1728m
>>> 2021-08-26 13:08:43,463 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: blob.server.port, 6124
>>> 2021-08-26 13:08:43,464 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: queryable-state.proxy.ports, 6125
>>> 2021-08-26 13:08:43,464 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: parallelism.default, 2
>>> 2021-08-26 13:08:43,465 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: scheduler-mode, reactive
>>> 2021-08-26 13:08:43,465 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: execution.checkpointing.interval, 10s
>>> 2021-08-26 13:08:43,466 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: restart-strategy, fixed-delay
>>> 2021-08-26 13:08:43,466 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: restart-strategy.fixed-delay.attempts, 10
>>> 2021-08-26 13:08:43,466 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: high-availability, 
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> 2021-08-26 13:08:43,467 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: kubernetes.cluster-id, thoros
>>> 2021-08-26 13:08:43,467 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: high-availability.storageDir, 
>>> s3://<company>-flink-dev/recovery
>>> 2021-08-26 13:08:43,468 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: hive.s3.use-instance-credentials, true
>>> 2021-08-26 13:08:43,468 INFO  
>>> org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
>>> configuration property: kubernetes.namespace, thoros
>>> 2021-08-26 13:08:45,232 INFO  org.apache.beam.runners.flink.FlinkRunner     
>>>                [] - Starting execution of Flink program.
>>> 2021-08-26 13:08:45,444 INFO  
>>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor 
>>> [] - Job 00000000000000000000000000000000 is submitted.
>>> 2021-08-26 13:08:45,454 INFO  
>>> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor 
>>> [] - Submitting Job with JobId=00000000000000000000000000000000.
>>> 2021-08-26 13:08:45,486 INFO  
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received 
>>> JobGraph submission 00000000000000000000000000000000 
>>> (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:45,498 INFO  
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - 
>>> Submitting job 00000000000000000000000000000000 
>>> (main0-flink-0826130845-6f3e805f).
>>> 2021-08-26 13:08:46,152 INFO  
>>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Removed 
>>> job graph 00000000000000000000000000000000 from 
>>> KubernetesStateHandleStore{configMapName='thoros-dispatcher-leader'}.
>>> 2021-08-26 13:08:46,169 INFO  
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - 
>>> Clean up the high availability data for job 
>>> 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,213 INFO  
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - 
>>> Finished cleaning up the high availability data for job 
>>> 00000000000000000000000000000000.
>>> 2021-08-26 13:08:46,231 WARN  
>>> org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Failed to 
>>> delete blob at 
>>> s3://<company>-flink-dev/recovery/default/blob/job_00000000000000000000000000000000
>>> 2021-08-26 13:08:46,239 ERROR 
>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Failed to 
>>> submit job 00000000000000000000000000000000.
>>> java.lang.RuntimeException: java.lang.Exception: Could not open output 
>>> stream for state backend
>>> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:95)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>>>  ~[?:1.8.0_302]
>>> at 
>>> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
>>>  ~[?:1.8.0_302]
>>> at 
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>>>  ~[?:1.8.0_302]
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.Actor.aroundReceive(Actor.scala:517) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>>> ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> Caused by: java.lang.Exception: Could not open output stream for state 
>>> backend
>>> at 
>>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:72)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> ... 27 more
>>> Caused by: 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
>>>  com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: 
>>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 
>>> V0BWCA4RDVE0EVK8; S3 Extended Request ID: 
>>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=;
>>>  Proxy: null), S3 Extended Request ID: 
>>> yVIcc5k0SoQcKcQ+7+CMHw9vZPmwgbTJmto05Eaixu5RzKRmKPfNJ8M254UN5qrqXoiyycx897o=
>>>  (Path: 
>>> s3://<company>-flink-dev/recovery/default/submittedJobGraphe95ce29174c6)
>>> at 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
>>>  ~[?:?]
>>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
>>> at 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>>>  ~[?:?]
>>> at 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>>>  ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734) ~[?:?]
>>> at 
>>> com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:356)
>>>  ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) ~[?:?]
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) ~[?:?]
>>> at 
>>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
>>>  ~[?:?]
>>> at 
>>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
>>>  ~[?:?]
>>> at 
>>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper.store(FileSystemStateStorageHelper.java:64)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:131)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.putJobGraph(DefaultJobGraphStore.java:212)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:392)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$waitForTerminatingJob$29(Dispatcher.java:971)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> at 
>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedConsumer$3(FunctionUtils.java:93)
>>>  ~[flink-dist_2.12-1.12.5.jar:1.12.5]
>>> ... 27 more
>>> -- 
>>> *With kind regards*
>>> *Jonas Eyob*
> 
> 
> -- 
> *With kind regards*
> *Jonas Eyob*
