This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch HADOOP-19343
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit ac927949aab69255a553399499771bff97cfa271 Author: Arunkumar Chacko <aruncha...@google.com> AuthorDate: Wed Jun 25 18:22:57 2025 +0000 HADOOP-19343. Add support for hflush() Closes #7761 Co-authored-by: Chris Nauroth <cnaur...@apache.org> Signed-off-by: Chris Nauroth <cnaur...@apache.org> --- .../{CreateOptions.java => CreateFileOptions.java} | 28 ++- .../apache/hadoop/fs/gs/ErrorTypeExtractor.java | 27 +- .../apache/hadoop/fs/gs/GoogleCloudStorage.java | 82 ++++++- .../gs/GoogleCloudStorageClientWriteChannel.java | 14 +- .../hadoop/fs/gs/GoogleCloudStorageFileSystem.java | 115 ++++++++- .../hadoop/fs/gs/GoogleHadoopFileSystem.java | 14 +- .../fs/gs/GoogleHadoopFileSystemConfiguration.java | 44 +++- .../hadoop/fs/gs/GoogleHadoopOutputStream.java | 273 +++++++++++++++++++-- .../hadoop/fs/gs/HadoopConfigurationProperty.java | 10 + ...tRename.java => ITestGoogleContractCreate.java} | 17 +- .../fs/gs/contract/ITestGoogleContractMkdir.java | 24 -- .../fs/gs/contract/ITestGoogleContractRename.java | 6 - .../hadoop-gcp/src/test/resources/contract/gs.xml | 100 +++++++- 13 files changed, 648 insertions(+), 106 deletions(-) diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java similarity index 81% rename from hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java rename to hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java index c9b44a1a481..1c0e48ae144 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateOptions.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/CreateFileOptions.java @@ -28,24 +28,38 @@ /** * Options that can be specified when creating a file in the {@link GoogleCloudStorageFileSystem}. */ -final class CreateOptions { +final class CreateFileOptions { private final ImmutableMap<String, byte[]> attributes; private final String contentType; private final long overwriteGenerationId; private final WriteMode mode; + private final boolean ensureNoDirectoryConflict; - private CreateOptions(CreateOperationOptionsBuilder builder) { + private CreateFileOptions(CreateOperationOptionsBuilder builder) { this.attributes = ImmutableMap.copyOf(builder.attributes); this.contentType = builder.contentType; this.overwriteGenerationId = builder.overwriteGenerationId; this.mode = builder.writeMode; + this.ensureNoDirectoryConflict = builder.ensureNoDirectoryConflict; } boolean isOverwriteExisting() { return this.mode == WriteMode.OVERWRITE; } + boolean isEnsureNoDirectoryConflict() { + return ensureNoDirectoryConflict; + } + + CreateOperationOptionsBuilder toBuilder() { + return builder().setWriteMode(this.mode) + .setEnsureNoDirectoryConflict(ensureNoDirectoryConflict); + } + enum WriteMode { + /** Write new bytes to the end of the existing file rather than the beginning. */ + APPEND, + /** * Creates a new file for write and fails if file already exists. 
*/ @@ -98,14 +112,20 @@ static class CreateOperationOptionsBuilder { private String contentType = "application/octet-stream"; private long overwriteGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID; private WriteMode writeMode = WriteMode.CREATE_NEW; + private boolean ensureNoDirectoryConflict = true; CreateOperationOptionsBuilder setWriteMode(WriteMode mode) { this.writeMode = mode; return this; } - CreateOptions build() { - CreateOptions options = new CreateOptions(this); + CreateOperationOptionsBuilder setEnsureNoDirectoryConflict(boolean ensure) { + this.ensureNoDirectoryConflict = ensure; + return this; + } + + CreateFileOptions build() { + CreateFileOptions options = new CreateFileOptions(this); checkArgument(!options.getAttributes().containsKey("Content-Type"), "The Content-Type attribute must be set via the contentType option"); diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java index 547d855d1d6..a94156e68e6 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/ErrorTypeExtractor.java @@ -20,6 +20,8 @@ import javax.annotation.Nullable; +import com.google.api.client.http.HttpStatusCodes; +import com.google.cloud.storage.StorageException; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -63,8 +65,6 @@ enum ErrorType { UNAVAILABLE, UNKNOWN } - // public static final ErrorTypeExtractor INSTANCE = new ErrorTypeExtractor(); - private static final String BUCKET_ALREADY_EXISTS_MESSAGE = "FAILED_PRECONDITION: Your previous request to create the named bucket succeeded and you " + "already own it."; @@ -89,7 +89,28 @@ static ErrorType getErrorType(Exception error) { case UNAVAILABLE: return ErrorType.UNAVAILABLE; default: - return ErrorType.UNKNOWN; + return getErrorTypeFromStorageException(error); } } + + private static ErrorType getErrorTypeFromStorageException(Exception error) { + if (error instanceof StorageException) { + StorageException se = (StorageException) error; + int httpCode = se.getCode(); + + if (httpCode == HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED) { + return ErrorType.FAILED_PRECONDITION; + } + + if (httpCode == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + return ErrorType.NOT_FOUND; + } + + if (httpCode == HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE) { + return ErrorType.UNAVAILABLE; + } + } + + return ErrorType.UNKNOWN; + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java index 89a86eef8ff..dcf5ff231f7 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorage.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * A wrapper around <a href="https://github.com/googleapis/java-storage">Google cloud storage @@ -89,7 +90,7 @@ private static Storage createStorage(String projectId) { return StorageOptions.newBuilder().build().getService(); } - WritableByteChannel create(final StorageResourceId resourceId, final CreateOptions options) + WritableByteChannel create(final StorageResourceId resourceId, final CreateFileOptions options) throws IOException { 
LOG.trace("create({})", resourceId); @@ -402,6 +403,47 @@ void createEmptyObject(StorageResourceId resourceId, CreateObjectOptions options } } + + public GoogleCloudStorageItemInfo composeObjects( + List<StorageResourceId> sources, StorageResourceId destination, CreateObjectOptions options) + throws IOException { + LOG.trace("composeObjects({}, {}, {})", sources, destination, options); + for (StorageResourceId inputId : sources) { + if (!destination.getBucketName().equals(inputId.getBucketName())) { + throw new IOException( + String.format( + "Bucket doesn't match for source '%s' and destination '%s'!", + inputId, destination)); + } + } + Storage.ComposeRequest request = + Storage.ComposeRequest.newBuilder() + .addSource( + sources.stream().map(StorageResourceId::getObjectName).collect(Collectors.toList())) + .setTarget( + BlobInfo.newBuilder(destination.getBucketName(), destination.getObjectName()) + .setContentType(options.getContentType()) + .setContentEncoding(options.getContentEncoding()) + .setMetadata(encodeMetadata(options.getMetadata())) + .build()) + .setTargetOptions( + Storage.BlobTargetOption.generationMatch( + destination.hasGenerationId() + ? destination.getGenerationId() + : getWriteGeneration(destination, true))) + .build(); + + Blob composedBlob; + try { + composedBlob = storage.compose(request); + } catch (StorageException e) { + throw new IOException(e); + } + GoogleCloudStorageItemInfo compositeInfo = createItemInfoForBlob(destination, composedBlob); + LOG.trace("composeObjects() done, returning: {}", compositeInfo); + return compositeInfo; + } + /** * Helper to check whether an empty object already exists with the expected metadata specified in * {@code options}, to be used to determine whether it's safe to ignore an exception that was @@ -450,6 +492,7 @@ private boolean canIgnoreExceptionForEmptyObject( return true; } } + return false; } @@ -472,18 +515,14 @@ private void createEmptyObjectInternal( blobTargetOptions.add(Storage.BlobTargetOption.doesNotExist()); } - try { - // TODO: Set encryption key and related properties - storage.create( - BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName())) - .setMetadata(rewrittenMetadata) - .setContentEncoding(createObjectOptions.getContentEncoding()) - .setContentType(createObjectOptions.getContentType()) - .build(), - blobTargetOptions.toArray(new Storage.BlobTargetOption[0])); - } catch (StorageException e) { - throw new IOException(String.format("Creating empty object %s failed.", resourceId), e); - } + // TODO: Set encryption key and related properties + storage.create( + BlobInfo.newBuilder(BlobId.of(resourceId.getBucketName(), resourceId.getObjectName())) + .setMetadata(rewrittenMetadata) + .setContentEncoding(createObjectOptions.getContentEncoding()) + .setContentType(createObjectOptions.getContentType()) + .build(), + blobTargetOptions.toArray(new Storage.BlobTargetOption[0])); } private static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) { @@ -871,6 +910,23 @@ private static GoogleCloudStorageItemInfo getGoogleCloudStorageItemInfo( return storageItemInfo; } + List<GoogleCloudStorageItemInfo> getItemInfos(List<StorageResourceId> resourceIds) + throws IOException { + LOG.trace("getItemInfos({})", resourceIds); + + if (resourceIds.isEmpty()) { + return new ArrayList<>(); + } + + List<GoogleCloudStorageItemInfo> result = new ArrayList<>(resourceIds.size()); + for (StorageResourceId resourceId : resourceIds) { + // TODO: Do this concurrently + 
result.add(getItemInfo(resourceId)); + } + + return result; + } + // Helper class to capture the results of list operation. private class ListOperationResult { private final Map<String, Blob> prefixes = new HashMap<>(); diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java index 7956b6f0a82..438d8c040c9 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageClientWriteChannel.java @@ -44,8 +44,12 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel { private final StorageResourceId resourceId; private WritableByteChannel writableByteChannel; - GoogleCloudStorageClientWriteChannel(final Storage storage, - final StorageResourceId resourceId, final CreateOptions createOptions) throws IOException { + private GoogleCloudStorageItemInfo completedItemInfo = null; + + GoogleCloudStorageClientWriteChannel( + final Storage storage, + final StorageResourceId resourceId, + final CreateFileOptions createOptions) throws IOException { this.resourceId = resourceId; BlobWriteSession blobWriteSession = getBlobWriteSession(storage, resourceId, createOptions); try { @@ -56,7 +60,7 @@ class GoogleCloudStorageClientWriteChannel implements WritableByteChannel { } private static BlobInfo getBlobInfo(final StorageResourceId resourceId, - final CreateOptions createOptions) { + final CreateFileOptions createOptions) { BlobInfo blobInfo = BlobInfo.newBuilder( BlobId.of(resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId())).setContentType(createOptions.getContentType()) @@ -66,12 +70,12 @@ private static BlobInfo getBlobInfo(final StorageResourceId resourceId, } private static BlobWriteSession getBlobWriteSession(final Storage storage, - final StorageResourceId resourceId, final CreateOptions createOptions) { + final StorageResourceId resourceId, final CreateFileOptions createOptions) { return storage.blobWriteSession(getBlobInfo(resourceId, createOptions), generateWriteOptions(createOptions)); } - private static BlobWriteOption[] generateWriteOptions(final CreateOptions createOptions) { + private static BlobWriteOption[] generateWriteOptions(final CreateFileOptions createOptions) { List<BlobWriteOption> blobWriteOptions = new ArrayList<>(); blobWriteOptions.add(BlobWriteOption.disableGzipContent()); diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java index 2b0c238eb02..951c38f5966 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleCloudStorageFileSystem.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.gs; import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.*; +import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Comparator.comparing; import static org.apache.hadoop.fs.gs.Constants.PATH_DELIMITER; import static org.apache.hadoop.fs.gs.Constants.SCHEME; @@ -100,7 +102,7 @@ private 
static GoogleCloudStorage createCloudStorage( gcs = createCloudStorage(configuration, credentials); } - WritableByteChannel create(final URI path, final CreateOptions createOptions) + WritableByteChannel create(final URI path, final CreateFileOptions createOptions) throws IOException { LOG.trace("create(path: {}, createOptions: {})", path, createOptions); checkNotNull(path, "path could not be null"); @@ -113,6 +115,32 @@ WritableByteChannel create(final URI path, final CreateOptions createOptions) resourceId)); } + // Because create call should create parent directories too, before creating an actual file + // we need to check if there are no conflicting items in the directory tree: + // - if there are no conflicting files with the same name as any parent subdirectory + // - if there are no conflicting directory with the name as a file + // + // For example, for a new `gs://bucket/c/d/f` file: + // - files `gs://bucket/c` and `gs://bucket/c/d` should not exist + // - directory `gs://bucket/c/d/f/` should not exist + if (configuration.isEnsureNoConflictingItems()) { + // Check if a directory with the same name exists. + StorageResourceId dirId = resourceId.toDirectoryId(); + Boolean conflictingDirExist = false; + if (createOptions.isEnsureNoDirectoryConflict()) { + // TODO: Do this concurrently + conflictingDirExist = + getFileInfoInternal(dirId, /* inferImplicitDirectories */ true).exists(); + } + + checkNoFilesConflictingWithDirs(resourceId); + + // Check if a directory with the same name exists. + if (conflictingDirExist) { + throw new FileAlreadyExistsException("A directory with that name exists: " + path); + } + } + if (createOptions.getOverwriteGenerationId() != StorageResourceId.UNKNOWN_GENERATION_ID) { resourceId = new StorageResourceId(resourceId.getBucketName(), resourceId.getObjectName(), createOptions.getOverwriteGenerationId()); @@ -214,8 +242,11 @@ public void mkdirs(URI path) throws IOException { resourceId = resourceId.toDirectoryId(); - // TODO: Before creating a leaf directory we need to check if there are no conflicting files - // TODO: with the same name as any subdirectory + // Before creating a leaf directory we need to check if there are no conflicting files + // with the same name as any subdirectory + if (configuration.isEnsureNoConflictingItems()) { + checkNoFilesConflictingWithDirs(resourceId); + } // Create only a leaf directory because subdirectories will be inferred // if leaf directory exists @@ -689,4 +720,82 @@ static String getItemName(URI path) { : objectName.lastIndexOf(PATH_DELIMITER); return index < 0 ? 
objectName : objectName.substring(index + 1); } + + static CreateObjectOptions objectOptionsFromFileOptions(CreateFileOptions options) { + checkArgument( + options.getWriteMode() == CreateFileOptions.WriteMode.CREATE_NEW + || options.getWriteMode() == CreateFileOptions.WriteMode.OVERWRITE, + "unsupported write mode: %s", + options.getWriteMode()); + return CreateObjectOptions.builder() + .setContentType(options.getContentType()) + .setMetadata(options.getAttributes()) + .setOverwriteExisting(options.getWriteMode() == CreateFileOptions.WriteMode.OVERWRITE) + .build(); + } + + GoogleHadoopFileSystemConfiguration getConfiguration() { + return configuration; + } + + GoogleCloudStorageItemInfo composeObjects(ImmutableList<StorageResourceId> sources, + StorageResourceId dstId, CreateObjectOptions composeObjectOptions) throws IOException { + return gcs.composeObjects(sources, dstId, composeObjectOptions); + } + + void delete(List<StorageResourceId> items) throws IOException { + gcs.deleteObjects(items); + } + + private void checkNoFilesConflictingWithDirs(StorageResourceId resourceId) throws IOException { + // Create a list of all files that can conflict with intermediate/subdirectory paths. + // For example: gs://foo/bar/zoo/ => (gs://foo/bar, gs://foo/bar/zoo) + List<StorageResourceId> fileIds = + getDirs(resourceId.getObjectName()).stream() + .filter(subdir -> !isNullOrEmpty(subdir)) + .map( + subdir -> + new StorageResourceId( + resourceId.getBucketName(), StringPaths.toFilePath(subdir))) + .collect(toImmutableList()); + + // Each intermediate path must ensure that corresponding file does not exist + // + // If for any of the intermediate paths file already exists then bail out early. + // It is possible that the status of intermediate paths can change after + // we make this check therefore this is a good faith effort and not a guarantee. + for (GoogleCloudStorageItemInfo fileInfo : gcs.getItemInfos(fileIds)) { + if (fileInfo.exists()) { + throw new FileAlreadyExistsException( + "Cannot create directories because of existing file: " + fileInfo.getResourceId()); + } + } + } + + /** + * For objects whose name looks like a path (foo/bar/zoo), returns all directory paths. + * + * <p>For example: + * + * <ul> + * <li>foo/bar/zoo => returns: (foo/, foo/bar/) + * <li>foo/bar/zoo/ => returns: (foo/, foo/bar/, foo/bar/zoo/) + * <li>foo => returns: () + * </ul> + * + * @param objectName Name of an object. + * @return List of subdirectory like paths. + */ + static List<String> getDirs(String objectName) { + if (isNullOrEmpty(objectName)) { + return ImmutableList.of(); + } + List<String> dirs = new ArrayList<>(); + int index = 0; + while ((index = objectName.indexOf(PATH_DELIMITER, index)) >= 0) { + index = index + PATH_DELIMITER.length(); + dirs.add(objectName.substring(0, index)); + } + return dirs; + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java index 8bf2f057724..e4db8146186 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystem.java @@ -283,11 +283,11 @@ public FSDataOutputStream create(Path hadoopPath, FsPermission permission, boole LOG.trace("create(hadoopPath: {}, overwrite: {}, bufferSize: {} [ignored])", hadoopPath, overwrite, bufferSize); - CreateOptions.WriteMode writeMode = - overwrite ? 
CreateOptions.WriteMode.OVERWRITE : CreateOptions.WriteMode.CREATE_NEW; + CreateFileOptions.WriteMode writeMode = + overwrite ? CreateFileOptions.WriteMode.OVERWRITE : CreateFileOptions.WriteMode.CREATE_NEW; FSDataOutputStream response = new FSDataOutputStream( new GoogleHadoopOutputStream(this, getGcsPath(hadoopPath), - CreateOptions.builder().setWriteMode(writeMode).build(), statistics), statistics); + CreateFileOptions.builder().setWriteMode(writeMode).build(), statistics), statistics); return response; } @@ -596,12 +596,6 @@ public long getUsed() throws IOException { return result; } -// @Override -// public long getDefaultBlockSize() { -// LOG.trace("getDefaultBlockSize(): {}", defaultBlockSize); -// return defaultBlockSize; -// } - @Override public void setWorkingDirectory(final Path hadoopPath) { checkArgument(hadoopPath != null, "hadoopPath must not be null"); @@ -633,4 +627,4 @@ private FileStatus getFileStatus(FileInfo fileInfo, String userName) { LOG.trace("FileStatus(path: {}, userName: {}): {}", fileInfo.getPath(), userName, status); return status; } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java index 20831885fe6..4097b5e1f83 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopFileSystemConfiguration.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.gs; +import java.time.Duration; import java.util.regex.Pattern; import static java.lang.Math.toIntExact; @@ -44,7 +45,7 @@ class GoogleHadoopFileSystemConfiguration { /** * Configuration key for GCS project ID. Default value: none */ - static final HadoopConfigurationProperty<String> GCS_PROJECT_ID = + private static final HadoopConfigurationProperty<String> GCS_PROJECT_ID = new HadoopConfigurationProperty<>("fs.gs.project.id"); /** @@ -56,7 +57,7 @@ class GoogleHadoopFileSystemConfiguration { /** * Configuration key for setting write buffer size. */ - static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE = + private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_BUFFER_SIZE = new HadoopConfigurationProperty<>("fs.gs.outputstream.buffer.size", 8L * 1024 * 1024); @@ -64,20 +65,20 @@ class GoogleHadoopFileSystemConfiguration { * If forward seeks are within this many bytes of the current position, seeks are performed by * reading and discarding bytes in-place rather than opening a new underlying stream. */ - public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT = + private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT = new HadoopConfigurationProperty<>( "fs.gs.inputstream.inplace.seek.limit", GCS_INPUT_STREAM_INPLACE_SEEK_LIMIT_DEFAULT); /** Tunes reading objects behavior to optimize HTTP GET requests for various use cases. */ - public static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE = + private static final HadoopConfigurationProperty<Fadvise> GCS_INPUT_STREAM_FADVISE = new HadoopConfigurationProperty<>("fs.gs.inputstream.fadvise", Fadvise.RANDOM); /** * If false, reading a file with GZIP content encoding (HTTP header "Content-Encoding: gzip") will * result in failure (IOException is thrown). 
*/ - public static final HadoopConfigurationProperty<Boolean> + private static final HadoopConfigurationProperty<Boolean> GCS_INPUT_STREAM_SUPPORT_GZIP_ENCODING_ENABLE = new HadoopConfigurationProperty<>( "fs.gs.inputstream.support.gzip.encoding.enable", @@ -87,7 +88,7 @@ class GoogleHadoopFileSystemConfiguration { * Minimum size in bytes of the HTTP Range header set in GCS request when opening new stream to * read an object. */ - public static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE = + private static final HadoopConfigurationProperty<Long> GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE = new HadoopConfigurationProperty<>( "fs.gs.inputstream.min.range.request.size", 2 * 1024 * 1024L); @@ -96,22 +97,39 @@ class GoogleHadoopFileSystemConfiguration { * Configuration key for number of request to track for adapting the access pattern i.e. fadvise: * AUTO & AUTO_RANDOM. */ - public static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT = + private static final HadoopConfigurationProperty<Integer> GCS_FADVISE_REQUEST_TRACK_COUNT = new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3); /** * Configuration key for specifying max number of bytes rewritten in a single rewrite request when * fs.gs.copy.with.rewrite.enable is set to 'true'. */ - public static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE = + private static final HadoopConfigurationProperty<Long> GCS_REWRITE_MAX_CHUNK_SIZE = new HadoopConfigurationProperty<>( "fs.gs.rewrite.max.chunk.size", 512 * 1024 * 1024L); /** Configuration key for marker file pattern. Default value: none */ - public static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN = + private static final HadoopConfigurationProperty<String> GCS_MARKER_FILE_PATTERN = new HadoopConfigurationProperty<>("fs.gs.marker.file.pattern"); + /** + * Configuration key for enabling check to ensure that conflicting directories do not exist when + * creating files and conflicting files do not exist when creating directories. + */ + private static final HadoopConfigurationProperty<Boolean> GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE = + new HadoopConfigurationProperty<>( + "fs.gs.create.items.conflict.check.enable", + true); + + /** + * Configuration key for the minimal time interval between consecutive sync/hsync/hflush calls. 
+ */ + private static final HadoopConfigurationProperty<Long> GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL = + new HadoopConfigurationProperty<>( + "fs.gs.outputstream.sync.min.interval", + 0L); + private final String workingDirectory; private final String projectId; private final Configuration config; @@ -188,4 +206,12 @@ public Pattern getMarkerFilePattern() { return fileMarkerFilePattern; } + + public boolean isEnsureNoConflictingItems() { + return GCS_CREATE_ITEMS_CONFLICT_CHECK_ENABLE.get(config, config::getBoolean); + } + + public Duration getMinSyncInterval() { + return GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL.getTimeDuration(config); + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java index 747d9f001c5..c41ce13edae 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/GoogleHadoopOutputStream.java @@ -25,23 +25,88 @@ import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; import java.nio.channels.WritableByteChannel; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.annotation.Nonnull; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.thirdparty.com.google.common.base.Ascii; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class GoogleHadoopOutputStream extends OutputStream { - public static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class); +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState; +import static org.apache.hadoop.thirdparty.com.google.common.base.Strings.isNullOrEmpty; + +class GoogleHadoopOutputStream extends OutputStream + implements StreamCapabilities, Syncable { + private static final Logger LOG = LoggerFactory.getLogger(StorageResourceId.class); + + // Prefix used for all temporary files created by this stream. + private static final String TMP_FILE_PREFIX = "_GHFS_SYNC_TMP_FILE_"; + + // Temporary files don't need to contain the desired attributes of the final destination file + // since metadata settings get clobbered on final compose() anyways; additionally, due to + // the way we pick temp file names and already ensured directories for the destination file, + // we can optimize tempfile creation by skipping various directory checks. 
+ private static final CreateFileOptions TMP_FILE_CREATE_OPTIONS = + CreateFileOptions.builder().setEnsureNoDirectoryConflict(false).build(); + + // Deletion of temporary files occurs asynchronously for performance reasons, but in-flight + // deletions are awaited on close() so as long as all output streams are closed, there should + // be no remaining in-flight work occurring inside this threadpool. + private static final ExecutorService TMP_FILE_CLEANUP_THREADPOOL = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("ghfs-output-stream-sync-cleanup-%d") + .setDaemon(true) + .build()); private final GoogleHadoopFileSystem ghfs; + private final CreateObjectOptions composeObjectOptions; + // Path of the file to write to. private final URI dstGcsPath; - private OutputStream outputStream; + /** + * The last known generationId of the {@link #dstGcsPath} file, or possibly {@link + * StorageResourceId#UNKNOWN_GENERATION_ID} if unknown. + */ + private long dstGenerationId; + + // GCS path pointing at the "tail" file which will be appended to the destination + // on hflush()/hsync() call. + private URI tmpGcsPath; + + /** + * Stores the component index corresponding to {@link #tmpGcsPath}. If close() is called, the + * total number of components in the {@link #dstGcsPath} will be {@code tmpIndex + 1}. + */ + private int tmpIndex; + + // OutputStream pointing at the "tail" file which will be appended to the destination + // on hflush()/hsync() call. + private OutputStream tmpOut; + + private final RateLimiter syncRateLimiter; + + // List of temporary file-deletion futures accrued during the lifetime of this output stream. + private final List<Future<Void>> tmpDeletionFutures = new ArrayList<>(); // Statistics tracker provided by the parent GoogleHadoopFileSystem for recording // numbers of bytes written. @@ -57,19 +122,49 @@ class GoogleHadoopOutputStream extends OutputStream { * @throws IOException if an IO error occurs. */ GoogleHadoopOutputStream(GoogleHadoopFileSystem ghfs, URI dstGcsPath, - CreateOptions createFileOptions, FileSystem.Statistics statistics) throws IOException { + CreateFileOptions createFileOptions, FileSystem.Statistics statistics) throws IOException { LOG.trace("GoogleHadoopOutputStream(gcsPath: {}, createFileOptions: {})", dstGcsPath, createFileOptions); this.ghfs = ghfs; this.dstGcsPath = dstGcsPath; this.statistics = statistics; - this.outputStream = createOutputStream(ghfs.getGcsFs(), dstGcsPath, createFileOptions, - ghfs.getFileSystemConfiguration()); + Duration minSyncInterval = ghfs.getFileSystemConfiguration().getMinSyncInterval(); + + this.syncRateLimiter = + minSyncInterval.isNegative() || minSyncInterval.isZero() + ? null + : RateLimiter.create(/* permitsPerSecond= */ 1_000.0 / minSyncInterval.toMillis()); + this.composeObjectOptions = + GoogleCloudStorageFileSystem.objectOptionsFromFileOptions( + createFileOptions.toBuilder() + // Set write mode to OVERWRITE because we use compose operation to append new data + // to an existing object + .setWriteMode(CreateFileOptions.WriteMode.OVERWRITE) + .build()); + + if (createFileOptions.getWriteMode() == CreateFileOptions.WriteMode.APPEND) { + // When appending first component has to go to new temporary file. 
+ this.tmpGcsPath = getNextTmpPath(); + this.tmpIndex = 1; + } else { + // The first component of the stream will go straight to the destination filename to optimize + // the case where no hsync() or a single hsync() is called during the lifetime of the stream; + // committing the first component thus doesn't require any compose() call under the hood. + this.tmpGcsPath = dstGcsPath; + this.tmpIndex = 0; + } + + this.tmpOut = + createOutputStream( + ghfs.getGcsFs(), + tmpGcsPath, + tmpIndex == 0 ? createFileOptions : TMP_FILE_CREATE_OPTIONS); + this.dstGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID; } - private static OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsfs, URI gcsPath, - CreateOptions options, GoogleHadoopFileSystemConfiguration fileSystemConfiguration) + private OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsfs, URI gcsPath, + CreateFileOptions options) throws IOException { WritableByteChannel channel; try { @@ -80,14 +175,14 @@ private static OutputStream createOutputStream(GoogleCloudStorageFileSystem gcsf String.format("'%s' already exists", gcsPath)).initCause(e); } OutputStream outputStream = Channels.newOutputStream(channel); - int bufferSize = fileSystemConfiguration.getOutStreamBufferSize(); + int bufferSize = gcsfs.getConfiguration().getOutStreamBufferSize(); return bufferSize > 0 ? new BufferedOutputStream(outputStream, bufferSize) : outputStream; } @Override public void write(int b) throws IOException { throwIfNotOpen(); - outputStream.write(b); + tmpOut.write(b); statistics.incrementBytesWritten(1); statistics.incrementWriteOps(1); } @@ -95,30 +190,176 @@ public void write(int b) throws IOException { @Override public void write(@Nonnull byte[] b, int offset, int len) throws IOException { throwIfNotOpen(); - outputStream.write(b, offset, len); + tmpOut.write(b, offset, len); statistics.incrementBytesWritten(len); statistics.incrementWriteOps(1); } + private void commitTempFile() throws IOException { + // TODO: return early when 0 bytes have been written in the temp files + tmpOut.close(); + + long tmpGenerationId = StorageResourceId.UNKNOWN_GENERATION_ID; + LOG.trace( + "tmpOut is an instance of {}; expected generationId {}.", + tmpOut.getClass(), tmpGenerationId); + + // On the first component, tmpGcsPath will equal finalGcsPath, and no compose() call is + // necessary. Otherwise, we compose in-place into the destination object and then delete + // the temporary object. + if (dstGcsPath.equals(tmpGcsPath)) { + // First commit was direct to the destination; the generationId of the object we just + // committed will be used as the destination generation id for future compose calls. 
+ dstGenerationId = tmpGenerationId; + } else { + StorageResourceId dstId = + StorageResourceId.fromUriPath( + dstGcsPath, /* allowEmptyObjectName= */ false, dstGenerationId); + StorageResourceId tmpId = + StorageResourceId.fromUriPath( + tmpGcsPath, /* allowEmptyObjectName= */ false, tmpGenerationId); + checkState( + dstId.getBucketName().equals(tmpId.getBucketName()), + "Destination bucket in path '%s' doesn't match temp file bucket in path '%s'", + dstGcsPath, + tmpGcsPath); + GoogleCloudStorageFileSystem gcs = ghfs.getGcsFs(); + GoogleCloudStorageItemInfo composedObject = + gcs.composeObjects(ImmutableList.of(dstId, tmpId), dstId, composeObjectOptions); + dstGenerationId = composedObject.getContentGeneration(); + tmpDeletionFutures.add( + TMP_FILE_CLEANUP_THREADPOOL.submit( + () -> { + gcs.delete(ImmutableList.of(tmpId)); + return null; + })); + } + } + @Override public void close() throws IOException { - LOG.trace("close(): final destination: {}", dstGcsPath); + LOG.trace( + "close(): temp tail file: %s final destination: {}", tmpGcsPath, dstGcsPath); - if (outputStream == null) { + if (tmpOut == null) { LOG.trace("close(): Ignoring; stream already closed."); return; } + commitTempFile(); + try { - outputStream.close(); + tmpOut.close(); } finally { - outputStream = null; + tmpOut = null; + } + tmpGcsPath = null; + tmpIndex = -1; + + LOG.trace("close(): Awaiting {} deletionFutures", tmpDeletionFutures.size()); + for (Future<?> deletion : tmpDeletionFutures) { + try { + deletion.get(); + } catch (ExecutionException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new IOException( + String.format( + "Failed to delete temporary files while closing stream: '%s'", dstGcsPath), + e); + } } } private void throwIfNotOpen() throws IOException { - if (outputStream == null) { + if (tmpOut == null) { throw new ClosedChannelException(); } } + + @Override + public boolean hasCapability(String capability) { + checkArgument(!isNullOrEmpty(capability), "capability must not be null or empty string"); + switch (Ascii.toLowerCase(capability)) { + case StreamCapabilities.HFLUSH: + case StreamCapabilities.HSYNC: + return syncRateLimiter != null; + case StreamCapabilities.IOSTATISTICS: + return false; // TODO: Add support + default: + return false; + } + } + + /** + * There is no way to flush data to become available for readers without a full-fledged hsync(), + * If the output stream is only syncable, this method is a no-op. If the output stream is also + * flushable, this method will simply use the same implementation of hsync(). + * + * <p>If it is rate limited, unlike hsync(), which will try to acquire the permits and block, it + * will do nothing. 
+ */ + @Override + public void hflush() throws IOException { + LOG.trace("hflush(): {}", dstGcsPath); + + long startMs = System.currentTimeMillis(); + throwIfNotOpen(); + // If rate limit not set or permit acquired than use hsync() + if (syncRateLimiter == null || syncRateLimiter.tryAcquire()) { + LOG.trace("hflush() uses hsyncInternal() for {}", dstGcsPath); + hsyncInternal(startMs); + return; + } + LOG.trace( + "hflush(): No-op due to rate limit ({}): readers will *not* yet see flushed data for {}", + syncRateLimiter, dstGcsPath); + + } + + @Override + public void hsync() throws IOException { + LOG.trace("hsync(): {}", dstGcsPath); + + long startMs = System.currentTimeMillis(); + throwIfNotOpen(); + if (syncRateLimiter != null) { + LOG.trace( + "hsync(): Rate limited ({}) with blocking permit acquisition for {}", + syncRateLimiter, dstGcsPath); + syncRateLimiter.acquire(); + } + hsyncInternal(startMs); + + } + + /** Internal implementation of hsync, can be reused by hflush() as well. */ + private void hsyncInternal(long startMs) throws IOException { + LOG.trace( + "hsyncInternal(): Committing tail file {} to final destination {}", tmpGcsPath, dstGcsPath); + commitTempFile(); + + // Use a different temporary path for each temporary component to reduce the possible avenues of + // race conditions in the face of low-level retries, etc. + ++tmpIndex; + tmpGcsPath = getNextTmpPath(); + + LOG.trace( + "hsync(): Opening next temporary tail file {} at {} index", tmpGcsPath, tmpIndex); + tmpOut = createOutputStream(ghfs.getGcsFs(), tmpGcsPath, TMP_FILE_CREATE_OPTIONS); + + long finishMs = System.currentTimeMillis(); + LOG.trace("Took {}ms to sync() for {}", finishMs - startMs, dstGcsPath); + } + /** Returns URI to be used for the next temp "tail" file in the series. */ + private URI getNextTmpPath() { + Path basePath = ghfs.getHadoopPath(dstGcsPath); + Path tempPath = + new Path( + basePath.getParent(), + String.format( + "%s%s.%d.%s", TMP_FILE_PREFIX, basePath.getName(), tmpIndex, UUID.randomUUID())); + return ghfs.getGcsPath(tempPath); + } } diff --git a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java index 9360290a09c..450459e6a8d 100644 --- a/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java +++ b/hadoop-tools/hadoop-gcp/src/main/java/org/apache/hadoop/fs/gs/HadoopConfigurationProperty.java @@ -20,6 +20,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; +import java.time.Duration; import java.util.List; import java.util.function.BiFunction; @@ -28,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * Hadoop configuration property. */ @@ -64,6 +67,13 @@ T get(Configuration config, BiFunction<String, T, T> getterFn) { return logProperty(lookupKey, getterFn.apply(lookupKey, defaultValue)); } + Duration getTimeDuration(Configuration config) { + String lookupKey = getLookupKey(config, key, (c, k) -> c.get(k) != null); + String defValStr = defaultValue == null ? 
null : String.valueOf(defaultValue); + return logProperty( + lookupKey, Duration.ofMillis(config.getTimeDuration(lookupKey, defValStr, MILLISECONDS))); + } + private String getLookupKey(Configuration config, String lookupKey, BiFunction<Configuration, String, Boolean> checkFn) { for (String prefix : keyPrefixes) { diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java similarity index 68% copy from hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java copy to hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java index 5d9459cac19..56ae35b4ff4 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractCreate.java @@ -15,29 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.fs.gs.contract; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.AbstractContractRenameTest; +import org.apache.hadoop.fs.contract.AbstractContractCreateTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.ContractTestUtils; -/** GCS contract tests covering file rename. */ -public class ITestGoogleContractRename extends AbstractContractRenameTest { +public class ITestGoogleContractCreate extends AbstractContractCreateTest { @Override protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } @Override - public void testRenameWithNonEmptySubDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } - - @Override - public void testRenameNonexistentFile() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); + public void testOverwriteEmptyDirectory() throws Throwable { + ContractTestUtils.skip("blobstores can't distinguish empty directories from files"); } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java index 26181f20385..27acc015ab8 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractMkdir.java @@ -29,33 +29,9 @@ protected AbstractFSContract createContract(Configuration conf) { return new GoogleContract(conf); } - @Override - public void testMkdirsDoesNotRemoveParentDirectories() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } - - @Override - public void testCreateDirWithExistingDir() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } - @Override public void testMkDirRmDir() { // TODO: Enable this ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); } - - @Override - public void testNoMkdirOverFile() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. 
This will be enabled in a subsequent change"); - } - - @Override - public void testMkdirOverParentFile() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } } diff --git a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java index 5d9459cac19..a159d46b006 100644 --- a/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java +++ b/hadoop-tools/hadoop-gcp/src/test/java/org/apache/hadoop/fs/gs/contract/ITestGoogleContractRename.java @@ -34,10 +34,4 @@ public void testRenameWithNonEmptySubDir() { // TODO: Enable this ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); } - - @Override - public void testRenameNonexistentFile() { - // TODO: Enable this - ContractTestUtils.skip("Skipping the test. This will be enabled in a subsequent change"); - } } diff --git a/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml b/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml index 1de34245a5d..542df5166b6 100644 --- a/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml +++ b/hadoop-tools/hadoop-gcp/src/test/resources/contract/gs.xml @@ -17,5 +17,103 @@ --> <configuration> + <property> + <name>fs.contract.test.root-tests-enabled</name> + <value>true</value> + </property> -</configuration> + <property> + <name>fs.contract.test.random-seek-count</name> + <value>10</value> + </property> + + <property> + <name>fs.contract.create-visibility-delayed</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.is-blobstore</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.is-case-sensitive</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.rename-returns-false-if-source-missing</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.rename-returns-false-if-dest-exists</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.rename-remove-dest-if-empty-dir</name> + <value>false</value> + </property> + + <property> + <name>fs.contract.supports-append</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-atomic-directory-delete</name> + <value>false</value> + </property> + + <property> + <name>fs.contract.supports-atomic-rename</name> + <value>false</value> + </property> + + <property> + <name>fs.contract.supports-block-locality</name> + <value>false</value> + </property> + + <property> + <name>fs.contract.supports-concat</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-getfilestatus</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-seek</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-seek-on-closed-file</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.rejects-seek-past-eof</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-strict-exceptions</name> + <value>true</value> + </property> + + <property> + <name>fs.contract.supports-unix-permissions</name> + <value>false</value> + </property> + + <property> + <name>fs.contract.rename-overwrites-dest</name> + <value>false</value> + </property> +</configuration> \ No newline at end of file 
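
For readers wanting to see the new behavior from the application side, below is a minimal usage sketch, not taken from the patch itself: it assumes the hadoop-gcp connector is on the classpath with the gs scheme bound to GoogleHadoopFileSystem, and the bucket and object names are placeholders. Per the patch, hasCapability(HFLUSH/HSYNC) returns true only when fs.gs.outputstream.sync.min.interval is set to a positive duration (the default is 0), while hflush() itself becomes a rate-limited no-op once that interval is configured and calls arrive too quickly.

    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StreamCapabilities;

    public final class GcsHflushSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A positive minimum sync interval enables the HFLUSH/HSYNC capability
        // reporting added in this change; "1s" is parsed by getTimeDuration().
        conf.set("fs.gs.outputstream.sync.min.interval", "1s");

        // Hypothetical bucket and object names, for illustration only.
        Path path = new Path("gs://example-bucket/logs/app.log");
        try (FileSystem fs = FileSystem.get(new URI("gs://example-bucket/"), conf);
             FSDataOutputStream out = fs.create(path, /* overwrite= */ true)) {
          out.write("first batch\n".getBytes(StandardCharsets.UTF_8));
          if (out.hasCapability(StreamCapabilities.HSYNC)) {
            // Makes the bytes written so far visible to readers.
            out.hsync();
          }
          out.write("second batch\n".getBytes(StandardCharsets.UTF_8));
          // Reuses hsync() unless rate-limited, in which case it is a no-op.
          out.hflush();
        }
      }
    }

Under the hood, each sync commits the current temporary "tail" object by composing it onto the destination object and then deletes the temporary object asynchronously, with the deletions awaited on close(); the first component is written directly to the destination so a stream that never syncs needs no compose() call at all.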