Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread jonas eyob
@Svend - that seems to have done the trick: adding the bucket itself as a
resource got Flink writing to the configured S3 bucket.

@Gil - we manage our Kubernetes cluster on AWS with kops, but we do assign
the IAM roles through the deployment annotations. Presto seems able to use
the s3:// scheme in our case.
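
For reference, a minimal sketch of the kind of deployment annotation we mean,
assuming a kube2iam/kiam-style setup on kops (the role name below is
hypothetical):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: thoros-jobmanager
spec:
  template:
    metadata:
      annotations:
        # kube2iam/kiam intercepts the pod's EC2 metadata calls and
        # serves credentials for this role (role name is hypothetical)
        iam.amazonaws.com/role: flink-dev-role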

Thanks both!

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread Gil De Grove
Hi Jonas,

Just wondering, are you trying to deploy via IAM service account
annotations in an AWS EKS cluster?

We noticed that when using Presto, the IAM service account was going
through the EC2 metadata API inside AWS. However, when using an EKS service
account, the API used is the web identity token auth.

Not sure if the solution we found is the appropriate one, but switching to
s3a instead of Presto, and forcing the AWS DefaultAWSCredentialsProviderChain,
did the trick.

Maybe you could try that.

Regards,
Gil
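
As a rough illustration, the switch amounts to something like the following
in flink-conf.yaml (a sketch assuming the flink-s3-fs-hadoop plugin is on the
plugins path; the bucket name is a placeholder):

# flink-conf.yaml (sketch): use the s3a:// scheme served by flink-s3-fs-hadoop
high-availability.storageDir: s3a://my-flink-bucket/recovery
# make the Hadoop S3A filesystem walk the default AWS credentials chain,
# which includes the web identity token used by EKS service accounts
fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain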

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread Svend
Hi Jonas,

Just a thought, could you try this policy? If I recall correctly, I think you
need ListBucket on the bucket itself, whereas the other actions can have a
path prefix like the "/*" you added.

"
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:ListBucket",
"s3:Get*",
"s3:Put*",
"s3:Delete*"
],
"Resource": [
"arn:aws:s3:::-flink-dev",
"arn:aws:s3:::-flink-dev/*"
],
"Effect": "Allow"
}
]
}
"

Svend
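
A quick way to check the distinction, as a sketch using the AWS CLI from
inside the pod (the bucket name is a placeholder): listing needs ListBucket
on the bucket ARN itself, while object reads and writes are covered by the
"/*" resource:

# confirm which identity the pod actually resolves to
aws sts get-caller-identity
# requires s3:ListBucket on arn:aws:s3:::my-flink-bucket
aws s3 ls s3://my-flink-bucket/recovery/
# requires s3:PutObject on arn:aws:s3:::my-flink-bucket/*
echo test | aws s3 cp - s3://my-flink-bucket/recovery/probe.txt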


On Thu, 26 Aug 2021, at 6:19 PM, jonas eyob wrote:
> Hey Matthias,
> 
> Yes, I have followed the documentation on the link you provided - and decided 
> to go for the recommended approach of using IAM roles. 
> The hive.s3.use-instance-credentials configuration parameter I got from [1] 
> (first bullet) since I am using the flink-s3-fs-presto plugin - which says:
> 
> ..f`link-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is 
> based on code from the Presto project . You can 
> configure it using the same configuration keys as the Presto file system 
> , 
> by adding the configurations to your `flink-conf.yaml`. The Presto S3 
> implementation is the recommended file system for checkpointing to S3
> 
> Its possible I am misunderstanding it?
> 
> Best,
> Jonas
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
> 
> Den tors 26 aug. 2021 kl 16:32 skrev Matthias Pohl :
>> Hi Jonas,
>> have you included the s3 credentials in the Flink config file like it's 
>> described in [1]? I'm not sure about this hive.s3.use-instance-credentials 
>> being a valid configuration parameter. 
>> 
>> Best,
>> Matthias
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
>> 
>> On Thu, Aug 26, 2021 at 3:43 PM jonas eyob  wrote:
>>> Hey,
>>> 
>>> I am setting up HA on a standalone Kubernetes Flink application job 
>>> cluster. 
>>> Flink (1.12.5) is used and I am using S3 as the storage backend 
>>> 
>>> * The JobManager shortly fails after starts with the following errors 
>>> (apologies in advance for the length), and I can't understand what's going 
>>> on.
>>> * First I thought it may be due to missing Delete privileges of the IAM 
>>> role and updated that, but the problem persists. 
>>> * The S3 bucket configured s3:///recovery is empty.
>>> 
>>> configmap.yaml
>>> flink-conf.yaml: |+
>>> jobmanager.rpc.address: {{ $fullName }}-jobmanager
>>> jobmanager.rpc.port: 6123
>>> jobmanager.memory.process.size: 1600m
>>> taskmanager.numberOfTaskSlots: 2
>>> taskmanager.rpc.port: 6122
>>> taskmanager.memory.process.size: 1728m
>>> blob.server.port: 6124
>>> queryable-state.proxy.ports: 6125
>>> parallelism.default: 2
>>> scheduler-mode: reactive
>>> execution.checkpointing.interval: 10s
>>> restart-strategy: fixed-delay
>>> restart-strategy.fixed-delay.attempts: 10
>>> high-availability: 
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> kubernetes.cluster-id: {{ $fullName }}
>>> high-availability.storageDir: s3://-flink-{{ .Values.environment 
>>> }}/recovery
>>> hive.s3.use-instance-credentials: true
>>> kubernetes.namespace: {{ $fullName }} # The namespace that will be used for 
>>> running the jobmanager and taskmanager pods
>>> 
>>> role.yaml
>>> kind: Role
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> metadata:
>>> name: {{ $fullName }}
>>> namespace: {{ $fullName }}
>>> labels:
>>> app: {{ $appName }}
>>> chart: {{ template "thoros.chart" . }}
>>> release: {{ .Release.Name }}
>>> heritage: {{ .Release.Service }}
>>> 
>>> rules:
>>> - apiGroups: [""]
>>> resources: ["configmaps"]
>>> verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>>> 
>>> aws IAM policy
>>> {
>>> "Version": "2012-10-17",
>>> "Statement": [
>>> {
>>> "Action": [
>>> "s3:ListBucket",
>>> "s3:Get*",
>>> "s3:Put*",
>>> "s3:Delete*"
>>> ],
>>> "Resource": [
>>> "arn:aws:s3:::-flink-dev/*"
>>> ],
>>> "Effect": "Allow"
>>> }
>>> ]
>>> }
>>> 
>>> *Error-log:*
>>> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner 
>>>[] - Executing pipeline using FlinkRunner.
>>> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner 
>>>[] - For maximum performance you should set the 'fasterCopy' 
>>> option. See more at https://issues.apache.org/jira/browse/BEAM-11146
>>> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner 
>>>[] - Translating pipeline to Flink program.
>>> 

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread jonas eyob
Hey Matthias,

Yes, I have followed the documentation at the link you provided - and
decided to go for the recommended approach of using IAM roles.
I got the hive.s3.use-instance-credentials configuration parameter from [1]
(first bullet), since I am using the flink-s3-fs-presto plugin, which says:

...flink-s3-fs-presto, registered under the schemes *s3://* and *s3p://*, is
based on code from the Presto project. You can configure it using the same
configuration keys as the Presto file system, by adding the configurations
to your flink-conf.yaml. The Presto S3 implementation is the recommended
file system for checkpointing to S3.

Is it possible I am misunderstanding it?

Best,
Jonas

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

Re: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden)

2021-08-26 Thread Matthias Pohl
Hi Jonas,
have you included the s3 credentials in the Flink config file as described
in [1]? I'm not sure hive.s3.use-instance-credentials is a valid
configuration parameter.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials
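
For reference, the access-key setup described in [1] boils down to something
like this in flink-conf.yaml (a sketch; IAM roles remain the recommended
approach, so these keys are only needed if you are not relying on a role):

# flink-conf.yaml (sketch): static S3 credentials, per [1]
s3.access-key: your-access-key
s3.secret-key: your-secret-key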

On Thu, Aug 26, 2021 at 3:43 PM jonas eyob wrote:

> Hey,
>
> I am setting up HA on a standalone Kubernetes Flink application job
> cluster.
> Flink 1.12.5 is used, with S3 as the storage backend.
>
> * The JobManager fails shortly after starting with the following errors
> (apologies in advance for the length), and I can't understand what's going
> on.
> * First I thought it might be due to missing Delete privileges on the IAM
> role and updated that, but the problem persists.
> * The configured S3 bucket, s3:///recovery, is empty.
>
> configmap.yaml
> flink-conf.yaml: |+
>   jobmanager.rpc.address: {{ $fullName }}-jobmanager
>   jobmanager.rpc.port: 6123
>   jobmanager.memory.process.size: 1600m
>   taskmanager.numberOfTaskSlots: 2
>   taskmanager.rpc.port: 6122
>   taskmanager.memory.process.size: 1728m
>   blob.server.port: 6124
>   queryable-state.proxy.ports: 6125
>   parallelism.default: 2
>   scheduler-mode: reactive
>   execution.checkpointing.interval: 10s
>   restart-strategy: fixed-delay
>   restart-strategy.fixed-delay.attempts: 10
>   high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>   kubernetes.cluster-id: {{ $fullName }}
>   high-availability.storageDir: s3://-flink-{{ .Values.environment }}/recovery
>   hive.s3.use-instance-credentials: true
>   kubernetes.namespace: {{ $fullName }} # The namespace that will be used for running the jobmanager and taskmanager pods
>
> role.yaml
> kind: Role
> apiVersion: rbac.authorization.k8s.io/v1
> metadata:
>   name: {{ $fullName }}
>   namespace: {{ $fullName }}
>   labels:
>     app: {{ $appName }}
>     chart: {{ template "thoros.chart" . }}
>     release: {{ .Release.Name }}
>     heritage: {{ .Release.Service }}
>
> rules:
>   - apiGroups: [""]
>     resources: ["configmaps"]
>     verbs: ["create", "edit", "delete", "watch", "get", "list", "update"]
>
> aws IAM policy
> {
>   "Version": "2012-10-17",
>   "Statement": [
>     {
>       "Action": [
>         "s3:ListBucket",
>         "s3:Get*",
>         "s3:Put*",
>         "s3:Delete*"
>       ],
>       "Resource": [
>         "arn:aws:s3:::-flink-dev/*"
>       ],
>       "Effect": "Allow"
>     }
>   ]
> }
>
> *Error-log:*
> 2021-08-26 13:08:43,439 INFO  org.apache.beam.runners.flink.FlinkRunner [] - Executing pipeline using FlinkRunner.
> 2021-08-26 13:08:43,444 WARN  org.apache.beam.runners.flink.FlinkRunner [] - For maximum performance you should set the 'fasterCopy' option. See more at https://issues.apache.org/jira/browse/BEAM-11146
> 2021-08-26 13:08:43,451 INFO  org.apache.beam.runners.flink.FlinkRunner [] - Translating pipeline to Flink program.
> 2021-08-26 13:08:43,456 INFO  org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment [] - Found unbounded PCollection. Switching to streaming execution.
> 2021-08-26 13:08:43,461 INFO  org.apache.beam.runners.flink.FlinkExecutionEnvironments [] - Creating a Streaming Environment.
> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, thoros-jobmanager
> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
> 2021-08-26 13:08:43,462 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
> 2021-08-26 13:08:43,463 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
> 2021-08-26 13:08:43,464 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: queryable-state.proxy.ports, 6125
> 2021-08-26 13:08:43,464 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 2
> 2021-08-26 13:08:43,465 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration