This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 9e9c5e8c5a6f67467e6fe6c804679a57f41a9c38 Author: Arunkumar Chacko <aruncha...@google.com> AuthorDate: Wed Jul 2 17:58:18 2025 +0000 HADOOP-19343. Add support for append(), compose(), concat() Closes #7773 Signed-off-by: Chris Nauroth <cnaur...@apache.org> --- .../java/org/apache/hadoop/fs/gs/Constants.java | 2 + .../org/apache/hadoop/fs/gs/CreateFileOptions.java | 1 + .../org/apache/hadoop/fs/gs/GcsListOperation.java | 98 ++++++++++++ .../apache/hadoop/fs/gs/GoogleCloudStorage.java | 174 +++++++++++++-------- .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 147 +++++++---------- .../hadoop/fs/gs/GoogleHadoopFileSystem.java | 67 +++++++- .../org/apache/hadoop/fs/gs/StorageResourceId.java | 5 + ...ctMkdir.java => ITestGoogleContractAppend.java} | 12 +- ...ctMkdir.java => ITestGoogleContractConcat.java} | 13 +- .../fs/gs/contract/ITestGoogleContractDelete.java | 7 - .../fs/gs/contract/ITestGoogleContractMkdir.java | 7 - .../fs/gs/contract/ITestGoogleContractRename.java | 7 - 12 files changed, 339 insertions(+), 201 deletions(-) diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java index 34434b2859a..61c1571c4dc 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/Constants.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.gs; final class Constants { + static final int MAX_COMPOSE_OBJECTS = 32; + private Constants() {} // URI scheme for GCS. diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java index 1c0e48ae144..e3d0631b501 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java @@ -29,6 +29,7 @@ * Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}. */ final class CreateFileOptions { + static final CreateFileOptions DEFAULT = CreateFileOptions.builder().build(); private final ImmutableMap<String, byte[]> attributes; private final String contentType; private final long overwriteGenerationId; diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java new file mode 100644 index 00000000000..9cd4fdbb867 --- /dev/null +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GcsListOperation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.gs; + +import java.util.ArrayList; +import java.util.List; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; + +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; + +final class GcsListOperation { + private static final int ALL = 0; + private final Storage.BlobListOption[] listOptions; + private final String bucketName; + private final Storage storage; + private final int limit; + + private GcsListOperation(Builder builder) { + this.listOptions = builder.blobListOptions + .toArray(new Storage.BlobListOption[builder.blobListOptions.size()]); + this.bucketName = builder.bucket; + this.storage = builder.storage; + this.limit = builder.limit; + } + + public List<Blob> execute() { + List<Blob> result = new ArrayList<>(); + for (Blob blob : storage.list(bucketName, listOptions).iterateAll()) { + result.add(blob); + + if (limit != ALL && result.size() >= limit) { + break; + } + } + + return result; + } + + static class Builder { + private final ArrayList<Storage.BlobListOption> blobListOptions = new ArrayList<>(); + private String prefix; + private final String bucket; + private final Storage storage; + private int limit = GcsListOperation.ALL; + + Builder(final String bucketName, final String thePrefix, Storage storage) { + this.storage = storage; + this.bucket = bucketName; + this.prefix = thePrefix; + } + + Builder forRecursiveListing() { + return this; + } + + GcsListOperation build() { + blobListOptions.add(Storage.BlobListOption.prefix(prefix)); + return new GcsListOperation(this); + } + + Builder forCurrentDirectoryListing() { + blobListOptions.add(Storage.BlobListOption.currentDirectory()); + blobListOptions.add(Storage.BlobListOption.includeTrailingDelimiter()); + return this; + } + + Builder forCurrentDirectoryListingWithLimit(int theLimit) { + checkArgument( + theLimit > 0, + "limit should be greater than 0. Found %s; prefix=%s", theLimit, prefix); + + this.limit = theLimit; + prefix = StringPaths.toDirectoryPath(prefix); + + blobListOptions.add(Storage.BlobListOption.pageSize(1)); + forCurrentDirectoryListing(); + return this; + } + } +} diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java index dcf5ff231f7..e8cafa57ef8 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java @@ -288,49 +288,6 @@ private static GoogleCloudStorageItemInfo createItemInfoForBucket(StorageResourc bucket.getStorageClass() == null ? null : bucket.getStorageClass().name()); } - List<GoogleCloudStorageItemInfo> listObjectInfo( - String bucketName, - String objectNamePrefix, - ListObjectOptions listOptions) throws IOException { - try { - long maxResults = listOptions.getMaxResults() > 0 ? - listOptions.getMaxResults() + (listOptions.isIncludePrefix() ?
0 : 1) : - listOptions.getMaxResults(); - - Storage.BlobListOption[] blobListOptions = - getBlobListOptions(objectNamePrefix, listOptions, maxResults); - Page<Blob> blobs = storage.list(bucketName, blobListOptions); - ListOperationResult result = new ListOperationResult(maxResults); - for (Blob blob : blobs.iterateAll()) { - result.add(blob); - } - - return result.getItems(); - } catch (StorageException e) { - throw new IOException( - String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)), - e); - } - } - - private Storage.BlobListOption[] getBlobListOptions( - String objectNamePrefix, ListObjectOptions listOptions, long maxResults) { - List<Storage.BlobListOption> options = new ArrayList<>(); - - options.add(Storage.BlobListOption.fields(BLOB_FIELDS.toArray(new Storage.BlobField[0]))); - options.add(Storage.BlobListOption.prefix(objectNamePrefix)); - // TODO: set max results as a BlobListOption - if ("/".equals(listOptions.getDelimiter())) { - options.add(Storage.BlobListOption.currentDirectory()); - } - - if (listOptions.getDelimiter() != null) { - options.add(Storage.BlobListOption.includeTrailingDelimiter()); - } - - return options.toArray(new Storage.BlobListOption[0]); - } - private GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) { long generationId = blob.getGeneration() == null ? 0L : blob.getGeneration(); StorageResourceId resourceId = @@ -403,8 +360,7 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options } } - - public GoogleCloudStorageItemInfo composeObjects( + GoogleCloudStorageItemInfo composeObjects( List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options) throws IOException { LOG.trace("composeObjects({}, {}, {})", sources, destination, options); @@ -538,13 +494,14 @@ List<GoogleCloudStorageItemInfo> listDirectoryRecursive(String bucketName, Strin // TODO: Take delimiter from config // TODO: Set specific fields + checkArgument(objectName.endsWith("/"), String.format("%s should end with /", objectName)); try { - Page<Blob> blobs = storage.list( - bucketName, - Storage.BlobListOption.prefix(objectName)); + List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectName, storage) + .forRecursiveListing().build() + .execute(); List<GoogleCloudStorageItemInfo> result = new ArrayList<>(); - for (Blob blob : blobs.iterateAll()) { + for (Blob blob : blobs) { result.add(createItemInfoForBlob(blob)); } @@ -624,7 +581,7 @@ private List<Bucket> listBucketsInternal() throws IOException { return allBuckets; } - public SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo, + SeekableByteChannel open(GoogleCloudStorageItemInfo itemInfo, GoogleHadoopFileSystemConfiguration config) throws IOException { LOG.trace("open({})", itemInfo); checkNotNull(itemInfo, "itemInfo should not be null"); @@ -647,7 +604,7 @@ private SeekableByteChannel open( config); } - public void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) + void move(Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException { validateMoveArguments(sourceToDestinationObjectsMap); @@ -739,7 +696,7 @@ private Storage.MoveBlobRequest.Builder createMoveRequestBuilder( * Validates basic argument constraints like non-null, non-empty Strings, using {@code * Preconditions} in addition to checking for src/dst bucket equality. 
*/ - public static void validateMoveArguments( + static void validateMoveArguments( Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap) throws IOException { checkNotNull(sourceToDestinationObjectsMap, "srcObjects must not be null"); @@ -837,7 +794,7 @@ private void copyInternal( } } - public static void validateCopyArguments( + static void validateCopyArguments( Map<StorageResourceId, StorageResourceId> sourceToDestinationObjectsMap, GoogleCloudStorage gcsImpl) throws IOException { @@ -927,6 +884,103 @@ List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceId return result; } + List<GoogleCloudStorageItemInfo> listDirectory(String bucketName, String objectNamePrefix) + throws IOException { + checkArgument( + objectNamePrefix.endsWith("/"), + String.format("%s should end with /", objectNamePrefix)); + + try { + List<Blob> blobs = new GcsListOperation.Builder(bucketName, objectNamePrefix, storage) + .forCurrentDirectoryListing().build() + .execute(); + + ListOperationResult result = new ListOperationResult(); + for (Blob blob : blobs) { + result.add(blob); + } + + return result.getItems(); + } catch (StorageException e) { + throw new IOException( + String.format("listing object '%s' failed.", BlobId.of(bucketName, objectNamePrefix)), + e); + } + } + + void compose( + String bucketName, List<String> sources, String destination, String contentType) + throws IOException { + LOG.trace("compose({}, {}, {}, {})", bucketName, sources, destination, contentType); + List<StorageResourceId> sourceIds = + sources.stream() + .map(objectName -> new StorageResourceId(bucketName, objectName)) + .collect(Collectors.toList()); + StorageResourceId destinationId = new StorageResourceId(bucketName, destination); + CreateObjectOptions options = + CreateObjectOptions.DEFAULT_OVERWRITE.toBuilder() + .setContentType(contentType) + .setEnsureEmptyObjectsMetadataMatch(false) + .build(); + composeObjects(sourceIds, destinationId, options); + } + + /** + * Get metadata for the given resourceId. The resourceId can be a file or a directory. + * + * For a resourceId gs://b/foo/a, it can be a file or a directory (gs://b/foo/a/). + * This method checks for both and returns the one that is found. "NotFound" is returned + * if neither is found. + */ + GoogleCloudStorageItemInfo getFileOrDirectoryInfo(StorageResourceId resourceId) { + BlobId blobId = resourceId.toBlobId(); + if (resourceId.isDirectory()) { + // Do not check for "file" for directory paths. + Blob blob = storage.get(blobId); + if (blob != null) { + return createItemInfoForBlob(blob); + } + } else { + BlobId dirId = resourceId.toDirectoryId().toBlobId(); + + // Check for both file and directory. + List<Blob> blobs = storage.get(blobId, dirId); + for (Blob blob : blobs) { + if (blob != null) { + return createItemInfoForBlob(blob); + } + } + } + + return GoogleCloudStorageItemInfo.createNotFound(resourceId); + } + + /** + * Check if any "implicit" directory exists for the given resourceId. + * + * Note that the GCS object store does not have a concept of directories for non-HNS buckets. + * For example, one could create an object gs://bucket/foo/bar/a.txt without creating the + * parent directories (i.e. placeholder empty files ending with a /). In this case we might + * want to treat gs://bucket/foo/ and gs://bucket/foo/bar/ as directories. + * + * This method helps check whether a given resourceId (e.g. gs://bucket/foo/bar/) is an + * "implicit" directory.
+ * + * Note that this will result in a list operation and is more expensive than "get metadata". + */ + GoogleCloudStorageItemInfo getImplicitDirectory(StorageResourceId resourceId) { + List<Blob> blobs = new GcsListOperation + .Builder(resourceId.getBucketName(), resourceId.getObjectName(), storage) + .forCurrentDirectoryListingWithLimit(1).build() + .execute(); + + if (blobs.isEmpty()) { + return GoogleCloudStorageItemInfo.createNotFound(resourceId); + } + + return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId()); + } + // Helper class to capture the results of list operation. private class ListOperationResult { private final Map<String, Blob> prefixes = new HashMap<>(); @@ -934,12 +988,6 @@ private class ListOperationResult { private final Set<String> objectsSet = new HashSet<>(); - private final long maxResults; - - ListOperationResult(long maxResults) { - this.maxResults = maxResults; - } - void add(Blob blob) { String path = blob.getBlobId().toGsUtilUri(); if (blob.getGeneration() != null) { @@ -957,17 +1005,9 @@ List<GoogleCloudStorageItemInfo> getItems() { for (Blob blob : objects) { result.add(createItemInfoForBlob(blob)); - - if (result.size() == maxResults) { - return result; - } } for (Blob blob : prefixes.values()) { - if (result.size() == maxResults) { - return result; - } - result.add(createItemInfoForBlob(blob)); } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java index 951c38f5966..8cf11d009c8 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java @@ -29,7 +29,6 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; -import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +48,7 @@ import java.util.Objects; import java.util.TreeMap; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -161,7 +161,7 @@ void close() { } } - public FileInfo getFileInfo(URI path) throws IOException { + FileInfo getFileInfo(URI path) throws IOException { checkArgument(path != null, "path must not be null"); // Validate the given path. true == allow empty object name. 
// One should be able to get info about top level directory (== bucket), @@ -182,42 +182,17 @@ private GoogleCloudStorageItemInfo getFileInfoInternal( return gcs.getItemInfo(resourceId); } - StorageResourceId dirId = resourceId.toDirectoryId(); - if (!resourceId.isDirectory()) { - GoogleCloudStorageItemInfo itemInfo = gcs.getItemInfo(resourceId); - if (itemInfo.exists()) { - return itemInfo; - } - - if (inferImplicitDirectories) { - // TODO: Set max result - List<GoogleCloudStorageItemInfo> listDirResult = gcs.listObjectInfo( - resourceId.getBucketName(), - resourceId.getObjectName(), - GET_FILE_INFO_LIST_OPTIONS); - if (!listDirResult.isEmpty()) { - return GoogleCloudStorageItemInfo.createInferredDirectory(resourceId.toDirectoryId()); - } - } + GoogleCloudStorageItemInfo dirOrObject = gcs.getFileOrDirectoryInfo(resourceId); + if (dirOrObject.exists() || !inferImplicitDirectories) { + return dirOrObject; } - List<GoogleCloudStorageItemInfo> listDirInfo = ImmutableList.of(gcs.getItemInfo(dirId)); - if (listDirInfo.isEmpty()) { - return GoogleCloudStorageItemInfo.createNotFound(resourceId); - } - checkState(listDirInfo.size() <= 2, "listed more than 2 objects: '%s'", listDirInfo); - GoogleCloudStorageItemInfo dirInfo = Iterables.get(listDirInfo, /* position= */ 0); - checkState( - dirInfo.getResourceId().equals(dirId) || !inferImplicitDirectories, - "listed wrong object '%s', but should be '%s'", - dirInfo.getResourceId(), - resourceId); - return dirInfo.getResourceId().equals(dirId) && dirInfo.exists() - ? dirInfo - : GoogleCloudStorageItemInfo.createNotFound(resourceId); + // File does not exist; Explicit directory does not exist. Check for implicit directory. + // This will result in a list operation, which is expensive + return gcs.getImplicitDirectory(resourceId); } - public void mkdirs(URI path) throws IOException { + void mkdirs(URI path) throws IOException { LOG.trace("mkdirs(path: {})", path); checkNotNull(path, "path should not be null"); @@ -313,18 +288,6 @@ private List<FileInfo> listRecursive(URI prefix) throws IOException { return fileInfos; } - private List<FileInfo> listDirectory(URI prefix) throws IOException { - StorageResourceId prefixId = getPrefixId(prefix); - List<GoogleCloudStorageItemInfo> itemInfos = gcs.listObjectInfo( - prefixId.getBucketName(), - prefixId.getObjectName(), - ListObjectOptions.DEFAULT_FLAT_LIST); - - List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos); - fileInfos.sort(FILE_INFO_PATH_COMPARATOR); - return fileInfos; - } - private StorageResourceId getPrefixId(URI prefix) { checkNotNull(prefix, "prefix could not be null"); @@ -375,42 +338,6 @@ private void deleteBucket(List<FileInfo> bucketsToDelete) throws IOException { throw new UnsupportedOperationException("deleteBucket is not supported."); } - public List<FileInfo> listFileInfo(URI path, ListFileOptions listOptions) throws IOException { - checkNotNull(path, "path can not be null"); - LOG.trace("listStatus(path: {})", path); - - StorageResourceId pathId = - StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true); - - if (!pathId.isDirectory()) { - GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId); - if (pathInfo.exists()) { - List<FileInfo> listedInfo = new ArrayList<>(); - listedInfo.add(FileInfo.fromItemInfo(pathInfo)); - - return listedInfo; - } - } - - StorageResourceId dirId = pathId.toDirectoryId(); - List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ? 
- gcs.listBucketInfo() : - gcs.listObjectInfo( - dirId.getBucketName(), dirId.getObjectName(), LIST_FILE_INFO_LIST_OPTIONS); - - if (pathId.isStorageObject() && dirItemInfos.isEmpty()) { - throw new FileNotFoundException("Item not found: " + path); - } - - if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) { - dirItemInfos.remove(0); - } - - List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos); - fileInfos.sort(FILE_INFO_PATH_COMPARATOR); - return fileInfos; - } - FileInfo getFileInfoObject(URI path) throws IOException { checkArgument(path != null, "path must not be null"); StorageResourceId resourceId = StorageResourceId.fromUriPath(path, true); @@ -574,10 +501,7 @@ List<FileInfo> listFileInfoForPrefix(URI prefix, ListFileOptions listOptions) LOG.trace("listAllFileInfoForPrefix(prefix: {})", prefix); StorageResourceId prefixId = getPrefixId(prefix); List<GoogleCloudStorageItemInfo> itemInfos = - gcs.listObjectInfo( - prefixId.getBucketName(), - prefixId.getObjectName(), - updateListObjectOptions(ListObjectOptions.DEFAULT_FLAT_LIST, listOptions)); + gcs.listDirectoryRecursive(prefixId.getBucketName(), prefixId.getObjectName()); List<FileInfo> fileInfos = FileInfo.fromItemInfos(itemInfos); fileInfos.sort(FILE_INFO_PATH_COMPARATOR); return fileInfos; @@ -604,11 +528,6 @@ private void moveInternal(Map<FileInfo, URI> srcToDstItemNames) throws IOExcepti gcs.move(sourceToDestinationObjectsMap); } - private static ListObjectOptions updateListObjectOptions( - ListObjectOptions listObjectOptions, ListFileOptions listFileOptions) { - return listObjectOptions.builder().setFields(listFileOptions.getFields()).build(); - } - private List<FileInfo> getFileInfos(List<URI> paths) throws IOException { List<FileInfo> result = new ArrayList<>(paths.size()); for (URI path : paths) { @@ -798,4 +717,50 @@ static List<String> getDirs(String objectName) { } return dirs; } + + List<FileInfo> listDirectory(URI path) throws IOException { + checkNotNull(path, "path can not be null"); + LOG.trace("listStatus(path: {})", path); + + StorageResourceId pathId = + StorageResourceId.fromUriPath(path, /* allowEmptyObjectName= */ true); + + if (!pathId.isDirectory()) { + GoogleCloudStorageItemInfo pathInfo = gcs.getItemInfo(pathId); + if (pathInfo.exists()) { + List<FileInfo> listedInfo = new ArrayList<>(); + listedInfo.add(FileInfo.fromItemInfo(pathInfo)); + + return listedInfo; + } + } + + StorageResourceId dirId = pathId.toDirectoryId(); + List<GoogleCloudStorageItemInfo> dirItemInfos = dirId.isRoot() ? 
+ gcs.listBucketInfo() : + gcs.listDirectory( + dirId.getBucketName(), dirId.getObjectName()); + + if (pathId.isStorageObject() && dirItemInfos.isEmpty()) { + throw new FileNotFoundException("Item not found: " + path); + } + + if (!dirItemInfos.isEmpty() && Objects.equals(dirItemInfos.get(0).getResourceId(), dirId)) { + dirItemInfos.remove(0); + } + + List<FileInfo> fileInfos = FileInfo.fromItemInfos(dirItemInfos); + fileInfos.sort(FILE_INFO_PATH_COMPARATOR); + return fileInfos; + } + + void compose(List<URI> sources, URI destination, String contentType) throws IOException { + StorageResourceId destResource = StorageResourceId.fromStringPath(destination.toString()); + List<String> sourceObjects = + sources.stream() + .map(uri -> StorageResourceId.fromStringPath(uri.toString()).getObjectName()) + .collect(Collectors.toList()); + gcs.compose( + destResource.getBucketName(), sourceObjects, destResource.getObjectName(), contentType); + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java index e4db8146186..3c4e84ce1eb 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.gs; +import static org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList; import static org.apache.hadoop.fs.gs.Constants.GCS_CONFIG_PREFIX; import static org.apache.hadoop.fs.gs.GoogleHadoopFileSystemConfiguration.GCS_WORKING_DIRECTORY; @@ -34,6 +35,7 @@ import java.net.URI; import java.nio.file.DirectoryNotEmptyException; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.List; @@ -44,6 +46,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -321,10 +325,63 @@ public FSDataOutputStream createNonRecursive( progress); } + /** + * Appends to an existing file (optional operation). + * + * @param hadoopPath The existing file to be appended. + * @param bufferSize The size of the buffer to be used. + * @param progress For reporting progress if it is not null. + * @return A writable stream. + * @throws IOException if an error occurs. + */ @Override - public FSDataOutputStream append(final Path path, final int i, final Progressable progressable) + public FSDataOutputStream append(Path hadoopPath, int bufferSize, Progressable progress) throws IOException { - throw new UnsupportedOperationException(path.toString()); + Preconditions.checkArgument(hadoopPath != null, "hadoopPath must not be null"); + LOG.trace("append(hadoopPath: {}, bufferSize: {} [ignored])", hadoopPath, bufferSize); + URI filePath = getGcsPath(hadoopPath); + return new FSDataOutputStream( + new GoogleHadoopOutputStream( + this, + filePath, + CreateFileOptions.builder() + .setWriteMode(CreateFileOptions.WriteMode.APPEND) + .build(), + statistics), + statistics); + } + + /** + * Concatenates existing files into one file. + * + * @param tgt the path to the target destination. + * @param srcs the paths to the sources to use for the concatenation.
+ * @throws IOException IO failure + */ + @Override + public void concat(Path tgt, Path[] srcs) throws IOException { + LOG.trace("concat(tgt: {}, srcs.length: {})", tgt, srcs.length); + + Preconditions.checkArgument(srcs.length > 0, "srcs must have at least one source"); + + URI tgtPath = getGcsPath(tgt); + List<URI> srcPaths = Arrays.stream(srcs).map(this::getGcsPath).collect(toImmutableList()); + + Preconditions.checkArgument( + !srcPaths.contains(tgtPath), + "target must not be contained in sources"); + + List<List<URI>> partitions = + Lists.partition(srcPaths, Constants.MAX_COMPOSE_OBJECTS - 1); + LOG.trace("concat(tgt: {}, {} partitions: {})", tgt, partitions.size(), partitions); + for (List<URI> partition : partitions) { + // We need to include the target in the list of sources to compose since + // the GCS FS compose operation will overwrite the target, whereas the Hadoop + // concat operation appends to the target. + List<URI> sources = Lists.newArrayList(tgtPath); + sources.addAll(partition); + getGcsFs().compose(sources, tgtPath, CreateFileOptions.DEFAULT.getContentType()); + } } @Override @@ -377,6 +434,7 @@ public boolean delete(final Path hadoopPath, final boolean recursive) throws IOE if (ApiErrorExtractor.INSTANCE.requestFailure(e)) { throw e; } + LOG.trace("delete(hadoopPath: {}, recursive: {}): false [failed]", hadoopPath, recursive, e); return false; } @@ -397,7 +455,7 @@ public FileStatus[] listStatus(final Path hadoopPath) throws IOException { List<FileStatus> status; try { - List<FileInfo> fileInfos = getGcsFs().listFileInfo(gcsPath, ListFileOptions.OBJECTFIELDS); + List<FileInfo> fileInfos = getGcsFs().listDirectory(gcsPath); status = new ArrayList<>(fileInfos.size()); String userName = getUgiUserName(); for (FileInfo fileInfo : fileInfos) { @@ -476,7 +534,7 @@ public boolean hasPathCapability(final Path path, final String capability) { switch (Ascii.toLowerCase(capability)) { case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.FS_CONCAT: - return false; + return true; default: return false; } @@ -524,7 +582,6 @@ public FileStatus getFileStatus(final Path path) throws IOException { checkOpen(); URI gcsPath = getGcsPath(path); - FileInfo fileInfo = getGcsFs().getFileInfo(gcsPath); if (!fileInfo.exists()) { throw new FileNotFoundException( diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java index 5935564feed..31c268a5d19 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/StorageResourceId.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; import static org.apache.hadoop.fs.gs.Constants.SCHEME; +import com.google.cloud.storage.BlobId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -325,4 +326,8 @@ static StorageResourceId fromUriPath(URI path, boolean allowEmptyObjectName, new StorageResourceId(bucketName, generationId) : new StorageResourceId(bucketName, objectName, generationId); } + + BlobId toBlobId() { + return BlobId.of(bucketName, objectName); + } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java similarity index 79% copy from 
hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java index 27acc015ab8..4ca0b4cd082 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractAppend.java @@ -17,21 +17,19 @@ */ package org.apache.hadoop.fs.gs.contract; - -import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractContractAppendTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; -public class ITestGoogleContractMkdir extends AbstractContractMkdirTest { +public class ITestGoogleContractAppend extends AbstractContractAppendTest { @Override protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } @Override - public void testMkDirRmDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + public void testRenameFileBeingAppended() throws Throwable { + ContractTestUtils.skip("blob stores cannot rename a file that is being appended"); + } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java similarity index 74% copy from hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java index 27acc015ab8..3a21b66631f 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractConcat.java @@ -17,21 +17,14 @@ */ package org.apache.hadoop.fs.gs.contract; - -import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; +import org.apache.hadoop.fs.contract.AbstractContractConcatTest; import org.apache.hadoop.fs.contract.AbstractFSContract; -public class ITestGoogleContractMkdir extends AbstractContractMkdirTest { +/** GCS contract tests covering file concat. */ +public class ITestGoogleContractConcat extends AbstractContractConcatTest { @Override protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } - - @Override - public void testMkDirRmDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test.
This will be enabled in a subsequent change"); - } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java index 7ed3834025c..dabe396f65d 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractDelete.java @@ -21,17 +21,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.apache.hadoop.fs.contract.ContractTestUtils; public class ITestGoogleContractDelete extends AbstractContractDeleteTest { @Override protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } - - @Override - public void testDeleteEmptyDirNonRecursive() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java index 27acc015ab8..4f846feb263 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.gs.contract; -import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -28,10 +27,4 @@ public class ITestGoogleContractMkdir extends AbstractContractMkdirTest { protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } - - @Override - public void testMkDirRmDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java index a159d46b006..cd168da7b07 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java @@ -20,7 +20,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractRenameTest; import org.apache.hadoop.fs.contract.AbstractFSContract; -import org.apache.hadoop.fs.contract.ContractTestUtils; /** GCS contract tests covering file rename. */ public class ITestGoogleContractRename extends AbstractContractRenameTest { @@ -28,10 +27,4 @@ public class ITestGoogleContractRename extends AbstractContractRenameTest { protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } - - @Override - public void testRenameWithNonEmptySubDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. 
This will be enabled in a subsequent change"); - } }
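
For anyone trying out the new operations, the following is a minimal usage sketch against the standard Hadoop FileSystem API. The bucket name and paths are hypothetical placeholders; the capability probe reflects the hasPathCapability() change above, and append()/concat() are the stock org.apache.hadoop.fs.FileSystem methods that this commit wires up for gs:// paths.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonPathCapabilities;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GcsAppendConcatExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "example-bucket" is a placeholder; gs:// resolves to GoogleHadoopFileSystem.
        FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);

        Path target = new Path("gs://example-bucket/logs/merged.log");

        // After this change, both capabilities report true for gs:// paths.
        if (fs.hasPathCapability(target, CommonPathCapabilities.FS_APPEND)) {
          try (FSDataOutputStream out = fs.append(target, 8192, null)) {
            out.writeBytes("one more line\n");
          }
        }

        if (fs.hasPathCapability(target, CommonPathCapabilities.FS_CONCAT)) {
          // Sources are stitched onto the existing target, in order.
          fs.concat(target, new Path[] {
              new Path("gs://example-bucket/logs/part-0000"),
              new Path("gs://example-bucket/logs/part-0001")});
        }
      }
    }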
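A single GCS compose call accepts at most 32 source objects (MAX_COMPOSE_OBJECTS above), and concat() must preserve the existing target contents, so each round composes the target plus at most 31 new sources back onto the target; that is why the implementation partitions with MAX_COMPOSE_OBJECTS - 1. A plain-JDK sketch of the arithmetic (class and method names are illustrative, not part of the patch):

    public class ComposePartitionSketch {
      static final int MAX_COMPOSE_OBJECTS = 32; // per-call GCS compose limit

      // Number of compose calls concat() needs for n sources: ceil(n / 31),
      // because one of the 32 slots in every call is reserved for the target.
      static int composeRounds(int n) {
        int perRound = MAX_COMPOSE_OBJECTS - 1;
        return (n + perRound - 1) / perRound;
      }

      public static void main(String[] args) {
        // Prints: 1 -> 1, 31 -> 1, 32 -> 2, 100 -> 4
        for (int n : new int[] {1, 31, 32, 100}) {
          System.out.println(n + " sources -> " + composeRounds(n) + " compose call(s)");
        }
      }
    }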
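The getImplicitDirectory() path above answers "does anything exist under this prefix?" with a single-result directory listing. Below is a standalone sketch of the same probe written directly against the google-cloud-storage client; the bucket and prefix are hypothetical, and the connector itself routes this through GcsListOperation rather than calling Storage.list() inline.

    import com.google.api.gax.paging.Page;
    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    public class ImplicitDirectoryProbe {
      // Returns true if any object exists under dirPrefix, i.e. the path behaves
      // as a directory even though no placeholder object for it exists.
      static boolean isImplicitDirectory(Storage storage, String bucket, String dirPrefix) {
        // The prefix must end with '/' so that children, not sibling objects, match.
        Page<Blob> page = storage.list(bucket,
            Storage.BlobListOption.prefix(dirPrefix),
            Storage.BlobListOption.currentDirectory(),
            Storage.BlobListOption.pageSize(1)); // one hit is enough to decide
        return page.getValues().iterator().hasNext();
      }

      public static void main(String[] args) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        System.out.println(isImplicitDirectory(storage, "example-bucket", "foo/bar/"));
      }
    }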