galenwarren commented on a change in pull request #18489:
URL: https://github.com/apache/flink/pull/18489#discussion_r791317887
##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-        this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials

Review comment:
As it stands now, if service account support were enabled but no credentials were supplied via either `GOOGLE_APPLICATION_CREDENTIALS` or `google.cloud.auth.service.account.json.keyfile`, it would create a Storage instance with no credential. If you were writing to a publicly writable GCS bucket, this would work, but it would fail if the bucket required credentials. This is similar to what would happen (as far as I understand) with the Hadoop config; even if service credentials are enabled (which they are by default), you still have to specify a credential of some kind or else it won't use one.
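For illustration only (not part of the PR), here is a minimal sketch of the behavior described above, assuming the `google-cloud-storage` client library: a `Storage` client built without explicit credentials can write to a publicly writable bucket, while the same call against a bucket that requires authentication fails. The class name and bucket name are hypothetical.

```java
// Minimal sketch; assumes google-cloud-storage on the classpath, bucket name is made up.
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;

import java.nio.charset.StandardCharsets;

public class NoCredentialWriteSketch {

    public static void main(String[] args) {
        // No credentials are set explicitly here, mirroring branch 4) in the diff above.
        Storage storage = StorageOptions.newBuilder().build().getService();

        BlobInfo blobInfo =
                BlobInfo.newBuilder(BlobId.of("some-publicly-writable-bucket", "probe.txt"))
                        .build();
        try {
            // Succeeds only if the bucket allows unauthenticated writes.
            storage.create(blobInfo, "hello".getBytes(StandardCharsets.UTF_8));
            System.out.println("Write succeeded: bucket allows unauthenticated writes");
        } catch (StorageException e) {
            // Expected for buckets that require authenticated access.
            System.out.println("Write rejected: " + e.getMessage());
        }
    }
}
```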
##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########

Review comment:
Also, it's worth noting that there are more authentication [options](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md) supported for Hadoop than just those two:

* Private key and id
* P12 certificate
* Short-lived service-account impersonation

But the only ones that have been documented as supported in Flink so far are the two that are directly mentioned in the new docs, `GOOGLE_APPLICATION_CREDENTIALS` and `google.cloud.auth.service.account.json.keyfile`. Do you think it's OK just to support those?
##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########
@@ -39,17 +45,67 @@
     GSFileSystem(GoogleHadoopFileSystem googleHadoopFileSystem, GSFileSystemOptions options) {
         super(Preconditions.checkNotNull(googleHadoopFileSystem));
-        LOGGER.info("Creating GSFileSystem with options {}", options);
-        this.options = Preconditions.checkNotNull(options);
+        LOGGER.info("Creating GSFileSystem with options {}", options);
     }
 
     @Override
-    public RecoverableWriter createRecoverableWriter() {
-        LOGGER.info("Creating recoverable writer with options {}", options);
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+
+        // follow the same rules as for the Hadoop connector, i.e.
+        // 1) only use service credentials at all if Hadoop
+        // "google.cloud.auth.service.account.enable" is true (default: true)
+        // 2) use GOOGLE_APPLICATION_CREDENTIALS as location of credentials, if supplied
+        // 3) use Hadoop "google.cloud.auth.service.account.json.keyfile" as location of
+        // credentials, if supplied
+        // 4) use no credentials
+
+        // store any credentials we are to use, here
+        Optional<String> credentialsPath = Optional.empty();
+
+        // only look for credentials if service account support is enabled
+        Configuration hadoopConfig = getHadoopFileSystem().getConf();
+        boolean enableServiceAccount =
+                hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true);
+        if (enableServiceAccount) {
+
+            // load google application credentials, and then fall back to
+            // "google.cloud.auth.service.account.json.keyfile" from Hadoop
+            credentialsPath = Optional.ofNullable(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+            if (credentialsPath.isPresent()) {
+                LOGGER.info(
+                        "Recoverable writer is using GOOGLE_APPLICATION_CREDENTIALS at {}",
+                        credentialsPath.get());
+            } else {
+                credentialsPath =
+                        Optional.ofNullable(
+                                hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
+                credentialsPath.ifPresent(
+                        path ->
+                                LOGGER.info(
+                                        "Recoverable writer is using credentials from Hadoop at {}",
+                                        path));
+            }
+        }
 
-        // create the Google storage service instance
-        Storage storage = StorageOptions.getDefaultInstance().getService();
+        // construct the storage instance, using credentials if provided
+        Storage storage;
+        if (credentialsPath.isPresent()) {
+            LOGGER.info(
+                    "Creating GSRecoverableWriter using credentials from {}",
+                    credentialsPath.get());
+            try (FileInputStream credentialsStream = new FileInputStream(credentialsPath.get())) {
+                GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream);
+                storage =
+                        StorageOptions.newBuilder()
+                                .setCredentials(credentials)
+                                .build()
+                                .getService();
+            }
+        } else {
+            LOGGER.info("Creating GSRecoverableWriter using no credentials");
+            storage = StorageOptions.newBuilder().build().getService();
+        }

Review comment:
Good suggestion, I'll make that change.
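For illustration only, a hypothetical refactoring sketch (the class and method names below are not from this PR): pulling the credential-path resolution out of `createRecoverableWriter()` into a small helper would make the fallback order (enable flag, then the environment variable, then the Hadoop keyfile property) straightforward to unit test.

```java
// Hypothetical helper, sketched for illustration; assumes hadoop-common on the classpath.
import org.apache.hadoop.conf.Configuration;

import java.util.Optional;

final class CredentialsPathResolver {

    private CredentialsPathResolver() {}

    /**
     * Resolves the path of the JSON credentials file to use, or empty if none applies.
     *
     * @param googleApplicationCredentials value of the GOOGLE_APPLICATION_CREDENTIALS env var,
     *     or null if unset (passed in so the logic can be tested without touching the env)
     * @param hadoopConfig the Hadoop configuration backing the file system
     */
    static Optional<String> resolve(
            String googleApplicationCredentials, Configuration hadoopConfig) {

        // rule 1: service-account support must be enabled (default: true)
        boolean enableServiceAccount =
                hadoopConfig.getBoolean("google.cloud.auth.service.account.enable", true);
        if (!enableServiceAccount) {
            return Optional.empty();
        }

        // rule 2: GOOGLE_APPLICATION_CREDENTIALS wins if supplied
        if (googleApplicationCredentials != null) {
            return Optional.of(googleApplicationCredentials);
        }

        // rule 3: fall back to the Hadoop keyfile property; rule 4: otherwise no credentials
        return Optional.ofNullable(
                hadoopConfig.get("google.cloud.auth.service.account.json.keyfile"));
    }
}
```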
##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########

Review comment:
Please check me on this, but my thought was that if someone was using the `gcs-connector` before -- i.e. just as a Hadoop-backed FileSystem -- they would have been able to supply arbitrary config options in `core-site.xml`/`core-default.xml` and expect them to be applied. So I was trying to preserve that behavior. But, yes, if we support arbitrary config options in `core-site.xml`/`core-default.xml` *except* for certain authentication-related options, that does seem a bit counterintuitive. I suppose one option could be to continue to parse and pass through all the options, but to document that the only authentication options that will yield the proper behavior for all FileSystem operations are the two documented ones, and not the others (P12, etc.).

##########
File path: flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystem.java
##########

Review comment:
Yes, I agree. I'll add a note to the docs (on the other PR) to call this out, and I'll work on some unit tests.
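For illustration, a unit test along those lines might look like the following sketch, which exercises the hypothetical `CredentialsPathResolver` helper from the earlier sketch (assumes JUnit 4 and `hadoop-common` on the test classpath; the file paths are made up).

```java
// Illustrative test sketch against the hypothetical helper defined above.
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

import java.util.Optional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class CredentialsPathResolverTest {

    @Test
    public void envVariableTakesPrecedenceOverHadoopKeyfile() {
        Configuration hadoopConfig = new Configuration(false);
        hadoopConfig.set("google.cloud.auth.service.account.json.keyfile", "/etc/gcs/key.json");

        assertEquals(
                Optional.of("/etc/gcs/adc.json"),
                CredentialsPathResolver.resolve("/etc/gcs/adc.json", hadoopConfig));
    }

    @Test
    public void fallsBackToHadoopKeyfileWhenEnvVariableUnset() {
        Configuration hadoopConfig = new Configuration(false);
        hadoopConfig.set("google.cloud.auth.service.account.json.keyfile", "/etc/gcs/key.json");

        assertEquals(
                Optional.of("/etc/gcs/key.json"),
                CredentialsPathResolver.resolve(null, hadoopConfig));
    }

    @Test
    public void noCredentialsWhenServiceAccountDisabled() {
        Configuration hadoopConfig = new Configuration(false);
        hadoopConfig.setBoolean("google.cloud.auth.service.account.enable", false);
        hadoopConfig.set("google.cloud.auth.service.account.json.keyfile", "/etc/gcs/key.json");

        assertFalse(CredentialsPathResolver.resolve(null, hadoopConfig).isPresent());
    }
}
```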