[ 
https://issues.apache.org/jira/browse/FLINK-38509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029455#comment-18029455
 ] 

Royston Tauro edited comment on FLINK-38509 at 10/13/25 10:41 AM:
------------------------------------------------------------------

[~gaborgsomogyi] I tried this but according to the job manager logs, hadoop 
config is being read during the creation of the cluster
i.e 
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L83]

and during the execution of the job , this is being called which already has 
both hadoop config and storage intialized
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L146]

code i tried
```
 Configuration config = new Configuration();
        
config.setString("flink.hadoop.google.cloud.auth.service.account.enable", 
"true");
        
config.setString("flink.hadoop.google.cloud.auth.service.account.json.keyfile", 
"/Users/abc/.config/gcloud/application_default_credentials.json");
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///opt/flink/checkpoints");
        config.set(CHECKPOINTING_CONSISTENCY_MODE, 
CheckpointingMode.EXACTLY_ONCE);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
```

 

Job Manager logs during cluster Init:
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}901{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.{color}{color:#008080}GSFileSystemFactory{color}{color:#000000}
 [] - {color}{color:#0000ff}Using{color}{color:#000000} 
{color}{color:#008080}Hadoop{color}{color:#000000} configuration 
{{color}{color:#a31515}"properties"{color}{color:#000000}:[]}{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}903{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.{color}{color:#008080}GSFileSystemFactory{color}{color:#000000}
 [] - {color}{color:#0000ff}Using{color}{color:#000000} file 
{color}{color:#0000ff}system{color}{color:#000000} options 
{color}{color:#008080}GSFileSystemOptions{color}{color:#000000}{writerTemporaryBucketName={color}{color:#008080}Optional{color}{color:#000000}.empty,
 
writerChunkSize={color}{color:#008080}Optional{color}{color:#000000}.empty}{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}916{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.utils.{color}{color:#008080}ConfigUtils{color}{color:#000000}
 [] - {color}{color:#008080}Creating{color}{color:#000000} 
{color}{color:#008080}GSRecoverableWriter{color}{color:#000000} 
{color}{color:#0000ff}using{color}{color:#000000} no credentials{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}59{color}{color:#000000},{color}{color:#098658}057{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.runtime.entrypoint.{color}{color:#008080}ClusterEntrypoint{color}{color:#000000}
 [] - {color}{color:#008080}Install{color}{color:#000000} security 
context.{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}59{color}{color:#000000},{color}{color:#098658}061{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.runtime.security.modules.{color}{color:#008080}HadoopModuleFactory{color}{color:#000000}
 [] - {color}{color:#008080}Cannot{color}{color:#000000} create 
{color}{color:#008080}Hadoop{color}{color:#000000} 
{color}{color:#008080}Security{color}{color:#000000} 
{color}{color:#008080}Module{color}{color:#000000} because 
{color}{color:#008080}Hadoop{color}{color:#000000} cannot be found 
{color}{color:#0000ff}in{color}{color:#000000} the 
{color}{color:#008080}Classpath{color}{color:#000000}.{color}
 
 


was (Author: JIRAUSER311187):
I tried this but according to the job manager logs, hadoop config is being read 
during the creation of the cluster
i.e 
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L83]

and during the execution of the job , this is being called which already has 
both hadoop config and storage intialized
[https://github.com/apache/flink/blob/master/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java#L146]

code i tried
```
 Configuration config = new Configuration();
        
config.setString("flink.hadoop.google.cloud.auth.service.account.enable", 
"true");
        
config.setString("flink.hadoop.google.cloud.auth.service.account.json.keyfile", 
"/Users/abc/.config/gcloud/application_default_credentials.json");
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
"file:///opt/flink/checkpoints");
        config.set(CHECKPOINTING_CONSISTENCY_MODE, 
CheckpointingMode.EXACTLY_ONCE);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(config);
```

 

Job Manager logs during cluster Init:
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}901{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.{color}{color:#008080}GSFileSystemFactory{color}{color:#000000}
 [] - {color}{color:#0000ff}Using{color}{color:#000000} 
{color}{color:#008080}Hadoop{color}{color:#000000} configuration 
{{color}{color:#a31515}"properties"{color}{color:#000000}:[]}{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}903{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.{color}{color:#008080}GSFileSystemFactory{color}{color:#000000}
 [] - {color}{color:#0000ff}Using{color}{color:#000000} file 
{color}{color:#0000ff}system{color}{color:#000000} options 
{color}{color:#008080}GSFileSystemOptions{color}{color:#000000}{writerTemporaryBucketName={color}{color:#008080}Optional{color}{color:#000000}.empty,
 
writerChunkSize={color}{color:#008080}Optional{color}{color:#000000}.empty}{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}58{color}{color:#000000},{color}{color:#098658}916{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.fs.gs.utils.{color}{color:#008080}ConfigUtils{color}{color:#000000}
 [] - {color}{color:#008080}Creating{color}{color:#000000} 
{color}{color:#008080}GSRecoverableWriter{color}{color:#000000} 
{color}{color:#0000ff}using{color}{color:#000000} no credentials{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}59{color}{color:#000000},{color}{color:#098658}057{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.runtime.entrypoint.{color}{color:#008080}ClusterEntrypoint{color}{color:#000000}
 [] - {color}{color:#008080}Install{color}{color:#000000} security 
context.{color}
{color:#098658}2025{color}{color:#000000}-{color}{color:#098658}10{color}{color:#000000}-{color}{color:#098658}13{color}{color:#000000}
 
{color}{color:#098658}15{color}{color:#000000}:{color}{color:#098658}45{color}{color:#000000}:{color}{color:#098658}59{color}{color:#000000},{color}{color:#098658}061{color}{color:#000000}
 {color}{color:#008080}INFO{color}{color:#000000} 
org.apache.flink.runtime.security.modules.{color}{color:#008080}HadoopModuleFactory{color}{color:#000000}
 [] - {color}{color:#008080}Cannot{color}{color:#000000} create 
{color}{color:#008080}Hadoop{color}{color:#000000} 
{color}{color:#008080}Security{color}{color:#000000} 
{color}{color:#008080}Module{color}{color:#000000} because 
{color}{color:#008080}Hadoop{color}{color:#000000} cannot be found 
{color}{color:#0000ff}in{color}{color:#000000} the 
{color}{color:#008080}Classpath{color}{color:#000000}.{color}
 
 

> Dynamic Credentials in Flink for Google Cloud Storage
> -----------------------------------------------------
>
>                 Key: FLINK-38509
>                 URL: https://issues.apache.org/jira/browse/FLINK-38509
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 2.1.0
>            Reporter: Royston Tauro
>            Priority: Major
>
> Currently in session cluster mode, the only way to provide credentials for 
> Google Cloud Storage is using the env or the core-site.xml in hadoop config 
> which is read during the creation of the cluster only.
> Are there plans to make it dynamic to Jobs i.e each job can have have its own 
> credentials via flinkConfig something similar to how spark allows it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to