xintongsong commented on a change in pull request #18489:
URL: https://github.com/apache/flink/pull/18489#discussion_r791304149



##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, 
GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, 
if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as 
location of
+        // credentials, if supplied
+        // 4) use no credentials
+
+        // store any credentials we are to use, here
+        Optional<String> credentialsPath = Optional.empty();
+
+        // only look for credentials if service account support is enabled
+        Configuration hadoopConfig = getHadoopFileSystem().getConf();
+        boolean enableServiceAccount =
+                
hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true);
+        if (enableServiceAccount) {
+
+            // load google application credentials, and then fall back to
+            // "google.cloud.auth.service.account.json.keyfile" from Hadoop
+            credentialsPath = 
Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+            if (credentialsPath.isPresent()) {
+                LOGGER.info(
+                        "Recoverable writer is using 
GOOGLE_APPLICATION_CREDENTIALS at {}",
+                        credentialsPath.get());
+            } else {
+                credentialsPath =
+                        Optional.ofNullable(
+                                
hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
+                credentialsPath.ifPresent(
+                        path ->
+                                LOGGER.info(
+                                        "Recoverable writer is using 
credentials from Hadoop at {}",
+                                        path));
+            }
+        }
 
-        // create the Google storage service instance
-        Storage storage = StorageOptions.getDefaultInstance().getService();
+        // construct the storage instance, using credentials if provided
+        Storage storage;
+        if (credentialsPath.isPresent()) {
+            LOGGER.info(
+                    "Creating GSRecoverableWriter using credentials from {}",
+                    credentialsPath.get());
+            try (FileInputStream credentialsStream = new 
FileInputStream(credentialsPath.get())) {
+                GoogleCredentials credentials = 
GoogleCredentials.fromStream(credentialsStream);
+                storage =
+                        StorageOptions.newBuilder()
+                                .setCredentials(credentials)
+                                .build()
+                                .getService();
+            }
+        } else {
+            LOGGER.info("Creating GSRecoverableWriter using no credentials");
+            storage = StorageOptions.newBuilder().build().getService();
+        }

Review comment:
       I'd suggest minimizing the work done inside the `if-else` branches, as follows:
   ```
           // construct the storage instance, using credentials if provided
           StorageOptions.Builder storageOptionBuilder = 
StorageOptions.newBuilder();
           if (credentialsPath.isPresent()) {
               LOGGER.info(
                       "Creating GSRecoverableWriter using credentials from {}",
                       credentialsPath.get());
               try (FileInputStream credentialsStream = new 
FileInputStream(credentialsPath.get())) {
                   GoogleCredentials credentials = 
GoogleCredentials.fromStream(credentialsStream);
                   storageOptionBuilder.setCredentials(credentials);
               }
           } else {
               LOGGER.info("Creating GSRecoverableWriter using no credentials");
           }
   
           // create the GS blob storage wrapper
           GSBlobStorageImpl blobStorage =
                   new 
GSBlobStorageImpl(storageOptionBuilder.build().getService());
   ```

##########
File path: 
flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, 
GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, 
if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as 
location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       Just trying to understand: what happens if a user configures none of these 
options? I.e., the service account is enabled by default, but no credentials are 
provided.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to