This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/HADOOP-19343 by this push:
     new 486c4ee7967 HADOOP-19343. Add support for append(), compose(), concat()
486c4ee7967 is described below

commit 486c4ee79677debc5c8d0c88bbcf746d49324e14
Author: Arunkumar Chacko <aruncha...@google.com>
AuthorDate: Wed Jul 2 17:58:18 2025 +0000

    HADOOP-19343. Add support for append(), compose(), concat()
    
    Closes #7773
    
    Signed-off-by: Chris Nauroth <cnaur...@apache.org>
---
 .../java/org/apache/hadoop/fs/gs/Constants.java    |   2 +
 .../org/apache/hadoop/fs/gs/CreateFileOptions.java |   1 +
 .../org/apache/hadoop/fs/gs/GcsListOperation.java  |  98 ++++++++++++
 .../apache/hadoop/fs/gs/GoogleCloudStorage.java    | 174 +++++++++++++--------
 .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 147 +++++++----------
 .../hadoop/fs/gs/GoogleHadoopFileSystem.java       |  67 +++++++-
 .../org/apache/hadoop/fs/gs/StorageResourceId.java |   5 +
 ...ctMkdir.java => ITestGoogleContractAppend.java} |  12 +-
 ...ctMkdir.java => ITestGoogleContractConcat.java} |  13 +-
 .../fs/gs/contract/ITestGoogleContractDelete.java  |   7 -
 .../fs/gs/contract/ITestGoogleContractMkdir.java   |   7 -
 .../fs/gs/contract/ITestGoogleContractRename.java  |   7 -
 12 files changed, 339 insertions(+), 201 deletions(-)

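For context, this change turns on the FS_APPEND and FS_CONCAT path capabilities
for gs:// paths. A minimal sketch of how a client would exercise the new
operations through the standard Hadoop FileSystem API; the bucket and object
names are illustrative only:

    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonPathCapabilities;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsAppendConcatExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("gs://bucket/"), conf);

        Path target = new Path("gs://bucket/logs/merged.log");
        // Both capabilities report true after this change.
        System.out.println(fs.hasPathCapability(target, CommonPathCapabilities.FS_APPEND));
        System.out.println(fs.hasPathCapability(target, CommonPathCapabilities.FS_CONCAT));

        // Append to an existing object.
        try (FSDataOutputStream out = fs.append(target)) {
          out.write("more bytes\n".getBytes(StandardCharsets.UTF_8));
        }

        // Concatenate existing source objects onto the target.
        Path[] parts = {
            new Path("gs://bucket/logs/part-0"),
            new Path("gs://bucket/logs/part-1")};
        fs.concat(target, parts);
      }
    }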
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java
index 34434b2859a..61c1571c4dc 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.gs;
 
 final class Constants {
+  static final int MAX_COMPOSE_OBJECTS = 32;
+
   private Constants() {}
 
   // URI scheme for GCS.
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
index 1c0e48ae144..e3d0631b501 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java
@@ -29,6 +29,7 @@
  * Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}.
  */
 final class CreateFileOptions {
+  static final CreateFileOptions DEFAULT = CreateFileOptions.builder().build();
   private final ImmutableMap<String, byte[]> attributes;
   private final String contentType;
   private final long overwriteGenerationId;
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
new file mode 100644
index 00000000000..9cd4fdbb867
--- /dev/null
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.gs;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument;
+
+final class GcsListOperation {
+  private static final int ALL = 0;
+  private final Storage.BlobListOption[] listOptions;
+  private final String bucketName;
+  private final Storage storage;
+  private final int limit;
+
+  private GcsListOperation(Builder builder) {
+    this.listOptions = builder.blobListOptions
+        .toArray(new Storage.BlobListOption[builder.blobListOptions.size()]);
+    this.bucketName = builder.bucket;
+    this.storage = builder.storage;
+    this.limit = builder.limit;
+  }
+
+  public List<Blob> execute() {
+    List<Blob> result = new ArrayList<>();
+    for (Blob blob : storage.list(bucketName, listOptions).iterateAll()) {
+      result.add(blob);
+
+      if (limit != ALL && result.size() >= limit) {
+        break;
+      }
+    }
+
+    return result;
+  }
+
+  static class Builder {
+    private final ArrayList<Storage.BlobListOption> blobListOptions = new ArrayList<>();
+    private String prefix;
+    private final String bucket;
+    private final Storage storage;
+    private int limit = GcsListOperation.ALL;
+
+    Builder(final String bucketName, final String thePrefix, Storage storage) {
+      this.storage = storage;
+      this.bucket = bucketName;
+      this.prefix = thePrefix;
+    }
+
+    Builder forRecursiveListing() {
+      return this;
+    }
+
+    GcsListOperation build() {
+      blobListOptions.add(Storage.BlobListOption.prefix(prefix));
+      return new GcsListOperation(this);
+    }
+
+    Builder forCurrentDirectoryListing() {
+      blobListOptions.add(Storage.BlobListOption.currentDirectory());
+      blobListOptions.add(Storage.BlobListOption.includeTrailingDelimiter());
+      return this;
+    }
+
+    Builder forCurrentDirectoryListingWithLimit(int theLimit) {
+      checkArgument(
+          theLimit > 0,
+          "limit should be greater than 0. found %s; prefix=%s", theLimit, prefix);
+
+      this.limit = theLimit;
+      prefix = StringPaths.toDirectoryPath(prefix);
+
+      blobListOptions.add(Storage.BlobListOption.pageSize(1));
+      forCurrentDirectoryListing();
+      return this;
+    }
+  }
+}
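For reference, a short sketch of how the call sites in GoogleCloudStorage
(below) drive the three listing modes of this builder; "bucket" and "foo/" are
stand-in arguments and storage is an initialized com.google.cloud.storage.Storage
client:

    // Recursive listing: every object under the prefix.
    List<Blob> all = new GcsListOperation.Builder("bucket", "foo/", storage)
        .forRecursiveListing().build().execute();

    // Single directory level, using "/"-delimiter semantics.
    List<Blob> dir = new GcsListOperation.Builder("bucket", "foo/", storage)
        .forCurrentDirectoryListing().build().execute();

    // Cheap existence probe for an implicit directory: stop after one hit.
    List<Blob> probe = new GcsListOperation.Builder("bucket", "foo", storage)
        .forCurrentDirectoryListingWithLimit(1).build().execute();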
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
index dcf5ff231f7..e8cafa57ef8 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java
@@ -288,49 +288,6 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourc
         bucket.getStorageClass() == null ? null : bucket.getStorageClass().name());
   }
 
-  List<GoogleCloudStorageItemInfo> listObjectInfo(
-      String bucketName,
-      String objectNamePrefix,
-      ListObjectOptions listOptions) throws IOException {
-    try {
-      long maxResults = listOptions.getMaxResults() > 0 ?
-          listOptions.getMaxResults() + (listOptions.isIncludePrefix() ? 0 : 1) :
-          listOptions.getMaxResults();
-
-      Storage.BlobListOption[] blobListOptions =
-          getBlobListOptions(objectNamePrefix, listOptions, maxResults);
-      Page<Blob> blobs = storage.list(bucketName, blobListOptions);
-      ListOperationResult result = new ListOperationResult(maxResults);
-      for (Blob blob : blobs.iterateAll()) {
-        result.add(blob);
-      }
-
-      return result.getItems();
-    } catch (StorageException e) {
-      throw new IOException(
-          String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
-          e);
-    }
-  }
-
-  private Storage.BlobListOption[] getBlobListOptions(
-      String objectNamePrefix, ListObjectOptions listOptions, long maxResults) {
-    List<Storage.BlobListOption> options = new ArrayList<>();
-
-    options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0])));
-    options.add(Storage.BlobListOption.prefix(objectNamePrefix));
-    // TODO: set max results as a BlobListOption
-    if ("/".equals(listOptions.getDelimiter())) {
-      options.add(Storage.BlobListOption.currentDirectory());
-    }
-
-    if (listOptions.getDelimiter() != null) {
-      options.add(Storage.BlobListOption.includeTrailingDelimiter());
-    }
-
-    return options.toArray(new Storage.BlobListOption[0]);
-  }
-
   private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
     long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration();
     StorageResourceId resourceId =
@@ -403,8 +360,7 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options
     }
   }
 
-
-  public GoogleCloudStorageItemInfo composeObjects(
+  GoogleCloudStorageItemInfo composeObjects(
       List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options)
       throws IOException {
     LOG.trace("composeObjects({}, {}, {})", sources, destination, options);
@@ -538,13 +494,14 @@ List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, Strin
     // TODO: Take delimiter from config
     // TODO: Set specific fields
 
+    checkArgument(objectName.endsWith("/"), String.format("%s should end with /", objectName));
     try {
-      Page<Blob> blobs = storage.list(
-          bucketName,
-          Storage.BlobListOption.prefix(objectName));
+      List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectName, storage)
+          .forRecursiveListing().build()
+          .execute();
 
       List<GoogleCloudStorageItemInfo> result = new ArrayList<>();
-      for (Blob blob : blobs.iterateAll()) {
+      for (Blob blob : blobs) {
         result.add(createItemInfoForBlob(blob));
       }
 
@@ -624,7 +581,7 @@ private List<Bucket> listBucketsInternal() throws IOException {
     return allBuckets;
   }
 
-  public SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
+  SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo,
       GoogleHadoopFileSystemConfiguration config) throws IOException {
     LOG.trace("open({})", itemInfo);
     checkNotNull(itemInfo, "itemInfo should not be null");
@@ -647,7 +604,7 @@ private SeekableByteChannel open(
         config);
   }
 
-  public void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
+  void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap)
       throws IOException {
     validateMoveArguments(sourceToDestinationObjectsMap);
 
@@ -739,7 +696,7 @@ private Storage.MoveBlobRequest.Builder createMoveRequestBuilder(
    * Validates basic argument constraints like non-null, non-empty Strings, using {@code
    * Preconditions} in addition to checking for src/dst bucket equality.
    */
-  public static void validateMoveArguments(
+  static void validateMoveArguments(
       Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException {
     checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null");
 
@@ -837,7 +794,7 @@ private void copyInternal(
     }
   }
 
-  public static void validateCopyArguments(
+  static void validateCopyArguments(
       Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap,
       GoogleCloudStorage gcsImpl)
       throws IOException {
@@ -927,6 +884,103 @@ List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceId
     return result;
   }
 
+  List<GoogleCloudStorageItemInfo> listDirectory(String bucketName, String objectNamePrefix)
+      throws IOException {
+    checkArgument(
+        objectNamePrefix.endsWith("/"),
+        String.format("%s should end with /", objectNamePrefix));
+
+    try {
+      List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectNamePrefix, storage)
+          .forCurrentDirectoryListing().build()
+          .execute();
+
+      ListOperationResult result = new ListOperationResult();
+      for (Blob blob : blobs) {
+        result.add(blob);
+      }
+
+      return result.getItems();
+    } catch (StorageException e) {
+      throw new IOException(
+          String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)),
+          e);
+    }
+  }
+
+  void compose(
+      String bucketName, List<String> sources, String destination, String contentType)
+      throws IOException {
+    LOG.trace("compose({}, {}, {}, {})", bucketName, sources, destination, contentType);
+    List<StorageResourceId> sourceIds =
+        sources.stream()
+            .map(objectName -> new StorageResourceId(bucketName, objectName))
+            .collect(Collectors.toList());
+    StorageResourceId destinationId = new StorageResourceId(bucketName, destination);
+    CreateObjectOptions options =
+        CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder()
+            .setContentType(contentType)
+            .setEnsureEmptyObjectsMetadataMatch(false)
+            .build();
+    composeObjects(sourceIds, destinationId, options);
+  }
+
+  /**
+   * Gets metadata for the given resourceId. The resourceId can be a file or a directory.
+   *
+   * For a resourceId gs://b/foo/a, the path can be a file or a directory (gs://b/foo/a/).
+   * This method checks for both and returns the one that is found. "NotFound" is returned
+   * if neither exists.
+   */
+  GoogleCloudStorageItemInfo getFileOrDirectoryInfo(StorageResourceId resourceId) {
+    BlobId blobId = resourceId.toBlobId();
+    if (resourceId.isDirectory()) {
+      // Do not check for "file" for directory paths.
+      Blob blob = storage.get(blobId);
+      if (blob != null) {
+        return createItemInfoForBlob(blob);
+      }
+    } else {
+      BlobId dirId = resourceId.toDirectoryId().toBlobId();
+
+      // Check for both file and directory.
+      List<Blob> blobs = storage.get(blobId, dirId);
+      for (Blob blob : blobs) {
+        if (blob != null) {
+          return createItemInfoForBlob(blob);
+        }
+      }
+    }
+
+    return GoogleCloudStorageItemInfo.createNotFound(resourceId);
+  }
+
+  /**
+   * Checks whether an "implicit" directory exists for the given resourceId.
+   *
+   * Note that the GCS object store does not have a concept of directories for non-HNS buckets.
+   * For example, one could create an object gs://bucket/foo/bar/a.txt without creating the
+   * parent directories (i.e. placeholder empty objects with names ending in a /). In this case
+   * we might want to treat gs://bucket/foo/ and gs://bucket/foo/bar/ as directories.
+   *
+   * This method helps check whether a given resourceId (e.g. gs://bucket/foo/bar/) is such an
+   * "implicit" directory.
+   *
+   * Note that this results in a list operation and is more expensive than "get metadata".
+   */
+  GoogleCloudStorageItemInfo getImplicitDirectory(StorageResourceId resourceId) {
+    List<Blob> blobs = new GcsListOperation
+        .Builder(resourceId.getBucketName(), resourceId.getObjectName(), storage)
+        .forCurrentDirectoryListingWithLimit(1).build()
+        .execute();
+
+    if (blobs.isEmpty()) {
+      return GoogleCloudStorageItemInfo.createNotFound(resourceId);
+    }
+
+    return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
+  }
+
   // Helper class to capture the results of list operation.
   private class ListOperationResult {
     private final Map<String, Blob> prefixes = new HashMap<>();
@@ -934,12 +988,6 @@ private class ListOperationResult {
 
     private  final Set<String> objectsSet = new HashSet<>();
 
-    private final long maxResults;
-
-    ListOperationResult(long maxResults) {
-      this.maxResults = maxResults;
-    }
-
     void add(Blob blob) {
       String path = blob.getBlobId().toGsUtilUri();
       if (blob.getGeneration() != null) {
@@ -957,17 +1005,9 @@ List<GoogleCloudStorageItemInfo> getItems() {
 
       for (Blob blob : objects) {
         result.add(createItemInfoForBlob(blob));
-
-        if (result.size() == maxResults) {
-          return result;
-        }
       }
 
       for (Blob blob : prefixes.values()) {
-        if (result.size() == maxResults) {
-          return result;
-        }
-
         result.add(createItemInfoForBlob(blob));
       }
 
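The getFileOrDirectoryInfo method added above resolves the file-versus-directory
ambiguity with one batched metadata call instead of two sequential round trips.
A self-contained sketch of the same pattern against the google-cloud-storage
client, with bucket and object names as placeholders:

    import java.util.List;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.BlobId;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class FileOrDirectoryProbe {
      public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Fetch metadata for the object and its directory placeholder in one batch.
        List<Blob> blobs = storage.get(
            BlobId.of("bucket", "foo/a"),
            BlobId.of("bucket", "foo/a/"));
        // Results are positional; a missing entry comes back as null.
        for (Blob blob : blobs) {
          if (blob != null) {
            System.out.println("found: " + blob.getBlobId().toGsUtilUri());
          }
        }
      }
    }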
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
index 951c38f5966..8cf11d009c8 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java
@@ -29,7 +29,6 @@
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +48,7 @@
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /**
@@ -161,7 +161,7 @@ void close() {
     }
   }
 
-  public FileInfo getFileInfo(URI path) throws IOException {
+  FileInfo getFileInfo(URI path) throws IOException {
     checkArgument(path != null, "path must not be null");
     // Validate the given path. true == allow empty object name.
     // One should be able to get info about top level directory (== bucket),
@@ -182,42 +182,17 @@ private GoogleCloudStorageItemInfo getFileInfoInternal(
       return gcs.getItemInfo(resourceId);
     }
 
-    StorageResourceId dirId = resourceId.toDirectoryId();
-    if (!resourceId.isDirectory()) {
-      GoogleCloudStorageItemInfo itemInfo = gcs.getItemInfo(resourceId);
-      if (itemInfo.exists()) {
-        return itemInfo;
-      }
-
-      if (inferImplicitDirectories) {
-        // TODO: Set max result
-        List<GoogleCloudStorageItemInfo> listDirResult = gcs.listObjectInfo(
-            resourceId.getBucketName(),
-            resourceId.getObjectName(),
-            GET_FILE_INFO_LIST_OPTIONS);
-        if (!listDirResult.isEmpty()) {
-          return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId());
-        }
-      }
+    GoogleCloudStorageItemInfo dirOrObject = gcs.getFileOrDirectoryInfo(resourceId);
+    if (dirOrObject.exists() || !inferImplicitDirectories) {
+      return dirOrObject;
     }
 
-    List<GoogleCloudStorageItemInfo> listDirInfo = ImmutableList.of(gcs.getItemInfo(dirId));
-    if (listDirInfo.isEmpty()) {
-      return GoogleCloudStorageItemInfo.createNotFound(resourceId);
-    }
-    checkState(listDirInfo.size() <= 2, "listed more than 2 objects: '%s'", listDirInfo);
-    GoogleCloudStorageItemInfo dirInfo = Iterables.get(listDirInfo, /* position= */ 0);
-    checkState(
-        dirInfo.getResourceId().equals(dirId) || !inferImplicitDirectories,
-        "listed wrong object '%s', but should be '%s'",
-        dirInfo.getResourceId(),
-        resourceId);
-    return dirInfo.getResourceId().equals(dirId) && dirInfo.exists()
-        ? dirInfo
-        : GoogleCloudStorageItemInfo.createNotFound(resourceId);
+    // Neither the file nor an explicit directory exists. Check for an implicit directory.
+    // This will result in a list operation, which is expensive.
+    return gcs.getImplicitDirectory(resourceId);
   }
 
-  public void mkdirs(URI path) throws IOException {
+  void mkdirs(URI path) throws IOException {
     LOG.trace("mkdirs(path: {})", path);
     checkNotNull(path, "path should not be null");
 
@@ -313,18 +288,6 @@ private List<FileInfo> listRecursive(URI prefix) throws IOException {
     return fileInfos;
   }
 
-  private List<FileInfo> listDirectory(URI prefix) throws IOException {
-    StorageResourceId prefixId = getPrefixId(prefix);
-    List<GoogleCloudStorageItemInfo> itemInfos = gcs.listObjectInfo(
-        prefixId.getBucketName(),
-        prefixId.getObjectName(),
-        ListObjectOptions.DEFAULT_FLAT_LIST);
-
-    List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos);
-    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
-    return fileInfos;
-  }
-
   private StorageResourceId getPrefixId(URI prefix) {
     checkNotNull(prefix, "prefix could not be null");
 
@@ -375,42 +338,6 @@ private void deleteBucket(List<FileInfo> bucketsToDelete) throws IOException {
     throw new UnsupportedOperationException("deleteBucket is not supported.");
   }
 
-  public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws IOException {
-    checkNotNull(path, "path can not be null");
-    LOG.trace("listStatus(path: {})", path);
-
-    StorageResourceId pathId =
-        StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true);
-
-    if (!pathId.isDirectory()) {
-      GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId);
-      if (pathInfo.exists()) {
-        List<FileInfo> listedInfo = new ArrayList<>();
-        listedInfo.add(FileInfo.fromItemInfo(pathInfo));
-
-        return listedInfo;
-      }
-    }
-
-    StorageResourceId dirId = pathId.toDirectoryId();
-    List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ?
-        gcs.listBucketInfo() :
-        gcs.listObjectInfo(
-            dirId.getBucketName(), dirId.getObjectName(), LIST_FILE_INFO_LIST_OPTIONS);
-
-    if (pathId.isStorageObject() && dirItemInfos.isEmpty()) {
-      throw new FileNotFoundException("Item not found: " + path);
-    }
-
-    if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) {
-      dirItemInfos.remove(0);
-    }
-
-    List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos);
-    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
-    return fileInfos;
-  }
-
   FileInfo getFileInfoObject(URI path) throws IOException {
     checkArgument(path != null, "path must not be null");
     StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true);
@@ -574,10 +501,7 @@ List<FileInfo> listFileInfoForPrefix(URI prefix, ListFileOptions listOptions)
     LOG.trace("listAllFileInfoForPrefix(prefix: {})", prefix);
     StorageResourceId prefixId = getPrefixId(prefix);
     List<GoogleCloudStorageItemInfo> itemInfos =
-        gcs.listObjectInfo(
-            prefixId.getBucketName(),
-            prefixId.getObjectName(),
-            updateListObjectOptions(ListObjectOptions.DEFAULT_FLAT_LIST, listOptions));
+        gcs.listDirectoryRecursive(prefixId.getBucketName(), prefixId.getObjectName());
     List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos);
     fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
     return fileInfos;
@@ -604,11 +528,6 @@ private void moveInternal(Map<FileInfo, URI> srcToDstItemNames) throws IOExcepti
     gcs.move(sourceToDestinationObjectsMap);
   }
 
-  private static ListObjectOptions updateListObjectOptions(
-      ListObjectOptions listObjectOptions, ListFileOptions listFileOptions) {
-    return listObjectOptions.builder().setFields(listFileOptions.getFields()).build();
-  }
-
   private List<FileInfo> getFileInfos(List<URI> paths) throws IOException {
     List<FileInfo> result = new ArrayList<>(paths.size());
     for (URI path : paths) {
@@ -798,4 +717,50 @@ static List<String> getDirs(String objectName) {
     }
     return dirs;
   }
+
+  List<FileInfo> listDirectory(URI path) throws IOException {
+    checkNotNull(path, "path can not be null");
+    LOG.trace("listStatus(path: {})", path);
+
+    StorageResourceId pathId =
+        StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true);
+
+    if (!pathId.isDirectory()) {
+      GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId);
+      if (pathInfo.exists()) {
+        List<FileInfo> listedInfo = new ArrayList<>();
+        listedInfo.add(FileInfo.fromItemInfo(pathInfo));
+
+        return listedInfo;
+      }
+    }
+
+    StorageResourceId dirId = pathId.toDirectoryId();
+    List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ?
+        gcs.listBucketInfo() :
+        gcs.listDirectory(
+            dirId.getBucketName(), dirId.getObjectName());
+
+    if (pathId.isStorageObject() && dirItemInfos.isEmpty()) {
+      throw new FileNotFoundException("Item not found: " + path);
+    }
+
+    if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) {
+      dirItemInfos.remove(0);
+    }
+
+    List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos);
+    fileInfos.sort(FILE_INFO_PATH_COMPARATOR);
+    return fileInfos;
+  }
+
+  void compose(List<URI> sources, URI destination, String contentType) throws IOException {
+    StorageResourceId destResource = StorageResourceId.fromStringPath(destination.toString());
+    List<String> sourceObjects =
+        sources.stream()
+            .map(uri -> StorageResourceId.fromStringPath(uri.toString()).getObjectName())
+            .collect(Collectors.toList());
+    gcs.compose(
+        destResource.getBucketName(), sourceObjects, destResource.getObjectName(), contentType);
+  }
 }
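A hypothetical invocation of the new compose() helper above, showing the
URI-to-object-name mapping; gcsFs stands in for an existing
GoogleCloudStorageFileSystem instance, all URIs are assumed to share one
bucket (gcs.compose takes a single bucketName), and the names are illustrative:

    // Composes objects a/part-0 and a/part-1 into a/merged within "bucket".
    List<URI> sources = Arrays.asList(
        URI.create("gs://bucket/a/part-0"),
        URI.create("gs://bucket/a/part-1"));
    gcsFs.compose(sources, URI.create("gs://bucket/a/merged"), "application/octet-stream");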
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
index e4db8146186..3c4e84ce1eb 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.gs;
 
+import static org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList;
 import static org.apache.hadoop.fs.gs.Constants.GCS_CONFIG_PREFIX;
 import static org.apache.hadoop.fs.gs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY;
 
@@ -34,6 +35,7 @@
 import java.net.URI;
 import java.nio.file.DirectoryNotEmptyException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -44,6 +46,8 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -321,10 +325,63 @@ public FSDataOutputStream createNonRecursive(
         progress);
   }
 
+  /**
+   * Appends to an existing file (optional operation).
+   *
+   * @param hadoopPath The existing file to be appended.
+   * @param bufferSize The size of the buffer to be used.
+   * @param progress For reporting progress if it is not null.
+   * @return A writable stream.
+   * @throws IOException if an error occurs.
+   */
   @Override
-  public FSDataOutputStream append(final Path path, final int i, final Progressable progressable)
+  public FSDataOutputStream append(Path hadoopPath, int bufferSize, Progressable progress)
       throws IOException {
-    throw new UnsupportedOperationException(path.toString());
+    Preconditions.checkArgument(hadoopPath != null, "hadoopPath must not be null");
+    LOG.trace("append(hadoopPath: {}, bufferSize: {} [ignored])", hadoopPath, bufferSize);
+    URI filePath = getGcsPath(hadoopPath);
+    return new FSDataOutputStream(
+        new GoogleHadoopOutputStream(
+            this,
+            filePath,
+            CreateFileOptions.builder()
+                .setWriteMode(CreateFileOptions.WriteMode.APPEND)
+                .build(),
+            statistics),
+        statistics);
+  }
+
+  /**
+   * Concat existing files into one file.
+   *
+   * @param tgt the path to the target destination.
+   * @param srcs the paths to the sources to use for the concatenation.
+   * @throws IOException IO failure
+   */
+  @Override
+  public void concat(Path tgt, Path[] srcs) throws IOException {
+    LOG.trace("concat(tgt: {}, srcs.length: {})", tgt, srcs.length);
+
+    Preconditions.checkArgument(srcs.length > 0, "srcs must have at least one source");
+
+    URI tgtPath = getGcsPath(tgt);
+    List<URI> srcPaths = Arrays.stream(srcs).map(this::getGcsPath).collect(toImmutableList());
+
+    Preconditions.checkArgument(
+        !srcPaths.contains(tgtPath),
+        "target must not be contained in sources");
+
+    List<List<URI>> partitions =
+        Lists.partition(srcPaths, Constants.MAX_COMPOSE_OBJECTS - 1);
+    LOG.trace("concat(tgt: {}, {} partitions: {})", tgt, partitions.size(), 
partitions);
+    for (List<URI> partition : partitions) {
+      // We need to include the target in the list of sources to compose since
+      // the GCS FS compose operation will overwrite the target, whereas the Hadoop
+      // concat operation appends to the target.
+      List<URI> sources = Lists.newArrayList(tgtPath);
+      sources.addAll(partition);
+      getGcsFs().compose(sources, tgtPath, CreateFileOptions.DEFAULT.getContentType());
+    }
   }
 
   @Override
@@ -377,6 +434,7 @@ public boolean delete(final Path hadoopPath, final boolean recursive) throws IOE
       if (ApiErrorExtractor.INSTANCE.requestFailure(e)) {
         throw e;
       }
+
       LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]", 
hadoopPath, recursive, e);
       return false;
     }
@@ -397,7 +455,7 @@ public FileStatus[] listStatus(final Path hadoopPath) throws IOException {
     List<FileStatus> status;
 
     try {
-      List<FileInfo> fileInfos = getGcsFs().listFileInfo(gcsPath, ListFileOptions.OBJECTFIELDS);
+      List<FileInfo> fileInfos = getGcsFs().listDirectory(gcsPath);
       status = new ArrayList<>(fileInfos.size());
       String userName = getUgiUserName();
       for (FileInfo fileInfo : fileInfos) {
@@ -476,7 +534,7 @@ public boolean hasPathCapability(final Path path, final String capability) {
     switch (Ascii.toLowerCase(capability)) {
     case CommonPathCapabilities.FS_APPEND:
     case CommonPathCapabilities.FS_CONCAT:
-      return false;
+      return true;
     default:
       return false;
     }
@@ -524,7 +582,6 @@ public FileStatus getFileStatus(final Path path) throws IOException {
     checkOpen();
 
     URI gcsPath = getGcsPath(path);
-
     FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath);
     if (!fileInfo.exists()) {
       throw new FileNotFoundException(
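Since a single GCS compose call accepts at most 32 objects and the running
target is prepended to every call, each partition in concat() above holds at
most MAX_COMPOSE_OBJECTS - 1 = 31 sources. A small self-contained sketch of
that arithmetic; the class name and the count of 100 sources are illustrative:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;

    public class ConcatPartitionExample {
      public static void main(String[] args) {
        // 100 hypothetical source object names.
        List<String> sources = IntStream.range(0, 100)
            .mapToObj(i -> "part-" + i)
            .collect(Collectors.toList());
        // One of the 32 compose slots is reserved for the running target.
        List<List<String>> partitions = Lists.partition(sources, 32 - 1);
        // Prints partition sizes 31, 31, 31, 7 -> four compose calls.
        partitions.forEach(p -> System.out.println(p.size()));
      }
    }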
diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java
index 5935564feed..31c268a5d19 100644
--- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java
+++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty;
 import static org.apache.hadoop.fs.gs.Constants.SCHEME;
 
+import com.google.cloud.storage.BlobId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -325,4 +326,8 @@ static StorageResourceId fromUriPath(URI path, boolean allowEmptyObjectName,
         new StorageResourceId(bucketName, generationId) :
         new StorageResourceId(bucketName, objectName, generationId);
   }
+
+  BlobId toBlobId() {
+    return BlobId.of(bucketName, objectName);
+  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java
similarity index 79%
copy from hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java
index 27acc015ab8..4ca0b4cd082 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java
@@ -17,21 +17,19 @@
  */
 
 package org.apache.hadoop.fs.gs.contract;
-
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 
-public class ITestGoogleContractMkdir extends AbstractContractMkdirTest {
+public class ITestGoogleContractAppend extends AbstractContractAppendTest {
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
 
   @Override
-  public void testMkDirRmDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
+  public void testRenameFileBeingAppended() throws Throwable {
+    ContractTestUtils.skip("blobstores cannot rename a file that is being appended");
   }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java
similarity index 74%
copy from hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java
index 27acc015ab8..3a21b66631f 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java
@@ -17,21 +17,14 @@
  */
 
 package org.apache.hadoop.fs.gs.contract;
-
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
+import org.apache.hadoop.fs.contract.AbstractContractConcatTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 
-public class ITestGoogleContractMkdir extends AbstractContractMkdirTest {
+/** GCS contract tests covering file concat. */
+public class ITestGoogleContractConcat extends AbstractContractConcatTest {
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
-
-  @Override
-  public void testMkDirRmDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
-  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java
index 7ed3834025c..dabe396f65d 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java
@@ -21,17 +21,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractDeleteTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 public class ITestGoogleContractDelete extends AbstractContractDeleteTest {
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
-
-  @Override
-  public void testDeleteEmptyDirNonRecursive() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
-  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
index 27acc015ab8..4f846feb263 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.gs.contract;
 
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractMkdirTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
@@ -28,10 +27,4 @@ public class ITestGoogleContractMkdir extends AbstractContractMkdirTest {
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
-
-  @Override
-  public void testMkDirRmDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
-  }
 }
diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
index a159d46b006..cd168da7b07 100644
--- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
+++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractRenameTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 /** GCS contract tests covering file rename. */
 public class ITestGoogleContractRename extends AbstractContractRenameTest {
@@ -28,10 +27,4 @@ public class ITestGoogleContractRename extends AbstractContractRenameTest {
   protected AbstractFSContract createContract(Configuration conf) {
     return new GoogleContract(conf);
   }
-
-  @Override
-  public void testRenameWithNonEmptySubDir() {
-    // TODO: Enable this
-    ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change");
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
