galenwarren commented on a change in pull request #18489:
URL: https://github.com/apache/flink/pull/18489#discussion_r791317887



##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       As it stands now, if the service account were enabled but no credentials were supplied via either `GOOGLE_APPLICATION_CREDENTIALS` or `google.cloud.auth.service.account.json.keyfile`, it would create a `Storage` instance with no credentials. Writing to a publicly writable GCS bucket would still work, but it would fail if the bucket required credentials.
   
   This is similar (as far as I understand) to how the Hadoop config behaves: even though service credentials are enabled by default, you still have to specify a credential of some kind.
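   
   For context, here's a minimal sketch of the two construction paths being described (this just mirrors the new code further down in this diff; the `StorageOptions`/`GoogleCredentials` calls are the ones the PR already uses, and `credentialsPath` is the resolved `Optional<String>` from that code):
   
   ```java
   // Illustrative fragment; assumes java.io.FileInputStream, com.google.auth.oauth2.GoogleCredentials
   // and com.google.cloud.storage.* are imported.
   Storage storage;
   if (credentialsPath.isPresent()) {
       // a credentials file was resolved -> authenticate the client explicitly
       try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
           GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
           storage = StorageOptions.newBuilder().setCredentials(credentials).build().getService();
       }
   } else {
       // no credentials -> per the comment above, this only works against buckets
       // that allow unauthenticated access
       storage = StorageOptions.newBuilder().build().getService();
   }
   ```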

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       As it stands now, if the service account were enabled but no credentials were supplied via either `GOOGLE_APPLICATION_CREDENTIALS` or `google.cloud.auth.service.account.json.keyfile`, it would create a `Storage` instance with no credentials. Writing to a publicly writable GCS bucket would still work, but it would fail if the bucket required credentials.
   
   This is similar (as far as I understand) to how the Hadoop config behaves: even though service credentials are enabled by default, you still have to specify a credential of some kind, or else it won't use one.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       Also, it's worth noting that there are more authentication [options](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md) supported for Hadoop than just those two:
   
   * Private key and id
   * P12 certificate
   * Short-lived service-account impersonation
   
   But the only ones documented as supported in Flink so far are the two mentioned directly in the new docs, `GOOGLE_APPLICATION_CREDENTIALS` and `google.cloud.auth.service.account.json.keyfile`.
   
   Do you think it's OK just to support those?
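   
   For illustration only (not something this PR implements, and the exact calls are my assumption based on google-auth-library, so please double-check), each of those extra mechanisms would need its own credential construction when building the `Storage` client directly, e.g. impersonation:
   
   ```java
   // Hypothetical sketch -- NOT part of this PR. Assumes java.util.Arrays,
   // com.google.auth.oauth2.* and com.google.cloud.storage.* imports; the target
   // principal below is made up.
   GoogleCredentials source = GoogleCredentials.getApplicationDefault();
   ImpersonatedCredentials impersonated =
           ImpersonatedCredentials.create(
                   source,
                   "target-sa@example-project.iam.gserviceaccount.com", // hypothetical principal
                   null, // no delegates
                   Arrays.asList("https://www.googleapis.com/auth/devstorage.read_write"),
                   3600); // token lifetime in seconds
   Storage storage =
           StorageOptions.newBuilder().setCredentials(impersonated).build().getService();
   ```
   
   So each mechanism we choose to support adds a distinct code path on the recoverable-writer side.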

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials
+
+        // store any credentials we are to use, here
+        Optional<String> credentialsPath = Optional.empty();
+
+        // only look for credentials if service account support is enabled
+        Configuration hadoopConfig = getHadoopFileSystem().getConf();
+        boolean enableServiceAccount =
+                hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true);
+        if (enableServiceAccount) {
+
+            // load google application credentials, and then fall back to
+            // "google.cloud.auth.service.account.json.keyfile" from Hadoop
+            credentialsPath = Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+            if (credentialsPath.isPresent()) {
+                LOGGER.info(
+                        "Recoverable writer is using GOOGLE_APPLICATION_CREDENTIALS at {}",
+                        credentialsPath.get());
+            } else {
+                credentialsPath =
+                        Optional.ofNullable(
+                                hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
+                credentialsPath.ifPresent(
+                        path ->
+                                LOGGER.info(
+                                        "Recoverable writer is using credentials from Hadoop at {}",
+                                        path));
+            }
+        }
 
-        // create the Google storage service instance
-        Storage storage = StorageOptions.getDefaultInstance().getService();
+        // construct the storage instance, using credentials if provided
+        Storage storage;
+        if (credentialsPath.isPresent()) {
+            LOGGER.info(
+                    "Creating GSRecoverableWriter using credentials from {}",
+                    credentialsPath.get());
+            try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
+                GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
+                storage =
+                        StorageOptions.newBuilder()
+                                .setCredentials(credentials)
+                                .build()
+                                .getService();
+            }
+        } else {
+            LOGGER.info("Creating GSRecoverableWriter using no credentials");
+            storage = StorageOptions.newBuilder().build().getService();
+        }

Review comment:
       Good suggestion, I'll make that change.

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       Please check me on this, but my thought was that someone who was using the `gcs-connector` before -- i.e. just as a Hadoop-backed FileSystem -- would have been able to supply arbitrary config options in `core-site/default.xml` and expect them to be applied, so I was trying to preserve that behavior.
   
   But, yes, if we support arbitrary config options in `core-site/default.xml` *except* for certain authentication-related options, that does seem a bit counterintuitive.
   
   I suppose one option would be to continue to parse and pass through all the options, but to document that the only authentication options that yield the proper behavior for all FileSystem operations are the two documented ones, and not the others (P12, etc.).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
 
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-
         this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
       Yes, I agree.
   
   I'll add a note to the docs (on the other PR) to call this out, and I'll work on some unit tests.
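   
   As a starting point, here's a rough sketch of the kind of test I had in mind, assuming the credential-resolution rules were pulled out into a small helper so they can be exercised without real environment variables -- the `resolveCredentialsPath` name and signature are hypothetical, not something in the PR yet:
   
   ```java
   // Hypothetical helper under test:
   //   static Optional<String> resolveCredentialsPath(Optional<String> envValue, Configuration conf)
   @Test
   public void testHadoopKeyfileUsedWhenEnvVarAbsent() {
       Configuration conf = new Configuration();
       conf.setBoolean("google.cloud.auth.service.account.enable", true);
       conf.set("google.cloud.auth.service.account.json.keyfile", "/tmp/key.json");
   
       // no GOOGLE_APPLICATION_CREDENTIALS -> should fall back to the Hadoop keyfile
       Optional<String> path = GSFileSystem.resolveCredentialsPath(Optional.empty(), conf);
   
       assertEquals(Optional.of("/tmp/key.json"), path);
   }
   ```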



