This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 846ab49afd2 [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL
846ab49afd2 is described below

commit 846ab49afd20ecf49fe76e18dd3e9b41143bf207
Author: Patrick Lucas <m...@patricklucas.com>
AuthorDate: Thu Nov 30 10:27:14 2023 +0100

    [FLINK-33694][gs-fs-hadoop] Support overriding GCS root URL
---
 .../apache/flink/fs/gs/GSFileSystemFactory.java    | 15 ++++++--
 .../org/apache/flink/fs/gs/utils/ConfigUtils.java  | 10 ++++++
 .../flink/fs/gs/GSFileSystemFactoryTest.java       | 41 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
index 052b6aaddc3..61937dc1551 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/GSFileSystemFactory.java
@@ -18,6 +18,7 @@

 package org.apache.flink.fs.gs;

+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
@@ -92,11 +93,16 @@ public class GSFileSystemFactory implements FileSystemFactory {
         this.fileSystemOptions = new GSFileSystemOptions(flinkConfig);
         LOGGER.info("Using file system options {}", fileSystemOptions);

-        // get storage credentials and construct Storage instance
+        StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder();
+
+        // get storage credentials
         Optional<GoogleCredentials> credentials =
                 ConfigUtils.getStorageCredentials(hadoopConfig, configContext);
-        StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder();
         credentials.ifPresent(storageOptionsBuilder::setCredentials);
+
+        // override the GCS root URL only if overridden in the Hadoop config
+        ConfigUtils.getGcsRootUrl(hadoopConfig).ifPresent(storageOptionsBuilder::setHost);
+
         this.storage = storageOptionsBuilder.build().getService();
     }

@@ -123,6 +129,11 @@ public class GSFileSystemFactory implements FileSystemFactory {
         return new GSFileSystem(googleHadoopFileSystem, storage, fileSystemOptions);
     }

+    @VisibleForTesting
+    Storage getStorage() {
+        return storage;
+    }
+
     /** Config context implementation used at runtime. */
     private static class RuntimeConfigContext implements ConfigUtils.ConfigContext {

diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
index 6670e3d2f1d..cdb9bfffbdd 100644
--- a/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/main/java/org/apache/flink/fs/gs/utils/ConfigUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.util.HadoopConfigLoader;

 import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -31,6 +32,7 @@ import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.function.BiFunction;

 /** Utilities class for configuration of Hadoop and Google Storage. */
 public class ConfigUtils {
@@ -156,6 +158,14 @@ public class ConfigUtils {
         }
     }

+    public static Optional<String> getGcsRootUrl(
+            org.apache.hadoop.conf.Configuration hadoopConfig) {
+        // Ignore the default value, only returning a value if actually included in the config
+        BiFunction<String, String, String> getterFn = (key, defaultValue) -> hadoopConfig.get(key);
+        String value = GoogleHadoopFileSystemConfiguration.GCS_ROOT_URL.get(hadoopConfig, getterFn);
+        return Optional.ofNullable(value);
+    }
+
     /**
      * Helper to serialize a Hadoop config to a string, for logging.
      *
diff --git a/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java
new file mode 100644
index 00000000000..44aa266804f
--- /dev/null
+++ b/flink-filesystems/flink-gs-fs-hadoop/src/test/java/org/apache/flink/fs/gs/GSFileSystemFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.gs;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link GSFileSystemFactory}. */
+public class GSFileSystemFactoryTest {
+
+    @Test
+    public void testOverrideStorageRootUrl() {
+        Configuration flinkConfig = new Configuration();
+        flinkConfig.setString("gs.storage.root.url", "http://240.0.0.0:12345");
+
+        GSFileSystemFactory factory = new GSFileSystemFactory();
+        factory.configure(flinkConfig);
+
+        String gsStorageClientHost = factory.getStorage().getOptions().getHost();
+        assertEquals(gsStorageClientHost, "http://240.0.0.0:12345");
+    }
+}