This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 765f25b9762d08cfb3bdf32b57b08ad2b0689198
Author: Arunkumar Chacko <aruncha...@google.com>
AuthorDate: Tue Jul 8 23:18:42 2025 +0000

    HADOOP-19343. Add additional authentication support
    
    Closes #7779
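    
    An illustrative example of selecting one of the new authentication types
    (assumes the connector's configuration key prefix resolves to "fs.gs",
    with "google.cloud" as the base prefix):
    
      fs.gs.auth.type = SERVICE_ACCOUNT_JSON_KEYFILE
      fs.gs.auth.service.account.json.keyfile = /path/to/keyfile.json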
    
    Co-authored-by: Chris Nauroth <cnaur...@apache.org>
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
---
 .../hadoop-gcp/dev-support/findbugs-exclude.xml    |   5 +
 .../apache/hadoop/fs/gs/GcsInstrumentation.java    |  63 ++++
 .../org/apache/hadoop/fs/gs/GcsListOperation.java  |  20 +-
 .../org/apache/hadoop/fs/gs/GcsStatistics.java     |  72 +++++
 .../apache/hadoop/fs/gs/GcsStorageStatistics.java  |  52 +++
 .../apache/hadoop/fs/gs/GoogleCloudStorage.java    |  54 +++-
 .../hadoop/fs/gs/GoogleCloudStorageExceptions.java |  23 ++
 .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java |  21 +-
 .../hadoop/fs/gs/GoogleHadoopFSInputStream.java    |   2 +-
 .../hadoop/fs/gs/GoogleHadoopFileSystem.java       | 349 +++++++++++++--------
 .../fs/gs/GoogleHadoopFileSystemConfiguration.java |  39 ++-
 .../hadoop/fs/gs/GoogleHadoopOutputStream.java     |   2 +-
 .../hadoop/fs/gs/HadoopConfigurationProperty.java  |  21 ++
 .../fs/gs/HadoopCredentialsConfiguration.java      | 213 +++++++++++++
 .../org/apache/hadoop/fs/gs/RedactedString.java    |  50 +++
 .../org/apache/hadoop/fs/gs/StatisticTypeEnum.java |  23 ++
 .../java/org/apache/hadoop/fs/gs/StringPaths.java  |   2 +-
 .../markdown/tools/hadoop-gcp/Configuration.md     | 197 ++++++++++++
 .../src/site/markdown/tools/hadoop-gcp/testing.md  |  25 +-
 .../ITestGoogleContractContentSummary.java         |  29 ++
 .../contract/ITestGoogleContractRootDirectory.java |  31 ++
 .../gs/contract/ITestGoogleContractUnbuffer.java   |  29 ++
 22 files changed, 1145 insertions(+), 177 deletions(-)

diff --git a/hadoop-tools/hadoop-gcp/dev-support/findbugs-exclude.xml 
b/hadoop-tools/hadoop-gcp/dev-support/findbugs-exclude.xml
index 80be329bd6d..0063be022db 100644
--- a/hadoop-tools/hadoop-gcp/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-gcp/dev-support/findbugs-exclude.xml
@@ -26,4 +26,9 @@
     <Method name="createItemInfoForBlob" />
     <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.fs.gs.GoogleCloudStorageExceptions" />
+    <Method name="createCompositeException" />
+    <Bug pattern="NP_NULL_ON_SOME_PATH" />
+  </Match>
 </FindBugsFilter>
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsInstrumentation.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsInstrumentation.java
new file mode 100644
index 00000000000..273d29d2f49
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsInstrumentation.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
+
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
+class GcsInstrumentation implements Closeable, IOStatisticsSource {
+  private final IOStatisticsStore instanceIOStatistics;
+
+  GcsInstrumentation() {
+    IOStatisticsStoreBuilder storeBuilder = iostatisticsStore();
+
+    // declare all counter statistics
+    EnumSet.allOf(GcsStatistics.class).stream()
+        .filter(statistic ->
+            statistic.getType() == StatisticTypeEnum.TYPE_COUNTER)
+        .forEach(stat -> {
+          storeBuilder.withCounters(stat.getSymbol());
+        });
+
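+    // declare all duration statistics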
+    EnumSet.allOf(GcsStatistics.class).stream()
+        .filter(statistic ->
+            statistic.getType() == StatisticTypeEnum.TYPE_DURATION)
+        .forEach(stat -> {
+          storeBuilder.withDurationTracking(stat.getSymbol());
+        });
+
+    this.instanceIOStatistics = storeBuilder.build();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public IOStatisticsStore getIOStatistics() {
+    return instanceIOStatistics;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
index 9cd4fdbb867..c3de5dd1c13 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
@@ -24,8 +24,6 @@
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
 
-import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
-
 final class GcsListOperation {
   private static final int ALL = 0;
   private final Storage.BlobListOption[] listOptions;
@@ -72,7 +70,11 @@ Builder forRecursiveListing() {
     }
 
     GcsListOperation build() {
-      blobListOptions.add(Storage.BlobListOption.prefix(prefix));
+      // Can be null while listing the root directory.
+      if (prefix != null) {
+        blobListOptions.add(Storage.BlobListOption.prefix(prefix));
+      }
+
       return new GcsListOperation(this);
     }
 
@@ -82,13 +84,11 @@ Builder forCurrentDirectoryListing() {
       return this;
     }
 
-    Builder forCurrentDirectoryListingWithLimit(int theLimit) {
-      checkArgument(
-          theLimit > 0,
-          "limit should be greater than 0. found %d; prefix=%s", theLimit, 
prefix);
-
-      this.limit = theLimit;
-      prefix = StringPaths.toDirectoryPath(prefix);
+    Builder forImplicitDirectoryCheck() {
+      this.limit = 1;
+      if (prefix != null) {
+        prefix = StringPaths.toDirectoryPath(prefix);
+      }
 
       blobListOptions.add(Storage.BlobListOption.pageSize(1));
       forCurrentDirectoryListing();
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStatistics.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStatistics.java
new file mode 100644
index 00000000000..3e78bd24acc
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStatistics.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+
+import static org.apache.hadoop.fs.gs.StatisticTypeEnum.TYPE_DURATION;
+
+enum GcsStatistics {
+  INVOCATION_GET_FILE_STATUS(
+      StoreStatisticNames.OP_GET_FILE_STATUS,
+      "Calls of getFileStatus()",
+      TYPE_DURATION),
+  INVOCATION_CREATE(
+      StoreStatisticNames.OP_CREATE,
+      "Calls of create()",
+      TYPE_DURATION),
+  INVOCATION_DELETE(
+      StoreStatisticNames.OP_DELETE,
+      "Calls of delete()",
+      TYPE_DURATION),
+  INVOCATION_RENAME(
+      StoreStatisticNames.OP_RENAME,
+      "Calls of rename()",
+      TYPE_DURATION),
+  INVOCATION_OPEN(
+      StoreStatisticNames.OP_OPEN,
+      "Calls of open()",
+      TYPE_DURATION),
+  INVOCATION_MKDIRS(
+      StoreStatisticNames.OP_MKDIRS,
+      "Calls of mkdirs()",
+      TYPE_DURATION),
+  INVOCATION_LIST_STATUS(
+      StoreStatisticNames.OP_LIST_STATUS,
+      "Calls of listStatus()",
+      TYPE_DURATION);
+
+  private final String description;
+  private final StatisticTypeEnum type;
+  private final String symbol;
+
+  StatisticTypeEnum getType() {
+    return this.type;
+  }
+
+  String getSymbol() {
+    return this.symbol;
+  }
+
+  GcsStatistics(String symbol, String description, StatisticTypeEnum type) {
+    this.symbol = symbol;
+    this.description = description;
+    this.type = type;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStorageStatistics.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStorageStatistics.java
new file mode 100644
index 00000000000..394007817a8
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsStorageStatistics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.impl.StorageStatisticsFromIOStatistics;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GcsStorageStatistics
+    extends StorageStatisticsFromIOStatistics {
+  static final String NAME = "GhfsStorageStatistics";
+
+  GcsStorageStatistics(final IOStatistics ioStatistics) {
+    super(NAME, Constants.SCHEME, ioStatistics);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (Iterator<LongStatistic> it = this.getLongStatistics(); it.hasNext();) 
{
+      LongStatistic statistic = it.next();
+
+      if (sb.length() != 0) {
+        sb.append(", ");
+      }
+      sb.append(String.format("%s=%s", statistic.getName(), 
statistic.getValue()));
+    }
+
+    return String.format("[%s]", sb);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
index e8cafa57ef8..24addcdd339 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.gs;
 
+import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
 import static java.lang.Math.toIntExact;
@@ -27,7 +28,9 @@
 import com.google.api.client.util.ExponentialBackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.gax.paging.Page;
+import com.google.auth.Credentials;
 import com.google.cloud.storage.*;
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.thirdparty.com.google.common.io.BaseEncoding;
@@ -55,7 +58,7 @@
  * client</a>.
  */
 class GoogleCloudStorage {
-  static final Logger LOG = 
LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
+  static final Logger LOG = LoggerFactory.getLogger(GoogleCloudStorage.class);
   static final List<Storage.BlobField> BLOB_FIELDS =
       ImmutableList.of(
           Storage.BlobField.BUCKET, Storage.BlobField.CONTENT_ENCODING,
@@ -76,18 +79,19 @@ class GoogleCloudStorage {
    * Having an instance of gscImpl to redirect calls to Json client while new 
client implementation
    * is in WIP.
    */
-  GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration) throws 
IOException {
-    // TODO: Set credentials
-    this.storage = createStorage(configuration.getProjectId());
+  GoogleCloudStorage(GoogleHadoopFileSystemConfiguration configuration, 
Credentials credentials)
+          throws IOException {
+    this.storage = createStorage(configuration.getProjectId(), credentials);
     this.configuration = configuration;
   }
 
-  private static Storage createStorage(String projectId) {
+  private static Storage createStorage(String projectId, Credentials 
credentials) {
+    StorageOptions.Builder builder = StorageOptions.newBuilder();
     if (projectId != null) {
-      return 
StorageOptions.newBuilder().setProjectId(projectId).build().getService();
+      builder.setProjectId(projectId);
     }
 
-    return StorageOptions.newBuilder().build().getService();
+    return builder.setCredentials(credentials).build().getService();
   }
 
   WritableByteChannel create(final StorageResourceId resourceId, final 
CreateFileOptions options)
@@ -494,7 +498,9 @@ List<GoogleCloudStorageItemInfo> 
listDirectoryRecursive(String bucketName, Strin
     // TODO: Take delimiter from config
     // TODO: Set specific fields
 
-    checkArgument(objectName.endsWith("/"), String.format("%s should end with 
/", objectName));
+    checkArgument(
+            objectName == null || objectName.endsWith("/"),
+            String.format("%s should end with /", objectName));
     try {
       List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectName, 
storage)
           .forRecursiveListing().build()
@@ -887,7 +893,7 @@ List<GoogleCloudStorageItemInfo> 
getItemInfos(List<StorageResourceId> resourceId
   List<GoogleCloudStorageItemInfo> listDirectory(String bucketName, String 
objectNamePrefix)
       throws IOException {
     checkArgument(
-        objectNamePrefix.endsWith("/"),
+        objectNamePrefix == null || objectNamePrefix.endsWith("/"),
         String.format("%s should end with /", objectNamePrefix));
 
     try {
@@ -971,7 +977,7 @@ GoogleCloudStorageItemInfo 
getFileOrDirectoryInfo(StorageResourceId resourceId)
   GoogleCloudStorageItemInfo getImplicitDirectory(StorageResourceId 
resourceId) {
     List<Blob> blobs = new GcsListOperation
         .Builder(resourceId.getBucketName(), resourceId.getObjectName(), 
storage)
-        .forCurrentDirectoryListingWithLimit(1).build()
+        .forImplicitDirectoryCheck().build()
         .execute();
 
     if (blobs.isEmpty()) {
@@ -981,6 +987,34 @@ GoogleCloudStorageItemInfo 
getImplicitDirectory(StorageResourceId resourceId) {
     return 
GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
   }
 
+  public void deleteBuckets(List<String> bucketNames) throws IOException {
+    LOG.trace("deleteBuckets({})", bucketNames);
+
+    // Validate all the inputs first.
+    for (String bucketName : bucketNames) {
+      checkArgument(!Strings.isNullOrEmpty(bucketName), "bucketName must not 
be null or empty");
+    }
+
+    // Gather exceptions to wrap in a composite exception at the end.
+    List<IOException> innerExceptions = new ArrayList<>();
+
+    for (String bucketName : bucketNames) {
+      try {
+        boolean isDeleted = storage.delete(bucketName);
+        if (!isDeleted) {
+          innerExceptions.add(createFileNotFoundException(bucketName, null, 
null));
+        }
+      } catch (StorageException e) {
+        innerExceptions.add(
+                new IOException(String.format("Error deleting '%s' bucket", 
bucketName), e));
+      }
+    }
+
+    if (!innerExceptions.isEmpty()) {
+      throw 
GoogleCloudStorageExceptions.createCompositeException(innerExceptions);
+    }
+  }
+
   // Helper class to capture the results of list operation.
   private class ListOperationResult {
     private final Map<String, Blob> prefixes = new HashMap<>();
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
index 95f0e41617c..db7ffa7eb95 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageExceptions.java
@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.fs.gs;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Strings.nullToEmpty;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
 import javax.annotation.Nullable;
 
 /**
@@ -55,4 +59,23 @@ static FileNotFoundException createFileNotFoundException(
     return createFileNotFoundException(
         resourceId.getBucketName(), resourceId.getObjectName(), cause);
   }
+
+  public static IOException createCompositeException(Collection<IOException> 
innerExceptions) {
+    Preconditions.checkArgument(
+            innerExceptions != null && !innerExceptions.isEmpty(),
+            "innerExceptions (%s) must be not null and contain at least one 
element",
+            innerExceptions);
+
+    Iterator<IOException> innerExceptionIterator = innerExceptions.iterator();
+
+    if (innerExceptions.size() == 1) {
+      return innerExceptionIterator.next();
+    }
+
+    IOException combined = new IOException("Multiple IOExceptions.");
+    while (innerExceptionIterator.hasNext()) {
+      combined.addSuppressed(innerExceptionIterator.next());
+    }
+    return combined;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
index 8cf11d009c8..7da6a83c417 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
@@ -55,7 +55,7 @@
  * Provides FS semantics over GCS based on Objects API.
  */
 class GoogleCloudStorageFileSystem {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StorageResourceId.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(GoogleCloudStorageFileSystem.class);
   // Comparator used for sorting paths.
   //
   // For some bulk operations, we need to operate on parent directories before
@@ -93,7 +93,7 @@ private static GoogleCloudStorage createCloudStorage(
       throws IOException {
     checkNotNull(configuration, "configuration must not be null");
 
-    return new GoogleCloudStorage(configuration);
+    return new GoogleCloudStorage(configuration, credentials);
   }
 
   GoogleCloudStorageFileSystem(final GoogleHadoopFileSystemConfiguration 
configuration,
@@ -330,12 +330,19 @@ private void deleteObjects(List<FileInfo> itemsToDelete) 
throws IOException {
   }
 
   private void deleteBucket(List<FileInfo> bucketsToDelete) throws IOException 
{
-    if (bucketsToDelete == null || bucketsToDelete.isEmpty()) {
-      return;
-    }
+    if (!bucketsToDelete.isEmpty()) {
+      List<String> bucketNames = new ArrayList<>(bucketsToDelete.size());
+      for (FileInfo bucketInfo : bucketsToDelete) {
+        
bucketNames.add(bucketInfo.getItemInfo().getResourceId().getBucketName());
+      }
 
-    // TODO: Add support for deleting bucket
-    throw new UnsupportedOperationException("deleteBucket is not supported.");
+      if (configuration.isBucketDeleteEnabled()) {
+        gcs.deleteBuckets(bucketNames);
+      } else {
+        LOG.info("Skipping deletion of buckets because enableBucketDelete is 
false: {}",
+                bucketNames);
+      }
+    }
   }
 
   FileInfo getFileInfoObject(URI path) throws IOException {
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
index 26629fc79b2..79ffbcc6f39 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFSInputStream.java
@@ -72,7 +72,7 @@ private GoogleHadoopFSInputStream(
       URI gcsPath,
       SeekableByteChannel channel,
       FileSystem.Statistics statistics) {
-    LOG.trace("GoogleHadoopFSInputStream(gcsPath: %s)", gcsPath);
+    LOG.trace("GoogleHadoopFSInputStream(gcsPath: {})", gcsPath);
     this.gcsPath = gcsPath;
     this.channel = channel;
     this.statistics = statistics;
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
index 3c4e84ce1eb..172d1347582 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
@@ -19,15 +19,19 @@
 package org.apache.hadoop.fs.gs;
 
 import static 
org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.gs.Constants.GCS_CONFIG_PREFIX;
 import static 
org.apache.hadoop.fs.gs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY;
 
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
 import static 
org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
 
 import com.google.auth.oauth2.GoogleCredentials;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.thirdparty.com.google.common.base.Ascii;
 
 import java.io.FileNotFoundException;
@@ -48,6 +52,7 @@
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +67,7 @@
  * particular, it is not subject to bucket-naming constraints, and files are 
allowed to be placed in
  * root.
  */
-public class GoogleHadoopFileSystem extends FileSystem {
+public class GoogleHadoopFileSystem extends FileSystem implements 
IOStatisticsSource {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(GoogleHadoopFileSystem.class);
 
@@ -90,7 +95,7 @@ public class GoogleHadoopFileSystem extends FileSystem {
    * allow modifying or querying the value. Modifying this value allows one to 
control how many
    * mappers are used to process a given file.
    */
-  private long defaultBlockSize = 
GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.getDefault();
+  private final long defaultBlockSize = 
GoogleHadoopFileSystemConfiguration.BLOCK_SIZE.getDefault();
 
   // The bucket the file system is rooted in used for default values of:
   // -- working directory
@@ -106,7 +111,21 @@ public class GoogleHadoopFileSystem extends FileSystem {
   private boolean isClosed;
   private FsPermission reportedPermissions;
 
-  public GoogleHadoopFileSystemConfiguration getFileSystemConfiguration() {
+  /**
+   * This field is static in order to have a singleton instance that captures
+   * JVM-level metrics. It backs the global storage statistics: if it were an
+   * instance field, only the first filesystem instance's metrics would be
+   * updated, because GlobalStorageStatistics (see initialize()) registers only
+   * the first instance.
+   *
+   * For per-instance instrumentation, an additional per-instance object can be
+   * created and both can be updated.
+   */
+  private static GcsInstrumentation instrumentation = new GcsInstrumentation();
+  private GcsStorageStatistics storageStatistics;
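+  // Illustrative usage (assumes the statistics registered below via
+  // GlobalStorageStatistics.INSTANCE.put stay registered under
+  // GcsStorageStatistics.NAME): the JVM-wide aggregate can be read back with
+  //   StorageStatistics stats =
+  //       GlobalStorageStatistics.INSTANCE.get(GcsStorageStatistics.NAME);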
+
+
+  GoogleHadoopFileSystemConfiguration getFileSystemConfiguration() {
     return fileSystemConfiguration;
   }
 
@@ -132,6 +151,9 @@ public void initialize(final URI path, Configuration 
config) throws IOException
     // be sufficient (and is required) for the delegation token binding 
initialization.
     setConf(config);
 
+    storageStatistics = createStorageStatistics(
+            requireNonNull(getIOStatistics()));
+
     this.reportedPermissions = new FsPermission(PERMISSIONS_TO_REPORT);
 
     initializeFsRoot();
@@ -141,6 +163,12 @@ public void initialize(final URI path, Configuration 
config) throws IOException
     initializeGcsFs(fileSystemConfiguration);
   }
 
+  private static GcsStorageStatistics createStorageStatistics(
+          final IOStatistics ioStatistics) {
+    return (GcsStorageStatistics) GlobalStorageStatistics.INSTANCE
+            .put(GcsStorageStatistics.NAME, () -> new 
GcsStorageStatistics(ioStatistics));
+  }
+
   private void initializeFsRoot() {
     String rootBucket = initUri.getAuthority();
     checkArgument(rootBucket != null, "No bucket specified in GCS URI: {}", 
initUri);
@@ -180,9 +208,9 @@ private GoogleCredentials 
getCredentials(GoogleHadoopFileSystemConfiguration con
     return getCredentials(config, GCS_CONFIG_PREFIX);
   }
 
-  public static GoogleCredentials 
getCredentials(GoogleHadoopFileSystemConfiguration config,
+  static GoogleCredentials getCredentials(GoogleHadoopFileSystemConfiguration 
config,
       String... keyPrefixesVararg) throws IOException {
-    return GoogleCredentials.getApplicationDefault(); // TODO: Add other Auth 
mechanisms
+    return HadoopCredentialsConfiguration.getCredentials(config.getConfig(), 
keyPrefixesVararg);
   }
 
   @Override
@@ -194,7 +222,7 @@ protected void checkPath(final Path path) {
     String scheme = uri.getScheme();
     if (scheme != null && !scheme.equalsIgnoreCase(getScheme())) {
       throw new IllegalArgumentException(
-          String.format("Wrong scheme: {}, in path: {}, expected scheme: {}", 
scheme, path,
+          String.format("Wrong scheme: %s, in path: %s, expected scheme: %s", 
scheme, path,
               getScheme()));
     }
 
@@ -207,7 +235,7 @@ protected void checkPath(final Path path) {
     }
 
     throw new IllegalArgumentException(
-        String.format("Wrong bucket: {}, in path: {}, expected bucket: {}", 
bucket, path,
+        String.format("Wrong bucket: %s, in path: %s, expected bucket: %s", 
bucket, path,
             rootBucket));
   }
 
@@ -270,30 +298,40 @@ public String getScheme() {
 
   @Override
   public FSDataInputStream open(final Path hadoopPath, final int bufferSize) 
throws IOException {
-    LOG.trace("open({})", hadoopPath);
-    URI gcsPath = getGcsPath(hadoopPath);
-    return new FSDataInputStream(GoogleHadoopFSInputStream.create(this, 
gcsPath, statistics));
+    return runOperation(
+      GcsStatistics.INVOCATION_OPEN,
+      () -> {
+        LOG.trace("open({})", hadoopPath);
+        URI gcsPath = getGcsPath(hadoopPath);
+        return new FSDataInputStream(GoogleHadoopFSInputStream.create(this, 
gcsPath, statistics));
+      },
+      String.format("open(%s)", hadoopPath));
   }
 
   @Override
-  public FSDataOutputStream create(Path hadoopPath, FsPermission permission, 
boolean overwrite,
-      int bufferSize, short replication, long blockSize, Progressable 
progress) throws IOException {
-    checkArgument(hadoopPath != null, "hadoopPath must not be null");
-    checkArgument(replication > 0, "replication must be a positive integer: 
%s", replication);
-    checkArgument(blockSize > 0, "blockSize must be a positive integer: %s", 
blockSize);
+  public FSDataOutputStream create(
+          Path hadoopPath, FsPermission permission, boolean overwrite, int 
bufferSize,
+          short replication, long blockSize, Progressable progress) throws 
IOException {
+    return runOperation(
+      GcsStatistics.INVOCATION_CREATE,
+      () -> {
+        checkArgument(hadoopPath != null, "hadoopPath must not be null");
+        checkArgument(replication > 0, "replication must be a positive 
integer: %s", replication);
+        checkArgument(blockSize > 0, "blockSize must be a positive integer: 
%s", blockSize);
 
-    checkOpen();
+        checkOpen();
 
-    LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} 
[ignored])", hadoopPath,
-        overwrite, bufferSize);
+        LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} 
[ignored])", hadoopPath,
+                overwrite, bufferSize);
 
-    CreateFileOptions.WriteMode writeMode =
-        overwrite ? CreateFileOptions.WriteMode.OVERWRITE : 
CreateFileOptions.WriteMode.CREATE_NEW;
-    FSDataOutputStream response = new FSDataOutputStream(
-        new GoogleHadoopOutputStream(this, getGcsPath(hadoopPath),
-            CreateFileOptions.builder().setWriteMode(writeMode).build(), 
statistics), statistics);
+        CreateFileOptions.WriteMode writeMode = overwrite ?
+                CreateFileOptions.WriteMode.OVERWRITE : 
CreateFileOptions.WriteMode.CREATE_NEW;
 
-    return response;
+        CreateFileOptions fileOptions = 
CreateFileOptions.builder().setWriteMode(writeMode).build();
+        return new FSDataOutputStream(new GoogleHadoopOutputStream(
+                this, getGcsPath(hadoopPath), fileOptions, statistics), 
statistics);
+      },
+      String.format("create(%s, %s)", hadoopPath, overwrite));
   }
 
   @Override
@@ -386,91 +424,112 @@ public void concat(Path tgt, Path[] srcs) throws 
IOException {
 
   @Override
   public boolean rename(final Path src, final Path dst) throws IOException {
-    LOG.trace("rename({}, {})", src, dst);
-
-    checkArgument(src != null, "src must not be null");
-    checkArgument(dst != null, "dst must not be null");
-
-    // Even though the underlying GCSFS will also throw an IAE if src is root, 
since our filesystem
-    // root happens to equal the global root, we want to explicitly check it 
here since derived
-    // classes may not have filesystem roots equal to the global root.
-    if (this.makeQualified(src).equals(fsRoot)) {
-      LOG.trace("rename(src: {}, dst: {}): false [src is a root]", src, dst);
-      return false;
-    }
-
-    try {
-      checkOpen();
-
-      URI srcPath = getGcsPath(src);
-      URI dstPath = getGcsPath(dst);
-      getGcsFs().rename(srcPath, dstPath);
-
-      LOG.trace("rename(src: {}, dst: {}): true", src, dst);
-    } catch (IOException e) {
-      if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
-        throw e;
-      }
-      LOG.trace("rename(src: %s, dst: %s): false [failed]", src, dst, e);
-      return false;
-    }
-
-    return true;
+    return runOperation(GcsStatistics.INVOCATION_RENAME,
+      () -> {
+        LOG.trace("rename({}, {})", src, dst);
+
+        checkArgument(src != null, "src must not be null");
+        checkArgument(dst != null, "dst must not be null");
+
+        // Even though the underlying GCSFS will also throw an IAE if src is 
root, since our
+        // filesystem root happens to equal the global root, we want to 
explicitly check it
+        // here since derived classes may not have filesystem roots equal to 
the global root.
+        if (this.makeQualified(src).equals(fsRoot)) {
+          LOG.trace("rename(src: {}, dst: {}): false [src is a root]", src, 
dst);
+          return false;
+        }
+
+        try {
+          checkOpen();
+
+          URI srcPath = getGcsPath(src);
+          URI dstPath = getGcsPath(dst);
+          getGcsFs().rename(srcPath, dstPath);
+
+          LOG.trace("rename(src: {}, dst: {}): true", src, dst);
+        } catch (IOException e) {
+          if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
+            throw e;
+          }
+          LOG.trace("rename(src: {}, dst: {}): false [failed]", src, dst, e);
+          return false;
+        }
+
+        return true;
+      },
+      String.format("rename(%s, %s)", src, dst));
   }
 
   @Override
   public boolean delete(final Path hadoopPath, final boolean recursive) throws 
IOException {
-    LOG.trace("delete({}, {})", hadoopPath, recursive);
-    checkArgument(hadoopPath != null, "hadoopPath must not be null");
+    return runOperation(GcsStatistics.INVOCATION_DELETE,
+      () -> {
+        LOG.trace("delete({}, {})", hadoopPath, recursive);
+        checkArgument(hadoopPath != null, "hadoopPath must not be null");
 
-    checkOpen();
+        checkOpen();
 
-    URI gcsPath = getGcsPath(hadoopPath);
-    try {
-      getGcsFs().delete(gcsPath, recursive);
-    } catch (DirectoryNotEmptyException e) {
-      throw e;
-    } catch (IOException e) {
-      if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
-        throw e;
-      }
+        URI gcsPath = getGcsPath(hadoopPath);
+        try {
+          getGcsFs().delete(gcsPath, recursive);
+        } catch (DirectoryNotEmptyException e) {
+          throw e;
+        } catch (IOException e) {
+          if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
+            throw e;
+          }
 
-      LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]", 
hadoopPath, recursive, e);
-      return false;
-    }
+          LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]",
+                  hadoopPath, recursive, e);
+          return false;
+        }
+
+        LOG.trace("delete(hadoopPath: {}, recursive: {}): true",
+                hadoopPath, recursive);
+        return true;
+      },
+      String.format("delete(%s, %s)", hadoopPath, recursive));
+  }
 
-    LOG.trace("delete(hadoopPath: %s, recursive: %b): true", hadoopPath, 
recursive);
-    return true;
+  private <B> B runOperation(GcsStatistics stat, CallableRaisingIOE<B> 
operation, String context)
+          throws IOException {
+    LOG.trace("{}({})", stat, context);
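+    // trackDuration records the operation's duration (and failure, if any)
+    // against the named statistic in the shared IOStatisticsStore while
+    // executing the callable and rethrowing any IOException it raises.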
+    return trackDuration(instrumentation.getIOStatistics(), stat.getSymbol(), 
operation);
   }
 
   @Override
   public FileStatus[] listStatus(final Path hadoopPath) throws IOException {
-    checkArgument(hadoopPath != null, "hadoopPath must not be null");
-
-    checkOpen();
-
-    LOG.trace("listStatus(hadoopPath: {})", hadoopPath);
-
-    URI gcsPath = getGcsPath(hadoopPath);
-    List<FileStatus> status;
-
-    try {
-      List<FileInfo> fileInfos = getGcsFs().listDirectory(gcsPath);
-      status = new ArrayList<>(fileInfos.size());
-      String userName = getUgiUserName();
-      for (FileInfo fileInfo : fileInfos) {
-        status.add(getFileStatus(fileInfo, userName));
-      }
-    } catch (FileNotFoundException fnfe) {
-      throw (FileNotFoundException)
-          new FileNotFoundException(
-              String.format(
-                  "listStatus(hadoopPath: %s): '%s' does not exist.",
-                  hadoopPath, gcsPath))
-              .initCause(fnfe);
-    }
-
-    return status.toArray(new FileStatus[0]);
+    return runOperation(
+      GcsStatistics.INVOCATION_LIST_STATUS,
+      () -> {
+        checkArgument(hadoopPath != null, "hadoopPath must not be null");
+
+        checkOpen();
+
+        LOG.trace("listStatus(hadoopPath: {})", hadoopPath);
+
+        URI gcsPath = getGcsPath(hadoopPath);
+        List<FileStatus> status;
+
+        try {
+          List<FileInfo> fileInfos = getGcsFs().listDirectory(gcsPath);
+          status = new ArrayList<>(fileInfos.size());
+          String userName = getUgiUserName();
+          for (FileInfo fileInfo : fileInfos) {
+            status.add(getFileStatus(fileInfo, userName));
+          }
+        } catch (FileNotFoundException fnfe) {
+          throw (FileNotFoundException)
+                  new FileNotFoundException(
+                          String.format(
+                                  "listStatus(hadoopPath: %s): '%s' does not 
exist.",
+                                  hadoopPath, gcsPath))
+                          .initCause(fnfe);
+        }
+
+        return status.toArray(new FileStatus[0]);
+      },
+      String.format("listStatus(%s)", hadoopPath));
   }
 
   /**
@@ -522,7 +581,7 @@ public URI getUri() {
   @Override
   protected int getDefaultPort() {
     int result = -1;
-    LOG.trace("getDefaultPort(): %d", result);
+    LOG.trace("getDefaultPort(): {}", result);
     return result;
   }
 
@@ -553,43 +612,53 @@ public Path getWorkingDirectory() {
 
   @Override
   public boolean mkdirs(final Path hadoopPath, final FsPermission permission) 
throws IOException {
-    checkArgument(hadoopPath != null, "hadoopPath must not be null");
+    return runOperation(
+      GcsStatistics.INVOCATION_MKDIRS,
+      () -> {
+        checkArgument(hadoopPath != null, "hadoopPath must not be null");
 
-    LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, 
permission);
-
-    checkOpen();
-
-    URI gcsPath = getGcsPath(hadoopPath);
-    try {
-      getGcsFs().mkdirs(gcsPath);
-    } catch (java.nio.file.FileAlreadyExistsException faee) {
-      // Need to convert to the Hadoop flavor of FileAlreadyExistsException.
-      throw (FileAlreadyExistsException)
-          new FileAlreadyExistsException(
-              String.format(
-                  "mkdirs(hadoopPath: %s, permission: %s): failed",
-                  hadoopPath, permission))
-              .initCause(faee);
-    }
+        LOG.trace("mkdirs(hadoopPath: {}, permission: {}): true", hadoopPath, 
permission);
 
-    return true;
+        checkOpen();
+
+        URI gcsPath = getGcsPath(hadoopPath);
+        try {
+          getGcsFs().mkdirs(gcsPath);
+        } catch (java.nio.file.FileAlreadyExistsException faee) {
+          // Need to convert to the Hadoop flavor of 
FileAlreadyExistsException.
+          throw (FileAlreadyExistsException)
+                  new FileAlreadyExistsException(
+                          String.format(
+                                  "mkdirs(hadoopPath: %s, permission: %s): 
failed",
+                                  hadoopPath, permission))
+                          .initCause(faee);
+        }
+
+        return true;
+      },
+      String.format("mkdirs(%s)", hadoopPath));
   }
 
   @Override
   public FileStatus getFileStatus(final Path path) throws IOException {
-    checkArgument(path != null, "path must not be null");
-
-    checkOpen();
-
-    URI gcsPath = getGcsPath(path);
-    FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
-    if (!fileInfo.exists()) {
-      throw new FileNotFoundException(
-          String.format(
-              "%s not found: %s", fileInfo.isDirectory() ? "Directory" : 
"File", path));
-    }
-    String userName = getUgiUserName();
-    return getFileStatus(fileInfo, userName);
+    return runOperation(
+      GcsStatistics.INVOCATION_GET_FILE_STATUS,
+      () -> {
+        checkArgument(path != null, "path must not be null");
+
+        checkOpen();
+
+        URI gcsPath = getGcsPath(path);
+        FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
+        if (!fileInfo.exists()) {
+          throw new FileNotFoundException(
+                  String.format(
+                          "%s not found: %s", fileInfo.isDirectory() ? 
"Directory" : "File", path));
+        }
+        String userName = getUgiUserName();
+        return getFileStatus(fileInfo, userName);
+      },
+      String.format("getFileStatus(%s)", path));
   }
 
   /**
@@ -661,6 +730,26 @@ public void setWorkingDirectory(final Path hadoopPath) {
     LOG.trace("setWorkingDirectory(hadoopPath: {}): {}", hadoopPath, 
workingDirectory);
   }
 
+  /**
+   * Get the instrumentation's IOStatistics.
+   * @return statistics
+   */
+  @Override
+  public IOStatistics getIOStatistics() {
+    return instrumentation != null
+            ? instrumentation.getIOStatistics()
+            : null;
+  }
+
+  /**
+   * Get the storage statistics of this filesystem.
+   * @return the storage statistics
+   */
+  @Override
+  public GcsStorageStatistics getStorageStatistics() {
+    return this.storageStatistics;
+  }
+
   private static String getUgiUserName() throws IOException {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     return ugi.getShortUserName();
@@ -684,4 +773,4 @@ private FileStatus getFileStatus(FileInfo fileInfo, String 
userName) {
     LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), 
userName, status);
     return status;
   }
-}
\ No newline at end of file
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
index 4097b5e1f83..9d48747ad58 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
@@ -130,6 +130,19 @@ class GoogleHadoopFileSystemConfiguration {
           "fs.gs.outputstream.sync.min.interval",
           0L);
 
+  /**
+   * If true, recursive delete on a path that refers to a GCS bucket itself 
('/' for any
+   * bucket-rooted GoogleHadoopFileSystem) or delete on that path when it's 
empty will result in
+   * fully deleting the GCS bucket. If false, any operation that normally 
would have deleted the
+   * bucket will be ignored instead. Setting to 'false' preserves the typical 
behavior of "rm -rf /"
+   * which translates to deleting everything inside of root, but without 
clobbering the filesystem
+   * authority corresponding to that root path in the process.
+   */
+  static final HadoopConfigurationProperty<Boolean> GCE_BUCKET_DELETE_ENABLE =
+          new HadoopConfigurationProperty<>(
+                  "fs.gs.bucket.delete.enable",
+                  false);
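+  // Example (illustrative) for the property above: setting
+  //   fs.gs.bucket.delete.enable = true
+  // lets a recursive delete of a bucket root remove the bucket itself once it
+  // is empty; with the default of false the bucket is left in place.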
+
   private final String workingDirectory;
   private final String projectId;
   private final Configuration config;
@@ -169,31 +182,31 @@ long getInplaceSeekLimit() {
     return GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT.get(config, 
config::getLongBytes);
   }
 
-  public int getFadviseRequestTrackCount() {
+  int getFadviseRequestTrackCount() {
     return GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt);
   }
 
-  public boolean isGzipEncodingSupportEnabled() {
+  boolean isGzipEncodingSupportEnabled() {
     return GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE.get(config, 
config::getBoolean);
   }
 
-  public long getMinRangeRequestSize() {
+  long getMinRangeRequestSize() {
     return GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(config, 
config::getLongBytes);
   }
 
-  public long getBlockSize() {
+  long getBlockSize() {
     return BLOCK_SIZE.get(config, config::getLong);
   }
 
-  public boolean isReadExactRequestedBytesEnabled() {
+  boolean isReadExactRequestedBytesEnabled() {
     return false; //TODO: Remove this option?
   }
 
-  public long getMaxRewriteChunkSize() {
+  long getMaxRewriteChunkSize() {
     return GCS_REWRITE_MAX_CHUNK_SIZE.get(config, config::getLong);
   }
 
-  public Pattern getMarkerFilePattern() {
+  Pattern getMarkerFilePattern() {
     String pattern =  GCS_MARKER_FILE_PATTERN.get(config, config::get);
     if (pattern == null) {
       return null;
@@ -207,11 +220,19 @@ public Pattern getMarkerFilePattern() {
     return fileMarkerFilePattern;
   }
 
-  public boolean isEnsureNoConflictingItems() {
+  boolean isEnsureNoConflictingItems() {
     return GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE.get(config, 
config::getBoolean);
   }
 
-  public Duration getMinSyncInterval() {
+  Duration getMinSyncInterval() {
     return GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL.getTimeDuration(config);
   }
+
+  Configuration getConfig() {
+    return config;
+  }
+
+  boolean isBucketDeleteEnabled() {
+    return GCE_BUCKET_DELETE_ENABLE.get(config, config::getBoolean);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
index c41ce13edae..e1a87915a04 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
@@ -54,7 +54,7 @@
 
 class GoogleHadoopOutputStream extends OutputStream
     implements StreamCapabilities, Syncable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(StorageResourceId.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(GoogleHadoopOutputStream.class);
 
   // Prefix used for all temporary files created by this stream.
   private static final String TMP_FILE_PREFIX = "_GHFS_SYNC_TMP_FILE_";
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
index 450459e6a8d..71bd729f62e 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.function.BiFunction;
@@ -29,6 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
@@ -74,6 +76,25 @@ Duration getTimeDuration(Configuration config) {
         lookupKey, Duration.ofMillis(config.getTimeDuration(lookupKey, 
defValStr, MILLISECONDS)));
   }
 
+  HadoopConfigurationProperty<T> withPrefixes(List<String> prefixes) {
+    this.keyPrefixes = ImmutableList.copyOf(prefixes);
+    return this;
+  }
+
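+  // Looks up the value through Configuration.getPassword(), which consults any
+  // configured credential providers before falling back to the plain config
+  // value, and wraps the result in a RedactedString to keep the secret out of
+  // log output.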
+  RedactedString getPassword(Configuration config) {
+    checkState(defaultValue == null || defaultValue instanceof String, "Not a 
string property");
+    String lookupKey = getLookupKey(config, key, (c, k) -> c.get(k) != null);
+    char[] value;
+    try {
+      value = config.getPassword(lookupKey);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return logProperty(
+            lookupKey,
+            RedactedString.create(value == null ? (String) defaultValue : 
String.valueOf(value)));
+  }
+
   private String getLookupKey(Configuration config, String lookupKey,
       BiFunction<Configuration, String, Boolean> checkFn) {
     for (String prefix : keyPrefixes) {
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopCredentialsConfiguration.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopCredentialsConfiguration.java
new file mode 100644
index 00000000000..a0a36b7bfef
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopCredentialsConfiguration.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import com.google.auth.oauth2.ComputeEngineCredentials;
+import com.google.auth.oauth2.ExternalAccountCredentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.auth.oauth2.UserCredentials;
+import 
org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The Hadoop credentials configuration.
+ *
+ * <p>When reading configuration, this class uses a list of key prefixes that
+ * are each applied to key suffixes to create a complete configuration key. A
+ * base prefix of 'google.cloud.' is always included as a fallback; other,
+ * connector-specific prefixes can be specified and override values set under
+ * the base prefix. In this way a set of global credentials can be specified
+ * for most connectors, with an override specified for any connector that
+ * needs different credentials.
+ */
+final class HadoopCredentialsConfiguration {
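+  // Illustrative resolution example (assumes GCS_CONFIG_PREFIX resolves to
+  // "fs.gs", as passed in by GoogleHadoopFileSystem): for the ".auth.type"
+  // suffix, both "fs.gs.auth.type" and "google.cloud.auth.type" are consulted,
+  // with the connector-specific key taking precedence over the base key.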
+
+  /**
+   * All configuration lookups include {@code google.cloud} as the base prefix.
+   * Other prefixes can be added and will override values set under the
+   * {@code google.cloud} prefix.
+   */
+  private static final String BASE_KEY_PREFIX = "google.cloud";
+  private static final String CLOUD_PLATFORM_SCOPE =
+          "https://www.googleapis.com/auth/cloud-platform";
+  /** Key suffix used to configure authentication type. */
+  private static final HadoopConfigurationProperty<AuthenticationType> 
AUTHENTICATION_TYPE_SUFFIX =
+          new HadoopConfigurationProperty<>(".auth.type", 
AuthenticationType.COMPUTE_ENGINE);
+  /**
+   * Key suffix used to configure the path to a JSON file containing a Service 
Account key and
+   * identifier (email). Technically, this could be a JSON containing a 
non-service account user,
+   * but this setting is only used in the service account flow and is 
namespaced as such.
+   */
+  private static final HadoopConfigurationProperty<String> 
SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX =
+          new 
HadoopConfigurationProperty<>(".auth.service.account.json.keyfile");
+  /**
+   * Key suffix used to configure the path to a JSON file containing a workload
+   * identity federation (i.e. external account) credential configuration.
+   * Technically, this could be a JSON file containing a service account
+   * impersonation URL and credential source, but this setting is only used in
+   * the workload identity federation flow and is namespaced as such.
+   */
+  private static final HadoopConfigurationProperty<String>
+          WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE_SUFFIX =
+          new HadoopConfigurationProperty<>(
+                  ".auth.workload.identity.federation.credential.config.file");
+
+  /** Key suffix for setting a token server URL to use to refresh OAuth token. 
*/
+  private static final HadoopConfigurationProperty<String> 
TOKEN_SERVER_URL_SUFFIX =
+          new HadoopConfigurationProperty<>(".token.server.url");
+
+  private static final HadoopConfigurationProperty<Long> READ_TIMEOUT_SUFFIX =
+          new HadoopConfigurationProperty<>(".http.read-timeout", 5_000L);
+  /**
+   * Configuration key for defining the OAuth2 client ID. Required when the
+   * authentication type is USER_CREDENTIALS.
+   */
+  private static final HadoopConfigurationProperty<String> 
AUTH_CLIENT_ID_SUFFIX =
+          new HadoopConfigurationProperty<>(".auth.client.id");
+  /**
+   * Configuration key for defining the OAuth2 client secret. Required when the
+   * authentication type is USER_CREDENTIALS.
+   */
+  private static final HadoopConfigurationProperty<RedactedString> 
AUTH_CLIENT_SECRET_SUFFIX =
+          new HadoopConfigurationProperty<>(".auth.client.secret");
+  /**
+   * Configuration key for defining the OAuth2 refresh token. Required when the
+   * authentication type is USER_CREDENTIALS.
+   */
+  private static final HadoopConfigurationProperty<RedactedString> 
AUTH_REFRESH_TOKEN_SUFFIX =
+          new HadoopConfigurationProperty<>(".auth.refresh.token");
+
+  private HadoopCredentialsConfiguration() {}
+
+  /**
+   * Returns the full list of config prefixes; keys are resolved in the order
+   * of this list.
+   */
+  static List<String> getConfigKeyPrefixes(String... keyPrefixes) {
+    return 
ImmutableList.<String>builder().add(keyPrefixes).add(BASE_KEY_PREFIX).build();
+  }
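+  // For example, getConfigKeyPrefixes("fs.gs") above returns
+  // ["fs.gs", "google.cloud"], so connector-specific keys are checked ahead of
+  // the base-prefix keys.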
+
+  /**
+   * Get the credentials for the configured {@link AuthenticationType}.
+   *
+   * @throws IllegalArgumentException if the configured {@link AuthenticationType}
+   *     is not recognized.
+   */
+  static GoogleCredentials getCredentials(Configuration config, String... 
keyPrefixesVararg)
+          throws IOException {
+    List<String> keyPrefixes = getConfigKeyPrefixes(keyPrefixesVararg);
+    return getCredentials(config, keyPrefixes);
+  }
+
+  @VisibleForTesting
+  static GoogleCredentials getCredentials(Configuration config, List<String> 
keyPrefixes)
+          throws IOException {
+    GoogleCredentials credentials = getCredentialsInternal(config, 
keyPrefixes);
+    return credentials == null ? null : configureCredentials(config, 
keyPrefixes, credentials);
+  }
+
+  private static GoogleCredentials getCredentialsInternal(
+          Configuration config, List<String> keyPrefixes) throws IOException {
+    AuthenticationType authenticationType =
+            AUTHENTICATION_TYPE_SUFFIX.withPrefixes(keyPrefixes).get(config, 
config::getEnum);
+    switch (authenticationType) {
+    case APPLICATION_DEFAULT:
+      return GoogleCredentials.getApplicationDefault();
+    case COMPUTE_ENGINE:
+      return ComputeEngineCredentials.newBuilder().build();
+    case SERVICE_ACCOUNT_JSON_KEYFILE:
+      String keyFile = SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX
+              .withPrefixes(keyPrefixes).get(config, config::get);
+
+      if (Strings.isNullOrEmpty(keyFile)) {
+        throw new IllegalArgumentException(String.format(
+                "Missing keyfile property ('%s') for authentication type '%s'",
+                SERVICE_ACCOUNT_JSON_KEYFILE_SUFFIX.getKey(),
+                authenticationType));
+      }
+
+      try (FileInputStream fis = new FileInputStream(keyFile)) {
+        return ServiceAccountCredentials.fromStream(fis);
+      }
+    case USER_CREDENTIALS:
+      String clientId = AUTH_CLIENT_ID_SUFFIX.withPrefixes(keyPrefixes).get(config, config::get);
+      RedactedString clientSecret =
+              AUTH_CLIENT_SECRET_SUFFIX.withPrefixes(keyPrefixes).getPassword(config);
+      RedactedString refreshToken =
+              AUTH_REFRESH_TOKEN_SUFFIX.withPrefixes(keyPrefixes).getPassword(config);
+
+      return UserCredentials.newBuilder()
+              .setClientId(clientId)
+              .setClientSecret(clientSecret.getValue())
+              .setRefreshToken(refreshToken.getValue())
+              .build();
+
+    case WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE:
+      String configFile =
+              WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE_SUFFIX
+                      .withPrefixes(keyPrefixes)
+                      .get(config, config::get);
+      try (FileInputStream fis = new FileInputStream(configFile)) {
+        return ExternalAccountCredentials.fromStream(fis);
+      }
+    case UNAUTHENTICATED:
+      return null;
+    default:
+      throw new IllegalArgumentException("Unknown authentication type: " + authenticationType);
+    }
+  }
+
+  private static GoogleCredentials configureCredentials(
+          Configuration config, List<String> keyPrefixes, GoogleCredentials credentials) {
+    credentials = credentials.createScoped(CLOUD_PLATFORM_SCOPE);
+    String tokenServerUrl =
+            TOKEN_SERVER_URL_SUFFIX.withPrefixes(keyPrefixes).get(config, config::get);
+    if (tokenServerUrl == null) {
+      return credentials;
+    }
+    if (credentials instanceof ServiceAccountCredentials) {
+      return ((ServiceAccountCredentials) credentials)
+              .toBuilder().setTokenServerUri(URI.create(tokenServerUrl)).build();
+    }
+    if (credentials instanceof UserCredentials) {
+      return ((UserCredentials) credentials)
+              .toBuilder().setTokenServerUri(URI.create(tokenServerUrl)).build();
+    }
+    return credentials;
+  }
+
+  /** Enumerates all supported authentication types. */
+  public enum AuthenticationType {
+    /** Configures Application Default Credentials authentication. */
+    APPLICATION_DEFAULT,
+    /** Configures Google Compute Engine service account authentication. */
+    COMPUTE_ENGINE,
+    /** Configures JSON keyfile service account authentication. */
+    SERVICE_ACCOUNT_JSON_KEYFILE,
+    /** Configures workload identity federation using a credential configuration file. */
+    WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE,
+    /** Configures unauthenticated access. */
+    UNAUTHENTICATED,
+    /** Configures user credentials authentication. */
+    USER_CREDENTIALS,
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/RedactedString.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/RedactedString.java
new file mode 100644
index 00000000000..db2d49de697
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/RedactedString.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
+
+import javax.annotation.Nullable;
+
+/**
+ * Holder class for string values that should not be logged or displayed when the
+ * {@code toString} method is called. For example, it should be used for credentials.
+ */
+class RedactedString {
+
+  private final String value;
+
+  RedactedString(String value) {
+    this.value = value;
+  }
+
+  @Nullable
+  static RedactedString create(@Nullable String value) {
+    return isNullOrEmpty(value) ? null : new RedactedString(value);
+  }
+
+  String getValue() {
+    return value;
+  }
+
+  @Override
+  public final String toString() {
+    return "<redacted>";
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StatisticTypeEnum.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StatisticTypeEnum.java
new file mode 100644
index 00000000000..4c203e6b687
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StatisticTypeEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+enum StatisticTypeEnum {
+  TYPE_COUNTER, TYPE_DURATION
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StringPaths.java
 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StringPaths.java
index 80682c3ed2a..fb4449e517c 100644
--- 
a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StringPaths.java
+++ 
b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StringPaths.java
@@ -30,7 +30,7 @@
  */
 final class StringPaths {
 
-  public static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class);
+  public static final Logger LOG = LoggerFactory.getLogger(StringPaths.class);
 
   private StringPaths() {
   }
diff --git 
a/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/Configuration.md 
b/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/Configuration.md
new file mode 100644
index 00000000000..d91e69f6343
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/Configuration.md
@@ -0,0 +1,197 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+## Configuration properties
+
+### General configuration
+
+* `fs.gs.project.id` (not set by default)
+
+  Google Cloud Project ID with access to Google Cloud Storage buckets.
+  Required only for list buckets and create bucket operations.
+
+* `fs.gs.working.dir` (default: `/`)
+
+  The directory inside the default bucket against which relative `gs:` URIs are resolved.
+
+* `fs.gs.rewrite.max.chunk.size` (default: `512m`)
+
+  Maximum size of object chunk that will be rewritten in a single rewrite
+  request when `fs.gs.copy.with.rewrite.enable` is set to `true`.
+
+* `fs.gs.bucket.delete.enable` (default: `false`)
+
+  If `true`, a recursive delete on a path that refers to a Cloud Storage bucket
+  itself, or a delete on that path when it is empty, results in deletion of the
+  bucket itself. If `false`, any operation that would normally have deleted the
+  bucket is ignored. Setting this to `false` preserves the typical behavior of
+  `rm -rf /`, which deletes everything inside the root without clobbering the
+  filesystem authority corresponding to that root path.
+
+* `fs.gs.block.size` (default: `64m`)
+
+  The reported block size of the file system. This does not change any
+  behavior of the connector or the underlying Google Cloud Storage objects.
+  However, it will affect the number of splits Hadoop MapReduce uses for a
+  given input.
+
+* `fs.gs.create.items.conflict.check.enable` (default: `true`)
+
+  Enables a check that ensures that conflicting directories do not exist when
+  creating files and conflicting files do not exist when creating directories.
+
+* `fs.gs.marker.file.pattern` (not set by default)
+
+  If set, files that match the specified pattern are copied last during a
+  folder rename operation.
+
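+As an illustrative example (all values below are placeholders, not defaults), these
+properties are set in `core-site.xml` like any other Hadoop configuration:
+
+```xml
+<configuration>
+  <property>
+    <name>fs.gs.project.id</name>
+    <value>your-project-id</value>
+  </property>
+  <property>
+    <name>fs.gs.block.size</name>
+    <value>128m</value>
+  </property>
+  <property>
+    <name>fs.gs.bucket.delete.enable</name>
+    <value>false</value>
+  </property>
+</configuration>
+```
+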
+### Authentication
+
+* `fs.gs.auth.type` (default: `COMPUTE_ENGINE`)
+
+  The authentication mechanism to use for Google Cloud Storage access.
+
+  Valid values:
+
+  * `APPLICATION_DEFAULT` - configures
+    [Application Default Credentials](https://javadoc.io/doc/com.google.auth/google-auth-library-oauth2-http/latest/com/google/auth/oauth2/GoogleCredentials.html)
+    authentication
+
+  * `COMPUTE_ENGINE` - configures Google Compute Engine service account
+    authentication
+
+  * `SERVICE_ACCOUNT_JSON_KEYFILE` - configures JSON keyfile service account
+    authentication
+
+  * `UNAUTHENTICATED` - configures unauthenticated access
+
+  * `USER_CREDENTIALS` - configures [user credentials](#user-credentials)
+
+* `fs.gs.auth.service.account.json.keyfile` (not set by default)
+
+  The path to the JSON keyfile for the service account when the `fs.gs.auth.type`
+  property is set to `SERVICE_ACCOUNT_JSON_KEYFILE`. The file must exist at the
+  same path on all nodes.
+
+#### User credentials
+
+User credentials allow you to access Google resources on behalf of a user, with
+the permissions associated with that user.
+
+To achieve this, the connector uses the
+[refresh token grant flow](https://oauth.net/2/grant-types/refresh-token/) to
+retrieve new access tokens when necessary.
+
+To use this authentication type, first retrieve a refresh token using the
+[authorization code grant flow](https://oauth.net/2/grant-types/authorization-code)
+and pass it to the connector together with the OAuth client ID and secret:
+
+* `fs.gs.auth.client.id` (not set by default)
+
+  The OAuth2 client ID.
+
+* `fs.gs.auth.client.secret` (not set by default)
+
+  The OAuth2 client secret.
+
+* `fs.gs.auth.refresh.token` (not set by default)
+
+  The refresh token.
+
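+For example, a `core-site.xml` fragment for user credentials might look like the
+following sketch; the client ID, secret, and refresh token values are placeholders
+obtained from your own OAuth2 flow:
+
+```xml
+<configuration>
+  <property>
+    <name>fs.gs.auth.type</name>
+    <value>USER_CREDENTIALS</value>
+  </property>
+  <property>
+    <name>fs.gs.auth.client.id</name>
+    <value>YOUR_OAUTH2_CLIENT_ID</value>
+  </property>
+  <property>
+    <name>fs.gs.auth.client.secret</name>
+    <value>YOUR_OAUTH2_CLIENT_SECRET</value>
+  </property>
+  <property>
+    <name>fs.gs.auth.refresh.token</name>
+    <value>YOUR_REFRESH_TOKEN</value>
+  </property>
+</configuration>
+```
+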
+### IO configuration
+
+* `fs.gs.inputstream.support.gzip.encoding.enable` (default: `false`)
+
+  If set to `false`, reading files with GZIP content encoding (HTTP header
+  `Content-Encoding: gzip`) fails with an `IOException`.
+
+  This feature is disabled by default because processing of
+  [GZIP encoded](https://cloud.google.com/storage/docs/transcoding#decompressive_transcoding)
+  files is inefficient and error-prone in Hadoop and Spark.
+
+* `fs.gs.outputstream.buffer.size` (default: `8m`)
+
+  The write buffer size used by the filesystem API to pass data to the Cloud
+  Storage upload thread via a pipe.
+
+* `fs.gs.outputstream.sync.min.interval` (default: `0`)
+
+  Output stream configuration that controls the minimum interval between
+  consecutive syncs, which helps avoid getting rate-limited by Google Cloud
+  Storage. The default is `0`, meaning no wait between syncs. Note that
+  `hflush()` is a no-op if called more frequently than the minimum sync
+  interval, and `hsync()` blocks until the end of the minimum sync interval.
+
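+The sketch below illustrates how this interval interacts with `hflush()` and `hsync()`
+from the Hadoop `FileSystem` API; the bucket, path, and class name are placeholders:
+
+```java
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class SyncIntervalExample {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    // "0" (the default) means no wait between syncs; a larger interval throttles them.
+    conf.set("fs.gs.outputstream.sync.min.interval", "0");
+
+    FileSystem fs = FileSystem.get(URI.create("gs://your-bucket/"), conf);
+    try (FSDataOutputStream out = fs.create(new Path("gs://your-bucket/logs/events.txt"))) {
+      out.write("first record\n".getBytes(StandardCharsets.UTF_8));
+      // With a non-zero interval, hflush() is a no-op when called more frequently than
+      // the minimum, while hsync() blocks until the interval has elapsed.
+      out.hflush();
+      out.write("second record\n".getBytes(StandardCharsets.UTF_8));
+      out.hsync();
+    }
+  }
+}
+```
+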
+### Fadvise feature configuration
+
+* `fs.gs.inputstream.fadvise` (default: `AUTO`)
+
+  Tunes object read behavior to optimize HTTP GET requests for various use
+  cases.
+
+  This property controls the fadvise feature, which allows reading objects in
+  the following modes (see the example at the end of this section):
+
+  * `SEQUENTIAL` - in this mode the connector sends a single streaming
+    (unbounded) Cloud Storage request to read the object sequentially from a
+    specified position.
+
+  * `RANDOM` - in this mode the connector sends bounded Cloud Storage range
+    requests (specified through the HTTP `Range` header), which are more
+    efficient in some cases, e.g. when reading objects in row-columnar file
+    formats such as ORC or Parquet.
+
+    The range request size is limited by whichever is greater: `fs.gs.io.buffer`
+    or the read buffer size passed by the client.
+
+    To avoid sending very small range requests (a couple of bytes), which could
+    happen if `fs.gs.io.buffer` is `0` and the client passes a very small read
+    buffer, the minimum range request size is limited to 2 MB by default and is
+    configurable through the `fs.gs.inputstream.min.range.request.size` property.
+
+  * `AUTO` - in this mode (adaptive range reads) the connector starts with
+    streaming requests and switches to bounded range requests for non
+    gzip-encoded objects as soon as the first backward read, or a forward read
+    of more than `fs.gs.inputstream.inplace.seek.limit` bytes, is detected.
+
+  * `AUTO_RANDOM` - complements `AUTO` mode, which starts sequentially and
+    adapts to bounded range requests. `AUTO_RANDOM` starts with a bounded
+    channel and adapts to sequential requests if consecutive requests fall
+    within `fs.gs.inputstream.min.range.request.size`. gzip-encoded objects
+    bypass this adaptation and are always read through a streaming (unbounded)
+    channel. This helps when egress limits are being breached, because `AUTO`
+    mode always ends up with one unbounded channel per file; `AUTO_RANDOM`
+    avoids such unwanted unbounded channels.
+
+* `fs.gs.fadvise.request.track.count` (default: `3`)
+
+  The self-adaptive fadvise modes use the distance between served requests to
+  decide the access pattern. This property controls how many such requests are
+  tracked. It is used when `AUTO_RANDOM` is selected.
+
+* `fs.gs.inputstream.inplace.seek.limit` (default: `8m`)
+
+  If forward seeks are within this many bytes of the current position, seeks
+  are performed by reading and discarding bytes in-place rather than opening a
+  new underlying stream.
+
+* `fs.gs.inputstream.min.range.request.size` (default: `2m`)
+
+  Minimum size in bytes of the read range for Cloud Storage request when
+  opening a new stream to read an object.
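+
+For example (placeholder values), a workload that mostly reads columnar files could pin
+the read mode instead of relying on adaptive behavior:
+
+```xml
+<property>
+  <name>fs.gs.inputstream.fadvise</name>
+  <value>RANDOM</value>
+</property>
+<property>
+  <name>fs.gs.inputstream.min.range.request.size</name>
+  <value>4m</value>
+</property>
+```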
\ No newline at end of file
diff --git 
a/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/testing.md 
b/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/testing.md
index a56d7e6c395..e9c2c305e2e 100644
--- a/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/testing.md
+++ b/hadoop-tools/hadoop-gcp/src/site/markdown/tools/hadoop-gcp/testing.md
@@ -51,11 +51,22 @@ Example:
 
 ```xml
 <configuration>
-  <property>
-    <name>fs.contract.test.fs.gs</name>
-    <value>gs://your bucket name</value>
-  </property>
-
+    <property>
+      <name>fs.gs.auth.type</name>
+      <value>SERVICE_ACCOUNT_JSON_KEYFILE</value>
+    </property>
+    <property>
+      <name>fs.gs.auth.service.account.json.keyfile</name>
+      <value>YOUR_JSON_KEY_FILE</value>
+    </property>
+    <property>
+      <name>fs.gs.project.id</name>
+      <value>YOUR_PROJECT_ID_HERE</value>
+    </property>
+    <property>
+      <name>fs.contract.test.fs.gs</name>
+      <value>gs://your_bucket</value>
+    </property>
 </configuration>
 ```
 
@@ -63,8 +74,6 @@ Example:
 
 After completing the configuration, execute the test run through Maven.
 
-This has to be run from a GCP VM. This limitation will be removed later.
-
 ```bash
 mvn clean verify
-```
\ No newline at end of file
+```
diff --git 
a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractContentSummary.java
 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractContentSummary.java
new file mode 100644
index 00000000000..56c0938972a
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractContentSummary.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractContentSummaryTest;
+
+/** GCS contract tests covering content summary. */
+public class ITestGoogleContractContentSummary extends AbstractContractContentSummaryTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRootDirectory.java
 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRootDirectory.java
new file mode 100644
index 00000000000..e00b3f2fb68
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRootDirectory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/** GCS contract tests covering file root directory. */
+public class ITestGoogleContractRootDirectory extends AbstractContractRootDirectoryTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractUnbuffer.java
 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractUnbuffer.java
new file mode 100644
index 00000000000..0d520ac516c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractUnbuffer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/** GCS contract tests covering unbuffer. */
+public class ITestGoogleContractUnbuffer extends AbstractContractUnbufferTest {
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new GoogleContract(conf);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
