This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 72255fb9a189ad118779bcc41e89b87d74ed42d3
Author: Arunkumar Chacko <aruncha...@google.com>
AuthorDate: Wed Jun 25 18:22:57 2025 +0000

    HADOOP-19343. Add support for hflush()
    
    Closes #7761
    
    Co-authored-by: Chris Nauroth <cnaur...@apache.org>
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
---
 .../{CreateOptions.java => CreateFileOptions.java} |  28 ++-
 .../apache/hadoop/fs/gs/ErrorTypeExtractor.java    |  27 +-
 .../apache/hadoop/fs/gs/GoogleCloudStorage.java    |  82 ++++++-
 .../gs/GoogleCloudStorageClientWriteChannel.java   |  14 +-
 .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 115 ++++++++-
 .../hadoop/fs/gs/GoogleHadoopFileSystem.java       |  14 +-
 .../fs/gs/GoogleHadoopFileSystemConfiguration.java |  44 +++-
 .../hadoop/fs/gs/GoogleHadoopOutputStream.java     | 273 +++++++++++++++++++--
 .../hadoop/fs/gs/HadoopConfigurationProperty.java  |  10 +
 ...tRename.java => ITestGoogleContractCreate.java} |  17 +-
 .../fs/gs/contract/ITestGoogleContractMkdir.java   |  24 --
 .../fs/gs/contract/ITestGoogleContractRename.java  |   6 -
 .../hadoop-gcp/src/test/resources/contract/gs.xml  | 100 +++++++-
 13 files changed, 648 insertions(+), 106 deletions(-)
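
For context, a minimal sketch of how the new hflush()/hsync() support is exercised through the
standard Hadoop FileSystem API (the bucket and path below are placeholders, not part of this
change; hflush()/hsync() are only advertised via hasCapability() when a positive
fs.gs.outputstream.sync.min.interval is configured):

    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public final class HflushSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.gs.outputstream.sync.min.interval", "1s");
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("gs://example-bucket/logs/app.log"), true)) {
          out.write("first batch\n".getBytes(StandardCharsets.UTF_8));
          if (out.hasCapability(StreamCapabilities.HSYNC)) {
            out.hsync(); // commit bytes written so far; readers can now see them
          }
          out.write("second batch\n".getBytes(StandardCharsets.UTF_8));
        } // close() commits the final component
      }
    }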

diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
similarity index 81%
rename from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java
rename to hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
index c9b44a1a481..1c0e48ae144 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
@@ -28,24 +28,38 @@
 /**
  * Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}.
  */
-final class CreateOptions {
+final class CreateFileOptions {
   private final ImmutableMap<String, byte[]> attributes;
   private final String contentType;
   private final long overwriteGenerationId;
   private final WriteMode mode;
+  private final boolean ensureNoDirectoryConflict;
 
-  private CreateOptions(CreateOperationOptionsBuilder builder) {
+  private CreateFileOptions(CreateOperationOptionsBuilder builder) {
     this.attributes = ImmutableMap.copyOf(builder.attributes);
     this.contentType = builder.contentType;
     this.overwriteGenerationId = builder.overwriteGenerationId;
     this.mode = builder.writeMode;
+    this.ensureNoDirectoryConflict = builder.ensureNoDirectoryConflict;
   }
 
   boolean isOverwriteExisting() {
     return this.mode == WriteMode.OVERWRITE;
   }
 
+  boolean isEnsureNoDirectoryConflict() {
+    return ensureNoDirectoryConflict;
+  }
+
+  CreateOperationOptionsBuilder toBuilder() {
+    return builder().setWriteMode(this.mode)
+        .setEnsureNoDirectoryConflict(ensureNoDirectoryConflict);
+  }
+
   enum WriteMode {
+    /** Write new bytes to the end of the existing file rather than the beginning. */
+    APPEND,
+
     /**
      * Creates a new file for write and fails if file already exists.
      */
@@ -98,14 +112,20 @@ static class CreateOperationOptionsBuilder {
     private String contentType = "application/octet-stream";
     private long overwriteGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
     private WriteMode writeMode = WriteMode.CREATE_NEW;
+    private boolean ensureNoDirectoryConflict = true;
 
     CreateOperationOptionsBuilder setWriteMode(WriteMode mode) {
       this.writeMode = mode;
       return this;
     }
 
-    CreateOptions build() {
-      CreateOptions options = new CreateOptions(this);
+    CreateOperationOptionsBuilder setEnsureNoDirectoryConflict(boolean ensure) {
+      this.ensureNoDirectoryConflict = ensure;
+      return this;
+    }
+
+    CreateFileOptions build() {
+      CreateFileOptions options = new CreateFileOptions(this);
 
       checkArgument(!options.getAttributes().containsKey("Content-Type"),
           "The Content-Type attribute must be set via the contentType option");
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
index 547d855d1d6..a94156e68e6 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java
@@ -20,6 +20,8 @@
 
 import javax.annotation.Nullable;
 
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.cloud.storage.StorageException;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 
@@ -63,8 +65,6 @@ enum ErrorType {
     UNAVAILABLE, UNKNOWN
   }
 
-  //  public static final ErrorTypeExtractor INSTANCE = new ErrorTypeExtractor();
-
   private static final String BUCKET_ALREADY_EXISTS_MESSAGE =
       "FAILED_PRECONDITION: Your previous request to create the named bucket 
succeeded and you "
           + "already own it.";
@@ -89,7 +89,28 @@ static ErrorType getErrorType(Exception error) {
     case UNAVAILABLE:
       return ErrorType.UNAVAILABLE;
     default:
-      return ErrorType.UNKNOWN;
+      return getErrorTypeFromStorageException(error);
     }
   }
+
+  private static ErrorType getErrorTypeFromStorageException(Exception error) {
+    if (error instanceof StorageException) {
+      StorageException se = (StorageException) error;
+      int httpCode = se.getCode();
+
+      if (httpCode == HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED) {
+        return ErrorType.FAILED_PRECONDITION;
+      }
+
+      if (httpCode == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+        return ErrorType.NOT_FOUND;
+      }
+
+      if (httpCode == HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE) {
+        return ErrorType.UNAVAILABLE;
+      }
+    }
+
+    return ErrorType.UNKNOWN;
+  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
index 89a86eef8ff..dcf5ff231f7 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
@@ -48,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage
@@ -89,7 +90,7 @@ private static Storage createStorage(String projectId) {
     return StorageOptions.newBuilder().build().getService();
   }
 
-  WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options)
+  WritableByteChannel create(final StorageResourceId resourceId, final CreateFileOptions options)
       throws IOException {
     LOG.trace("create({})", resourceId);
 
@@ -402,6 +403,47 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options
     }
   }
 
+
+  public GoogleCloudStorageItemInfo composeObjects(
+      List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
+      throws IOException {
+    LOG.trace("composeObjects({}, {}, {})", sources, destination, options);
+    for (StorageResourceId inputId : sources) {
+      if (!destination.getBucketName().equals(inputId.getBucketName())) {
+        throw new IOException(
+            String.format(
+                "Bucket doesn't match for source '%s' and destination '%s'!",
+                inputId, destination));
+      }
+    }
+    Storage.ComposeRequest request =
+        Storage.ComposeRequest.newBuilder()
+            .addSource(
+                sources.stream().map(StorageResourceId::getObjectName).collect(Collectors.toList()))
+            .setTarget(
+                BlobInfo.newBuilder(destination.getBucketName(), destination.getObjectName())
+                    .setContentType(options.getContentType())
+                    .setContentEncoding(options.getContentEncoding())
+                    .setMetadata(encodeMetadata(options.getMetadata()))
+                    .build())
+            .setTargetOptions(
+                Storage.BlobTargetOption.generationMatch(
+                    destination.hasGenerationId()
+                        ? destination.getGenerationId()
+                        : getWriteGeneration(destination, true)))
+            .build();
+
+    Blob composedBlob;
+    try {
+      composedBlob = storage.compose(request);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+    GoogleCloudStorageItemInfo compositeInfo = createItemInfoForBlob(destination, composedBlob);
+    LOG.trace("composeObjects() done, returning: {}", compositeInfo);
+    return compositeInfo;
+  }
+
   /**
   * Helper to check whether an empty object already exists with the expected metadata specified in
   * {@code options}, to be used to determine whether it's safe to ignore an exception that was
@@ -450,6 +492,7 @@ private boolean canIgnoreExceptionForEmptyObject(
         return true;
       }
     }
+
     return false;
   }
 
@@ -472,18 +515,14 @@ private void createEmptyObjectInternal(
       blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist());
     }
 
-    try {
-      // TODO: Set encryption key and related properties
-      storage.create(
-          BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
-              .setMetadata(rewrittenMetadata)
-              .setContentEncoding(createObjectOptions.getContentEncoding())
-              .setContentType(createObjectOptions.getContentType())
-              .build(),
-          blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
-    } catch (StorageException e) {
-      throw new IOException(String.format("Creating empty object %s failed.", resourceId), e);
-    }
+    // TODO: Set encryption key and related properties
+    storage.create(
+        BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName()))
+            .setMetadata(rewrittenMetadata)
+            .setContentEncoding(createObjectOptions.getContentEncoding())
+            .setContentType(createObjectOptions.getContentType())
+            .build(),
+        blobTargetOptions.toArray(new Storage.BlobTargetOption[0]));
   }
 
   private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
@@ -871,6 +910,23 @@ private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo(
     return storageItemInfo;
   }
 
+  List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceIds)
+      throws IOException {
+    LOG.trace("getItemInfos({})", resourceIds);
+
+    if (resourceIds.isEmpty()) {
+      return new ArrayList<>();
+    }
+
+    List<GoogleCloudStorageItemInfo> result = new ArrayList<>(resourceIds.size());
+    for (StorageResourceId resourceId : resourceIds) {
+      // TODO: Do this concurrently
+      result.add(getItemInfo(resourceId));
+    }
+
+    return result;
+  }
+
   // Helper class to capture the results of list operation.
   private class ListOperationResult {
     private final Map<String, Blob> prefixes = new HashMap<>();
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java
index 7956b6f0a82..438d8c040c9 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java
@@ -44,8 +44,12 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {
   private final StorageResourceId resourceId;
   private WritableByteChannel writableByteChannel;
 
-  GoogleCloudStorageClientWriteChannel(final Storage storage,
-      final StorageResourceId resourceId, final CreateOptions createOptions) throws IOException {
+  private GoogleCloudStorageItemInfo completedItemInfo = null;
+
+  GoogleCloudStorageClientWriteChannel(
+      final Storage storage,
+      final StorageResourceId resourceId,
+      final CreateFileOptions createOptions) throws IOException {
     this.resourceId = resourceId;
     BlobWriteSession blobWriteSession = getBlobWriteSession(storage, resourceId, createOptions);
     try {
@@ -56,7 +60,7 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel {
   }
 
   private static BlobInfo getBlobInfo(final StorageResourceId resourceId,
-      final CreateOptions createOptions) {
+      final CreateFileOptions createOptions) {
     BlobInfo blobInfo = BlobInfo.newBuilder(
             BlobId.of(resourceId.getBucketName(), resourceId.getObjectName(),
                 resourceId.getGenerationId())).setContentType(createOptions.getContentType())
@@ -66,12 +70,12 @@ private static BlobInfo getBlobInfo(final StorageResourceId resourceId,
   }
 
   private static BlobWriteSession getBlobWriteSession(final Storage storage,
-      final StorageResourceId resourceId, final CreateOptions createOptions) {
+      final StorageResourceId resourceId, final CreateFileOptions createOptions) {
     return storage.blobWriteSession(getBlobInfo(resourceId, createOptions),
         generateWriteOptions(createOptions));
   }
 
-  private static BlobWriteOption[] generateWriteOptions(final CreateOptions createOptions) {
+  private static BlobWriteOption[] generateWriteOptions(final CreateFileOptions createOptions) {
     List<BlobWriteOption> blobWriteOptions = new ArrayList<>();
 
     blobWriteOptions.add(BlobWriteOption.disableGzipContent());
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
index 2b0c238eb02..951c38f5966 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.gs;
 
 import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList;
 import static java.util.Comparator.comparing;
 import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER;
 import static org.apache.hadoop.fs.gs.Constants.SCHEME;
@@ -100,7 +102,7 @@ private static GoogleCloudStorage createCloudStorage(
     gcs = createCloudStorage(configuration, credentials);
   }
 
-  WritableByteChannel create(final URI path, final CreateOptions createOptions)
+  WritableByteChannel create(final URI path, final CreateFileOptions createOptions)
       throws IOException {
     LOG.trace("create(path: {}, createOptions: {})", path, createOptions);
     checkNotNull(path, "path could not be null");
@@ -113,6 +115,32 @@ WritableByteChannel create(final URI path, final CreateOptions createOptions)
               resourceId));
     }
 
+    // Because the create call should create parent directories too, before creating the actual
+    // file we need to check that there are no conflicting items in the directory tree:
+    // - no conflicting files with the same name as any parent subdirectory
+    // - no conflicting directory with the same name as the file
+    //
+    // For example, for a new `gs://bucket/c/d/f` file:
+    // - files `gs://bucket/c` and `gs://bucket/c/d` should not exist
+    // - directory `gs://bucket/c/d/f/` should not exist
+    if (configuration.isEnsureNoConflictingItems()) {
+      // Check if a directory with the same name exists.
+      StorageResourceId dirId = resourceId.toDirectoryId();
+      Boolean conflictingDirExist = false;
+      if (createOptions.isEnsureNoDirectoryConflict()) {
+        // TODO: Do this concurrently
+        conflictingDirExist =
+            getFileInfoInternal(dirId, /* inferImplicitDirectories */ true).exists();
+      }
+
+      checkNoFilesConflictingWithDirs(resourceId);
+
+      // Check if a directory with the same name exists.
+      if (conflictingDirExist) {
+        throw new FileAlreadyExistsException("A directory with that name exists: " + path);
+      }
+    }
+
     if (createOptions.getOverwriteGenerationId() != StorageResourceId.UNKNOWN_GENERATION_ID) {
       resourceId = new StorageResourceId(resourceId.getBucketName(), resourceId.getObjectName(),
           createOptions.getOverwriteGenerationId());
@@ -214,8 +242,11 @@ public void mkdirs(URI path) throws IOException {
 
     resourceId = resourceId.toDirectoryId();
 
-    // TODO: Before creating a leaf directory we need to check if there are no conflicting files
-    // TODO: with the same name as any subdirectory
+    // Before creating a leaf directory we need to check if there are no conflicting files
+    // with the same name as any subdirectory
+    if (configuration.isEnsureNoConflictingItems()) {
+      checkNoFilesConflictingWithDirs(resourceId);
+    }
 
     // Create only a leaf directory because subdirectories will be inferred
     // if leaf directory exists
@@ -689,4 +720,82 @@ static String getItemName(URI path) {
             : objectName.lastIndexOf(PATH_DELIMITER);
     return index < 0 ? objectName : objectName.substring(index + 1);
   }
+
+  static CreateObjectOptions objectOptionsFromFileOptions(CreateFileOptions options) {
+    checkArgument(
+        options.getWriteMode() == CreateFileOptions.WriteMode.CREATE_NEW
+            || options.getWriteMode() == CreateFileOptions.WriteMode.OVERWRITE,
+        "unsupported write mode: %s",
+        options.getWriteMode());
+    return CreateObjectOptions.builder()
+        .setContentType(options.getContentType())
+        .setMetadata(options.getAttributes())
+        .setOverwriteExisting(options.getWriteMode() == CreateFileOptions.WriteMode.OVERWRITE)
+        .build();
+  }
+
+  GoogleHadoopFileSystemConfiguration getConfiguration() {
+    return configuration;
+  }
+
+  GoogleCloudStorageItemInfo composeObjects(ImmutableList<StorageResourceId> sources,
+      StorageResourceId dstId, CreateObjectOptions composeObjectOptions) throws IOException {
+    return gcs.composeObjects(sources, dstId, composeObjectOptions);
+  }
+
+  void delete(List<StorageResourceId> items) throws IOException {
+    gcs.deleteObjects(items);
+  }
+
+  private void checkNoFilesConflictingWithDirs(StorageResourceId resourceId) throws IOException {
+    // Create a list of all files that can conflict with intermediate/subdirectory paths.
+    // For example: gs://foo/bar/zoo/ => (gs://foo/bar, gs://foo/bar/zoo)
+    List<StorageResourceId> fileIds =
+        getDirs(resourceId.getObjectName()).stream()
+            .filter(subdir -> !isNullOrEmpty(subdir))
+            .map(
+                subdir ->
+                    new StorageResourceId(
+                        resourceId.getBucketName(), StringPaths.toFilePath(subdir)))
+            .collect(toImmutableList());
+
+    // Each intermediate path must ensure that the corresponding file does not exist
+    //
+    // If a file already exists for any of the intermediate paths then bail out early.
+    // It is possible that the status of intermediate paths can change after
+    // we make this check, therefore this is a good faith effort and not a guarantee.
+    for (GoogleCloudStorageItemInfo fileInfo : gcs.getItemInfos(fileIds)) {
+      if (fileInfo.exists()) {
+        throw new FileAlreadyExistsException(
+            "Cannot create directories because of existing file: " + 
fileInfo.getResourceId());
+      }
+    }
+  }
+
+  /**
+   * For objects whose name looks like a path (foo/bar/zoo), returns all directory paths.
+   *
+   * <p>For example:
+   *
+   * <ul>
+   *   <li>foo/bar/zoo => returns: (foo/, foo/bar/)
+   *   <li>foo/bar/zoo/ => returns: (foo/, foo/bar/, foo/bar/zoo/)
+   *   <li>foo => returns: ()
+   * </ul>
+   *
+   * @param objectName Name of an object.
+   * @return List of subdirectory like paths.
+   */
+  static List<String> getDirs(String objectName) {
+    if (isNullOrEmpty(objectName)) {
+      return ImmutableList.of();
+    }
+    List<String> dirs = new ArrayList<>();
+    int index = 0;
+    while ((index = objectName.indexOf(PATH_DELIMITER, index)) >= 0) {
+      index = index + PATH_DELIMITER.length();
+      dirs.add(objectName.substring(0, index));
+    }
+    return dirs;
+  }
 }
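
As a quick illustration of the getDirs() helper above, a sketch of a unit test in the same
package (the test method name is hypothetical, and JUnit's assertEquals plus Guava's
ImmutableList are assumed on the classpath; the expected values follow the Javadoc):

    @Test
    public void testGetDirs() {
      assertEquals(ImmutableList.of("foo/", "foo/bar/"),
          GoogleCloudStorageFileSystem.getDirs("foo/bar/zoo"));
      assertEquals(ImmutableList.of("foo/", "foo/bar/", "foo/bar/zoo/"),
          GoogleCloudStorageFileSystem.getDirs("foo/bar/zoo/"));
      assertEquals(ImmutableList.of(), GoogleCloudStorageFileSystem.getDirs("foo"));
    }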
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
index 8bf2f057724..e4db8146186 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
@@ -283,11 +283,11 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole
     LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} [ignored])", hadoopPath,
         overwrite, bufferSize);
 
-    CreateOptions.WriteMode writeMode =
-        overwrite ? CreateOptions.WriteMode.OVERWRITE : CreateOptions.WriteMode.CREATE_NEW;
+    CreateFileOptions.WriteMode writeMode =
+        overwrite ? CreateFileOptions.WriteMode.OVERWRITE : CreateFileOptions.WriteMode.CREATE_NEW;
     FSDataOutputStream response = new FSDataOutputStream(
         new GoogleHadoopOutputStream(this, getGcsPath(hadoopPath),
-            CreateOptions.builder().setWriteMode(writeMode).build(), statistics), statistics);
+            CreateFileOptions.builder().setWriteMode(writeMode).build(), statistics), statistics);
 
     return response;
   }
@@ -596,12 +596,6 @@ public long getUsed() throws IOException {
     return result;
   }
 
-//  @Override
-//  public long getDefaultBlockSize() {
-//    LOG.trace("getDefaultBlockSize(): {}", defaultBlockSize);
-//    return defaultBlockSize;
-//  }
-
   @Override
   public void setWorkingDirectory(final Path hadoopPath) {
     checkArgument(hadoopPath != null, "hadoopPath must not be null");
@@ -633,4 +627,4 @@ private FileStatus getFileStatus(FileInfo fileInfo, String userName) {
     LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), userName, status);
     return status;
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
index 20831885fe6..4097b5e1f83 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.gs;
 
+import java.time.Duration;
 import java.util.regex.Pattern;
 
 import static java.lang.Math.toIntExact;
@@ -44,7 +45,7 @@ class GoogleHadoopFileSystemConfiguration {
   /**
    * Configuration key for GCS project ID. Default value: none
    */
-  static final HadoopConfigurationProperty<String> GCS_PROJECT_ID =
+  private static final HadoopConfigurationProperty<String> GCS_PROJECT_ID =
       new HadoopConfigurationProperty<>("fs.gs.project.id");
 
   /**
@@ -56,7 +57,7 @@ class GoogleHadoopFileSystemConfiguration {
   /**
    * Configuration key for setting write buffer size.
    */
-  static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
+  private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE =
       new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024);
 
 
@@ -64,20 +65,20 @@ class GoogleHadoopFileSystemConfiguration {
   * If forward seeks are within this many bytes of the current position, seeks are performed by
   * reading and discarding bytes in-place rather than opening a new underlying stream.
    */
-  public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT =
+  private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT =
       new HadoopConfigurationProperty<>(
           "fs.gs.inputstream.inplace.seek.limit",
           GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT);
 
   /** Tunes reading objects behavior to optimize HTTP GET requests for various use cases. */
-  public static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE =
+  private static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE =
       new HadoopConfigurationProperty<>("fs.gs.inputstream.fadvise", Fadvise.RANDOM);
 
   /**
   * If false, reading a file with GZIP content encoding (HTTP header "Content-Encoding: gzip") will
    * result in failure (IOException is thrown).
    */
-  public static final HadoopConfigurationProperty<Boolean>
+  private static final HadoopConfigurationProperty<Boolean>
       GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE =
       new HadoopConfigurationProperty<>(
           "fs.gs.inputstream.support.gzip.encoding.enable",
@@ -87,7 +88,7 @@ class GoogleHadoopFileSystemConfiguration {
   * Minimum size in bytes of the HTTP Range header set in GCS request when opening new stream to
   * read an object.
   */
-  public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE =
+  private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE =
       new HadoopConfigurationProperty<>(
           "fs.gs.inputstream.min.range.request.size",
           2 * 1024 * 1024L);
@@ -96,22 +97,39 @@ class GoogleHadoopFileSystemConfiguration {
   * Configuration key for number of request to track for adapting the access pattern i.e. fadvise:
   * AUTO & AUTO_RANDOM.
   */
-  public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
+  private static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT =
       new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3);
 
   /**
   * Configuration key for specifying max number of bytes rewritten in a single rewrite request when
   * fs.gs.copy.with.rewrite.enable is set to 'true'.
   */
-  public static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE =
+  private static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE =
       new HadoopConfigurationProperty<>(
           "fs.gs.rewrite.max.chunk.size",
           512 * 1024 * 1024L);
 
   /** Configuration key for marker file pattern. Default value: none */
-  public static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN =
+  private static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN =
       new HadoopConfigurationProperty<>("fs.gs.marker.file.pattern");
 
+  /**
+   * Configuration key for enabling check to ensure that conflicting directories do not exist when
+   * creating files and conflicting files do not exist when creating directories.
+   */
+  private static final HadoopConfigurationProperty<Boolean> GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.create.items.conflict.check.enable",
+          true);
+
+  /**
+   * Configuration key for the minimal time interval between consecutive sync/hsync/hflush calls.
+   */
+  private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL =
+      new HadoopConfigurationProperty<>(
+          "fs.gs.outputstream.sync.min.interval",
+          0L);
+
   private final String workingDirectory;
   private final String projectId;
   private final Configuration config;
@@ -188,4 +206,12 @@ public Pattern getMarkerFilePattern() {
 
     return fileMarkerFilePattern;
   }
+
+  public boolean isEnsureNoConflictingItems() {
+    return GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE.get(config, config::getBoolean);
+  }
+
+  public Duration getMinSyncInterval() {
+    return GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL.getTimeDuration(config);
+  }
 }
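
A sketch of how the two new keys might be set programmatically (the values are illustrative and
the bucket name is hypothetical):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public final class GcsSyncConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep create-time conflict checking on (this is already the default).
        conf.setBoolean("fs.gs.create.items.conflict.check.enable", true);
        // Allow at most one hflush()/hsync() commit per second; the default of 0 disables
        // rate limiting and also makes hasCapability(HFLUSH/HSYNC) report false.
        conf.set("fs.gs.outputstream.sync.min.interval", "1s");
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);
        System.out.println(fs.getUri());
      }
    }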
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
index 747d9f001c5..c41ce13edae 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java
@@ -25,23 +25,88 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.WritableByteChannel;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import javax.annotation.Nonnull;
 
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Ascii;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class GoogleHadoopOutputStream extends OutputStream {
-  public static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class);
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
+import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
+
+class GoogleHadoopOutputStream extends OutputStream
+    implements StreamCapabilities, Syncable {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class);
+
+  // Prefix used for all temporary files created by this stream.
+  private static final String TMP_FILE_PREFIX = "_GHFS_SYNC_TMP_FILE_";
+
+  // Temporary files don't need to contain the desired attributes of the final destination file
+  // since metadata settings get clobbered on final compose() anyways; additionally, due to
+  // the way we pick temp file names and already ensured directories for the destination file,
+  // we can optimize tempfile creation by skipping various directory checks.
+  private static final CreateFileOptions TMP_FILE_CREATE_OPTIONS =
+      CreateFileOptions.builder().setEnsureNoDirectoryConflict(false).build();
+
+  // Deletion of temporary files occurs asynchronously for performance reasons, but in-flight
+  // deletions are awaited on close() so as long as all output streams are closed, there should
+  // be no remaining in-flight work occurring inside this threadpool.
+  private static final ExecutorService TMP_FILE_CLEANUP_THREADPOOL =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ghfs-output-stream-sync-cleanup-%d")
+              .setDaemon(true)
+              .build());
 
   private final GoogleHadoopFileSystem ghfs;
 
+  private final CreateObjectOptions composeObjectOptions;
+
   // Path of the file to write to.
   private final URI dstGcsPath;
 
-  private OutputStream outputStream;
+  /**
+   * The last known generationId of the {@link #dstGcsPath} file, or possibly {@link
+   * StorageResourceId#UNKNOWN_GENERATION_ID} if unknown.
+   */
+  private long dstGenerationId;
+
+  // GCS path pointing at the "tail" file which will be appended to the 
destination
+  // on hflush()/hsync() call.
+  private URI tmpGcsPath;
+
+  /**
+   * Stores the component index corresponding to {@link #tmpGcsPath}. If close() is called, the
+   * total number of components in the {@link #dstGcsPath} will be {@code tmpIndex + 1}.
+   */
+  private int tmpIndex;
+
+  // OutputStream pointing at the "tail" file which will be appended to the 
destination
+  // on hflush()/hsync() call.
+  private OutputStream tmpOut;
+
+  private final RateLimiter syncRateLimiter;
+
+  // List of temporary file-deletion futures accrued during the lifetime of this output stream.
+  private final List<Future<Void>> tmpDeletionFutures = new ArrayList<>();
 
   // Statistics tracker provided by the parent GoogleHadoopFileSystem for recording
   // numbers of bytes written.
@@ -57,19 +122,49 @@ class GoogleHadoopOutputStream extends OutputStream {
    * @throws IOException if an IO error occurs.
    */
   GoogleHadoopOutputStream(GoogleHadoopFileSystem ghfs, URI dstGcsPath,
-      CreateOptions createFileOptions, FileSystem.Statistics statistics) throws IOException {
+      CreateFileOptions createFileOptions, FileSystem.Statistics statistics) throws IOException {
     LOG.trace("GoogleHadoopOutputStream(gcsPath: {}, createFileOptions: {})", dstGcsPath,
         createFileOptions);
     this.ghfs = ghfs;
     this.dstGcsPath = dstGcsPath;
     this.statistics = statistics;
 
-    this.outputStream = createOutputStream(ghfs.getGcsFs(), dstGcsPath, createFileOptions,
-        ghfs.getFileSystemConfiguration());
+    Duration minSyncInterval = ghfs.getFileSystemConfiguration().getMinSyncInterval();
+
+    this.syncRateLimiter =
+        minSyncInterval.isNegative() || minSyncInterval.isZero()
+            ? null
+            : RateLimiter.create(/* permitsPerSecond= */ 1_000.0 / minSyncInterval.toMillis());
+    this.composeObjectOptions =
+        GoogleCloudStorageFileSystem.objectOptionsFromFileOptions(
+            createFileOptions.toBuilder()
+                // Set write mode to OVERWRITE because we use compose operation to append new data
+                // to an existing object
+                .setWriteMode(CreateFileOptions.WriteMode.OVERWRITE)
+                .build());
+
+    if (createFileOptions.getWriteMode() == CreateFileOptions.WriteMode.APPEND) {
+      // When appending first component has to go to new temporary file.
+      this.tmpGcsPath = getNextTmpPath();
+      this.tmpIndex = 1;
+    } else {
+      // The first component of the stream will go straight to the destination filename to optimize
+      // the case where no hsync() or a single hsync() is called during the lifetime of the stream;
+      // committing the first component thus doesn't require any compose() call under the hood.
+      this.tmpGcsPath = dstGcsPath;
+      this.tmpIndex = 0;
+    }
+
+    this.tmpOut =
+        createOutputStream(
+            ghfs.getGcsFs(),
+            tmpGcsPath,
+            tmpIndex == 0 ? createFileOptions : TMP_FILE_CREATE_OPTIONS);
+    this.dstGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
   }
 
-  private static OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsfs, URI gcsPath,
-      CreateOptions options, GoogleHadoopFileSystemConfiguration fileSystemConfiguration)
+  private OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsfs, URI gcsPath,
+      CreateFileOptions options)
       throws IOException {
     WritableByteChannel channel;
     try {
@@ -80,14 +175,14 @@ private static OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsf
           String.format("'%s' already exists", gcsPath)).initCause(e);
     }
     OutputStream outputStream = Channels.newOutputStream(channel);
-    int bufferSize = fileSystemConfiguration.getOutStreamBufferSize();
+    int bufferSize = gcsfs.getConfiguration().getOutStreamBufferSize();
     return bufferSize > 0 ? new BufferedOutputStream(outputStream, bufferSize) : outputStream;
   }
 
   @Override
   public void write(int b) throws IOException {
     throwIfNotOpen();
-    outputStream.write(b);
+    tmpOut.write(b);
     statistics.incrementBytesWritten(1);
     statistics.incrementWriteOps(1);
   }
@@ -95,30 +190,176 @@ public void write(int b) throws IOException {
   @Override
   public void write(@Nonnull byte[] b, int offset, int len) throws IOException {
     throwIfNotOpen();
-    outputStream.write(b, offset, len);
+    tmpOut.write(b, offset, len);
     statistics.incrementBytesWritten(len);
     statistics.incrementWriteOps(1);
   }
 
+  private void commitTempFile() throws IOException {
+    // TODO: return early when 0 bytes have been written in the temp files
+    tmpOut.close();
+
+    long tmpGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID;
+    LOG.trace(
+        "tmpOut is an instance of {}; expected generationId {}.",
+        tmpOut.getClass(), tmpGenerationId);
+
+    // On the first component, tmpGcsPath will equal finalGcsPath, and no compose() call is
+    // necessary. Otherwise, we compose in-place into the destination object and then delete
+    // the temporary object.
+    if (dstGcsPath.equals(tmpGcsPath)) {
+      // First commit was direct to the destination; the generationId of the object we just
+      // committed will be used as the destination generation id for future compose calls.
+      dstGenerationId = tmpGenerationId;
+    } else {
+      StorageResourceId dstId =
+          StorageResourceId.fromUriPath(
+              dstGcsPath, /* allowEmptyObjectName= */ false, dstGenerationId);
+      StorageResourceId tmpId =
+          StorageResourceId.fromUriPath(
+              tmpGcsPath, /* allowEmptyObjectName= */ false, tmpGenerationId);
+      checkState(
+          dstId.getBucketName().equals(tmpId.getBucketName()),
+          "Destination bucket in path '%s' doesn't match temp file bucket in 
path '%s'",
+          dstGcsPath,
+          tmpGcsPath);
+      GoogleCloudStorageFileSystem gcs = ghfs.getGcsFs();
+      GoogleCloudStorageItemInfo composedObject =
+          gcs.composeObjects(ImmutableList.of(dstId, tmpId), dstId, composeObjectOptions);
+      dstGenerationId = composedObject.getContentGeneration();
+      tmpDeletionFutures.add(
+          TMP_FILE_CLEANUP_THREADPOOL.submit(
+              () -> {
+                gcs.delete(ImmutableList.of(tmpId));
+                return null;
+              }));
+    }
+  }
+
   @Override
   public void close() throws IOException {
-    LOG.trace("close(): final destination: {}", dstGcsPath);
+    LOG.trace(
+        "close(): temp tail file: %s final destination: {}", tmpGcsPath, 
dstGcsPath);
 
-    if (outputStream == null) {
+    if (tmpOut == null) {
       LOG.trace("close(): Ignoring; stream already closed.");
       return;
     }
 
+    commitTempFile();
+
     try {
-      outputStream.close();
+      tmpOut.close();
     } finally {
-      outputStream = null;
+      tmpOut = null;
+    }
+    tmpGcsPath = null;
+    tmpIndex = -1;
+
+    LOG.trace("close(): Awaiting {} deletionFutures", 
tmpDeletionFutures.size());
+    for (Future<?> deletion : tmpDeletionFutures) {
+      try {
+        deletion.get();
+      } catch (ExecutionException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        throw new IOException(
+            String.format(
+                "Failed to delete temporary files while closing stream: '%s'", 
dstGcsPath),
+            e);
+      }
     }
   }
 
   private void throwIfNotOpen() throws IOException {
-    if (outputStream == null) {
+    if (tmpOut == null) {
       throw new ClosedChannelException();
     }
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    checkArgument(!isNullOrEmpty(capability), "capability must not be null or empty string");
+    switch (Ascii.toLowerCase(capability)) {
+    case StreamCapabilities.HFLUSH:
+    case StreamCapabilities.HSYNC:
+      return syncRateLimiter != null;
+    case StreamCapabilities.IOSTATISTICS:
+      return false; // TODO: Add support
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * There is no way to flush data to become available for readers without a full-fledged hsync().
+   * If the output stream is only syncable, this method is a no-op. If the output stream is also
+   * flushable, this method will simply use the same implementation of hsync().
+   *
+   * <p>If it is rate limited, unlike hsync(), which will try to acquire the permits and block, it
+   * will do nothing.
+   */
+  @Override
+  public void hflush() throws IOException {
+    LOG.trace("hflush(): {}", dstGcsPath);
+
+    long startMs = System.currentTimeMillis();
+    throwIfNotOpen();
+    // If rate limit not set or permit acquired then use hsync()
+    if (syncRateLimiter == null || syncRateLimiter.tryAcquire()) {
+      LOG.trace("hflush() uses hsyncInternal() for {}", dstGcsPath);
+      hsyncInternal(startMs);
+      return;
+    }
+    LOG.trace(
+        "hflush(): No-op due to rate limit ({}): readers will *not* yet see 
flushed data for {}",
+        syncRateLimiter, dstGcsPath);
+
+  }
+
+  @Override
+  public void hsync() throws IOException {
+    LOG.trace("hsync(): {}", dstGcsPath);
+
+    long startMs = System.currentTimeMillis();
+    throwIfNotOpen();
+    if (syncRateLimiter != null) {
+      LOG.trace(
+          "hsync(): Rate limited ({}) with blocking permit acquisition for {}",
+          syncRateLimiter, dstGcsPath);
+      syncRateLimiter.acquire();
+    }
+    hsyncInternal(startMs);
+
+  }
+
+  /** Internal implementation of hsync, can be reused by hflush() as well. */
+  private void hsyncInternal(long startMs) throws IOException {
+    LOG.trace(
+        "hsyncInternal(): Committing tail file {} to final destination {}", 
tmpGcsPath, dstGcsPath);
+    commitTempFile();
+
+    // Use a different temporary path for each temporary component to reduce the possible avenues of
+    // race conditions in the face of low-level retries, etc.
+    ++tmpIndex;
+    tmpGcsPath = getNextTmpPath();
+
+    LOG.trace(
+        "hsync(): Opening next temporary tail file {} at {} index", 
tmpGcsPath, tmpIndex);
+    tmpOut = createOutputStream(ghfs.getGcsFs(), tmpGcsPath, TMP_FILE_CREATE_OPTIONS);
+
+    long finishMs = System.currentTimeMillis();
+    LOG.trace("Took {}ms to sync() for {}", finishMs - startMs, dstGcsPath);
+  }
+  /** Returns URI to be used for the next temp "tail" file in the series. */
+  private URI getNextTmpPath() {
+    Path basePath = ghfs.getHadoopPath(dstGcsPath);
+    Path tempPath =
+        new Path(
+            basePath.getParent(),
+            String.format(
+                "%s%s.%d.%s", TMP_FILE_PREFIX, basePath.getName(), tmpIndex, 
UUID.randomUUID()));
+    return ghfs.getGcsPath(tempPath);
+  }
 }
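
Putting the pieces above together, the object-level effect of a stream that sees two hsync()
calls looks roughly like this (a paraphrase of the code, reusing the fs handle from the sketch
near the top of this message; the path, part1..part3 byte arrays, and the two-sync scenario are
hypothetical):

    try (FSDataOutputStream out = fs.create(new Path("gs://example-bucket/f"), true)) {
      out.write(part1); // streams directly into gs://example-bucket/f (the first component)
      out.hsync();      // closes that component, opens temp tail _GHFS_SYNC_TMP_FILE_f.1.<uuid>
      out.write(part2); // streams into the temp tail
      out.hsync();      // composeObjects(f, tmp1) -> f, async-deletes tmp1, opens tail .2.<uuid>
      out.write(part3);
    }                   // close(): composeObjects(f, tmp2) -> f, then awaits pending temp deletions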
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
index 9360290a09c..450459e6a8d 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.function.BiFunction;
 
@@ -28,6 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 /**
  * Hadoop configuration property.
  */
@@ -64,6 +67,13 @@ T get(Configuration config, BiFunction<String, T, T> getterFn) {
     return logProperty(lookupKey, getterFn.apply(lookupKey, defaultValue));
   }
 
+  Duration getTimeDuration(Configuration config) {
+    String lookupKey = getLookupKey(config, key, (c, k) -> c.get(k) != null);
+    String defValStr = defaultValue == null ? null : String.valueOf(defaultValue);
+    return logProperty(
+        lookupKey, Duration.ofMillis(config.getTimeDuration(lookupKey, defValStr, MILLISECONDS)));
+  }
+
   private String getLookupKey(Configuration config, String lookupKey,
       BiFunction<Configuration, String, Boolean> checkFn) {
     for (String prefix : keyPrefixes) {
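
The new getTimeDuration() helper defers to Hadoop's Configuration.getTimeDuration() with a
milliseconds default unit, so suffixed values ("2s", "500ms", "1m") and bare numbers both work;
a small sketch (the key reuses the sync-interval property defined earlier):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public final class TimeDurationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.gs.outputstream.sync.min.interval", "2s");
        // Bare numbers (e.g. "500") fall back to the unit given here, i.e. milliseconds.
        long millis = conf.getTimeDuration(
            "fs.gs.outputstream.sync.min.interval", "0", TimeUnit.MILLISECONDS);
        System.out.println(millis); // 2000
      }
    }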
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java
similarity index 68%
copy from hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java
index 5d9459cac19..56ae35b4ff4 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java
@@ -15,29 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.fs.gs.contract;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
+import org.apache.hadoop.fs.contract.AbstractContractCreateTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
-/** GCS contract tests covering file rename. */
-public class ITestGoogleContractRename extends AbstractContractRenameTest {
+public class ITestGoogleContractCreate extends AbstractContractCreateTest {
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
 
   @Override
-  public void testRenameWithNonEmptySubDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
-
-  @Override
-  public void testRenameNonexistentFile() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
+  public void testOverwriteEmptyDirectory() throws Throwable {
+    ContractTestUtils.skip("blobstores can't distinguish empty directories 
from files");
   }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
index 26181f20385..27acc015ab8 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
@@ -29,33 +29,9 @@ protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
 
-  @Override
-  public void testMkdirsDoesNotRemoveParentDirectories() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
-
-  @Override
-  public void testCreateDirWithExistingDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
-
   @Override
   public void testMkDirRmDir() {
     // TODO: Enable this
     ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
   }
-
-  @Override
-  public void testNoMkdirOverFile() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
-
-  @Override
-  public void testMkdirOverParentFile() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
index 5d9459cac19..a159d46b006 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
@@ -34,10 +34,4 @@ public void testRenameWithNonEmptySubDir() {
     // TODO: Enable this
     ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
   }
-
-  @Override
-  public void testRenameNonexistentFile() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a 
subsequent change");
-  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml b/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml
index 1de34245a5d..542df5166b6 100644
--- a/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml
+++ b/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml
@@ -17,5 +17,103 @@
   -->
 
 <configuration>
+    <property>
+        <name>fs.contract.test.root-tests-enabled</name>
+        <value>true</value>
+    </property>
 
-</configuration>
+    <property>
+        <name>fs.contract.test.random-seek-count</name>
+        <value>10</value>
+    </property>
+
+    <property>
+        <name>fs.contract.create-visibility-delayed</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.is-blobstore</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.is-case-sensitive</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.rename-returns-false-if-source-missing</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.rename-returns-false-if-dest-exists</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-append</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-atomic-directory-delete</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-atomic-rename</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-block-locality</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-concat</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-getfilestatus</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-seek</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-seek-on-closed-file</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.rejects-seek-past-eof</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-strict-exceptions</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.contract.supports-unix-permissions</name>
+        <value>false</value>
+    </property>
+
+    <property>
+        <name>fs.contract.rename-overwrites-dest</name>
+        <value>false</value>
+    </property>
+</configuration>
\ No newline at end of file

