[
https://issues.apache.org/jira/browse/FLINK-38509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029455#comment-18029455
]
Royston Tauro edited comment on FLINK-38509 at 10/13/25 10:41 AM:
------------------------------------------------------------------
[~gaborgsomogyi] I tried this, but according to the JobManager logs the Hadoop
config is read during creation of the cluster,
i.e.
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L83]
and during execution of the job, this is called, which already has
both the Hadoop config and the storage initialized:
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L146]
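For context, a minimal sketch of why that matters, assuming the startup path is
the cluster entrypoint calling FileSystem.initialize (simplified, not the exact
Flink code): file systems are configured exactly once, from the cluster-level
configuration, so per-job options passed to
StreamExecutionEnvironment.getExecutionEnvironment(config) never reach the
factory.
```
// Simplified sketch (assumption: approximates what the cluster entrypoint does
// at startup; not the exact Flink code).
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;

Configuration clusterConfig = GlobalConfiguration.loadConfiguration(); // cluster config only
PluginManager pluginManager =
        PluginUtils.createPluginManagerFromRootFolder(clusterConfig);
// GSFileSystemFactory.configure(...) receives this configuration once;
// anything a job sets later in its own Configuration is not part of it.
FileSystem.initialize(clusterConfig, pluginManager);
```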
Code I tried:
```
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
// GCS credentials, passed as flink.hadoop.* keys in the hope that the GS
// file system picks them up per job:
config.setString("flink.hadoop.google.cloud.auth.service.account.enable", "true");
config.setString(
        "flink.hadoop.google.cloud.auth.service.account.json.keyfile",
        "/Users/abc/.config/gcloud/application_default_credentials.json");
// Checkpointing setup:
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file:///opt/flink/checkpoints");
config.set(CHECKPOINTING_CONSISTENCY_MODE, CheckpointingMode.EXACTLY_ONCE);

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
```
JobManager logs during cluster init:
2025-10-13 15:45:58,901 INFO  org.apache.flink.fs.gs.GSFileSystemFactory [] - Using Hadoop configuration {"properties":[]}
2025-10-13 15:45:58,903 INFO  org.apache.flink.fs.gs.GSFileSystemFactory [] - Using file system options GSFileSystemOptions{writerTemporaryBucketName=Optional.empty, writerChunkSize=Optional.empty}
2025-10-13 15:45:58,916 INFO  org.apache.flink.fs.gs.utils.ConfigUtils [] - Creating GSRecoverableWriter using no credentials
2025-10-13 15:45:59,057 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Install security context.
2025-10-13 15:45:59,061 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
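My reading of why the third line says "no credentials" (a simplified sketch,
assuming the lookup roughly mirrors org.apache.flink.fs.gs.utils.ConfigUtils;
not the exact code): credentials come either from the cluster-side Hadoop
config or from the GOOGLE_APPLICATION_CREDENTIALS environment variable, both of
which are fixed when the cluster is created.
```
// Simplified sketch of the credential lookup (assumption: approximates
// ConfigUtils in flink-gs-fs-hadoop; not the exact implementation).
import java.util.Optional;

static Optional<String> resolveCredentialsPath(
        org.apache.hadoop.conf.Configuration hadoopConfig) {
    // 1) Key from core-site.xml or flink.hadoop.* in the *cluster* config:
    String keyfile =
            hadoopConfig.get("google.cloud.auth.service.account.json.keyfile");
    if (keyfile != null) {
        return Optional.of(keyfile);
    }
    // 2) Process-level environment variable, also fixed at cluster start:
    return Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
}
```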
> Dynamic Credentials in Flink for Google Cloud Storage
> -----------------------------------------------------
>
> Key: FLINK-38509
> URL: https://issues.apache.org/jira/browse/FLINK-38509
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 2.1.0
> Reporter: Royston Tauro
> Priority: Major
>
> Currently, in session cluster mode, the only way to provide credentials for
> Google Cloud Storage is via the environment or the core-site.xml in the Hadoop
> config, which is read only during creation of the cluster.
> Are there plans to make this dynamic per job, i.e. can each job have its own
> credentials via flinkConfig, similar to how Spark allows it?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)