This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1541d027c52 Remove test-only functions in GcsUtil and change related
unit tests to use V1. (#37597)
1541d027c52 is described below
commit 1541d027c528cc6886180de4faf07b450d172742
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Feb 24 10:47:20 2026 -0500
Remove test-only functions in GcsUtil and change related unit tests to use
V1. (#37597)
---
.../beam/sdk/extensions/gcp/util/GcsUtil.java | 156 +---------------
.../beam/sdk/extensions/gcp/util/GcsUtilTest.java | 207 +++++++++------------
2 files changed, 100 insertions(+), 263 deletions(-)
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
index 33399ef87b6..396fde45298 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
@@ -17,15 +17,10 @@
*/
package org.apache.beam.sdk.extensions.gcp.util;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
import com.google.api.gax.paging.Page;
-import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
-import com.google.auth.Credentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage.BlobGetOption;
@@ -38,9 +33,6 @@ import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.function.Supplier;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
@@ -87,73 +79,31 @@ public class GcsUtil {
public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
@Override
public GcsUtil create(PipelineOptions options) {
- GcsOptions gcsOptions = options.as(GcsOptions.class);
- Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
- return new GcsUtil(
- storageBuilder.build(),
- storageBuilder.getHttpRequestInitializer(),
- gcsOptions.getExecutorService(),
- ExperimentalOptions.hasExperiment(options, "use_grpc_for_gcs"),
- gcsOptions.getGcpCredential(),
- gcsOptions.getGcsUploadBufferSizeBytes(),
- gcsOptions.getGcsRewriteDataOpBatchLimit(),
- GcsCountersOptions.create(
- gcsOptions.getEnableBucketReadMetricCounter()
- ? gcsOptions.getGcsReadCounterPrefix()
- : null,
- gcsOptions.getEnableBucketWriteMetricCounter()
- ? gcsOptions.getGcsWriteCounterPrefix()
- : null),
- gcsOptions);
+ return new GcsUtil(options);
}
}
+ /** @deprecated use {@link GcsPath#getNonWildcardPrefix(String)} instead. */
+ @Deprecated
public static String getNonWildcardPrefix(String globExp) {
return GcsPath.getNonWildcardPrefix(globExp);
}
+ /** @deprecated use {@link GcsPath#isWildcard(GcsPath)} instead. */
+ @Deprecated
public static boolean isWildcard(GcsPath spec) {
return GcsPath.isWildcard(spec);
}
- @VisibleForTesting
- GcsUtil(
- Storage storageClient,
- HttpRequestInitializer httpRequestInitializer,
- ExecutorService executorService,
- Boolean shouldUseGrpc,
- Credentials credentials,
- @Nullable Integer uploadBufferSizeBytes,
- @Nullable Integer rewriteDataOpBatchLimit,
- GcsCountersOptions gcsCountersOptions,
- GcsOptions gcsOptions) {
- this.delegate =
- new GcsUtilV1(
- storageClient,
- httpRequestInitializer,
- executorService,
- shouldUseGrpc,
- credentials,
- uploadBufferSizeBytes,
- rewriteDataOpBatchLimit,
- gcsCountersOptions.delegate,
- gcsOptions);
-
- if (ExperimentalOptions.hasExperiment(gcsOptions, "use_gcsutil_v2")) {
- this.delegateV2 = new GcsUtilV2(gcsOptions);
+ GcsUtil(PipelineOptions options) {
+ this.delegate = new GcsUtilV1.GcsUtilFactory().create(options);
+ if (ExperimentalOptions.hasExperiment(options, "use_gcsutil_v2")) {
+ this.delegateV2 = new GcsUtilV2.GcsUtilFactory().create(options);
} else {
this.delegateV2 = null;
}
}
- protected void setStorageClient(Storage storageClient) {
- delegate.setStorageClient(storageClient);
- }
-
- protected void setBatchRequestSupplier(Supplier<GcsUtilV1.BatchInterface>
supplier) {
- delegate.setBatchRequestSupplier(supplier);
- }
-
public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
if (delegateV2 != null) {
return delegateV2.expand(gcsPattern);
@@ -161,12 +111,6 @@ public class GcsUtil {
return delegate.expand(gcsPattern);
}
- @VisibleForTesting
- @Nullable
- Integer getUploadBufferSizeBytes() {
- return delegate.getUploadBufferSizeBytes();
- }
-
public long fileSize(GcsPath path) throws IOException {
if (delegateV2 != null) {
return delegateV2.fileSize(path);
@@ -180,13 +124,6 @@ public class GcsUtil {
return delegate.getObject(gcsPath);
}
- /** @deprecated use {@link #getBlob(GcsPath, BlobGetOption...)}. */
- @Deprecated
- @VisibleForTesting
- StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper)
throws IOException {
- return delegate.getObject(gcsPath, backoff, sleeper);
- }
-
public Blob getBlob(GcsPath gcsPath, BlobGetOption... options) throws
IOException {
if (delegateV2 != null) {
return delegateV2.getBlob(gcsPath, options);
@@ -248,11 +185,6 @@ public class GcsUtil {
throw new IOException("GcsUtil V2 not initialized.");
}
- @VisibleForTesting
- List<Long> fileSizes(List<GcsPath> paths) throws IOException {
- return delegate.fileSizes(paths);
- }
-
public SeekableByteChannel open(GcsPath path) throws IOException {
return delegate.open(path);
}
@@ -389,50 +321,6 @@ public class GcsUtil {
}
}
- @VisibleForTesting
- boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper)
throws IOException {
- return delegate.bucketAccessible(path, backoff, sleeper);
- }
-
- @VisibleForTesting
- void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper)
throws IOException {
- delegate.verifyBucketAccessible(path, backoff, sleeper);
- }
-
- @VisibleForTesting
- @Nullable
- Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws
IOException {
- return delegate.getBucket(path, backoff, sleeper);
- }
-
- @VisibleForTesting
- void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper
sleeper)
- throws IOException {
- delegate.createBucket(projectId, bucket, backoff, sleeper);
- }
-
- @VisibleForTesting
- void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws
IOException {
- delegate.removeBucket(bucket, backoff, sleeper);
- }
-
- @VisibleForTesting
- List<GcsUtilV1.BatchInterface> makeGetBatches(
- Collection<GcsPath> paths, List<StorageObjectOrIOException[]> results)
throws IOException {
- List<GcsUtilV1.StorageObjectOrIOException[]> legacyResults = new
java.util.ArrayList<>();
- List<GcsUtilV1.BatchInterface> legacyBatch =
delegate.makeGetBatches(paths, legacyResults);
-
- for (GcsUtilV1.StorageObjectOrIOException[] legacyResult : legacyResults) {
- StorageObjectOrIOException[] result = new
StorageObjectOrIOException[legacyResult.length];
- for (int i = 0; i < legacyResult.length; ++i) {
- result[i] = StorageObjectOrIOException.fromLegacy(legacyResult[i]);
- }
- results.add(result);
- }
-
- return legacyBatch;
- }
-
public void copy(Iterable<String> srcFilenames, Iterable<String>
destFilenames)
throws IOException {
delegate.copy(srcFilenames, destFilenames);
@@ -497,32 +385,6 @@ public class GcsUtil {
}
}
- @VisibleForTesting
- @SuppressWarnings("JdkObsolete") // for LinkedList
- java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
- Iterable<String> srcFilenames,
- Iterable<String> destFilenames,
- boolean deleteSource,
- boolean ignoreMissingSource,
- boolean ignoreExistingDest)
- throws IOException {
- return delegate.makeRewriteOps(
- srcFilenames, destFilenames, deleteSource, ignoreMissingSource,
ignoreExistingDest);
- }
-
- @VisibleForTesting
- @SuppressWarnings("JdkObsolete") // for LinkedList
- List<GcsUtilV1.BatchInterface> makeRewriteBatches(
- java.util.LinkedList<GcsUtilV1.RewriteOp> rewrites) throws IOException {
- return delegate.makeRewriteBatches(rewrites);
- }
-
- @VisibleForTesting
- List<GcsUtilV1.BatchInterface> makeRemoveBatches(Collection<String>
filenames)
- throws IOException {
- return delegate.makeRemoveBatches(filenames);
- }
-
public void remove(Collection<String> filenames) throws IOException {
delegate.remove(filenames);
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
index 0b02e11eade..a2b0e0af502 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java
@@ -151,7 +151,7 @@ public class GcsUtilTest {
public void testUploadBufferSizeDefault() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil util = pipelineOptions.getGcsUtil();
- assertNull(util.getUploadBufferSizeBytes());
+ assertNull(util.delegate.getUploadBufferSizeBytes());
}
@Test
@@ -159,7 +159,7 @@ public class GcsUtilTest {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
pipelineOptions.setGcsUploadBufferSizeBytes(12345);
GcsUtil util = pipelineOptions.getGcsUtil();
- assertEquals((Integer) 12345, util.getUploadBufferSizeBytes());
+ assertEquals((Integer) 12345, util.delegate.getUploadBufferSizeBytes());
}
@Test
@@ -243,7 +243,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -332,7 +332,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -383,7 +383,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -408,7 +408,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -436,7 +436,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -461,7 +461,7 @@ public class GcsUtilTest {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
- gcsUtil.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
+ gcsUtil.delegate.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
thrown.expect(FileNotFoundException.class);
gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
@@ -473,7 +473,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet =
Mockito.mock(Storage.Objects.Get.class);
@@ -491,6 +491,7 @@ public class GcsUtilTest {
assertEquals(
1000,
gcsUtil
+ .delegate
.getObject(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
@@ -546,8 +547,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- gcsUtil.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
- gcsUtil.fileSizes(
+ gcsUtil.delegate.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
+ gcsUtil.delegate.fileSizes(
ImmutableList.of(
GcsPath.fromComponents("testbucket", "testobject"),
GcsPath.fromComponents("testbucket", "testobject2")));
@@ -567,8 +568,9 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- gcsUtil.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
- gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket",
"testobject")));
+ gcsUtil.delegate.setStorageClient(new Storage(mockTransport,
Transport.getJsonFactory(), null));
+ gcsUtil.delegate.fileSizes(
+ ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
}
@Test
@@ -637,9 +639,9 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- gcsUtil.setStorageClient(
+ gcsUtil.delegate.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new
RetryHttpRequestInitializer()));
- gcsUtil.fileSizes(
+ gcsUtil.delegate.fileSizes(
ImmutableList.of(
GcsPath.fromComponents("testbucket", "testobject"),
GcsPath.fromComponents("testbucket", "testobject2")));
@@ -678,9 +680,10 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- gcsUtil.setStorageClient(
+ gcsUtil.delegate.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new
RetryHttpRequestInitializer()));
- gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket",
"testobject")));
+ gcsUtil.delegate.fileSizes(
+ ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
}
@Test
@@ -725,7 +728,7 @@ public class GcsUtilTest {
new
MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
- gcsUtil.setStorageClient(
+ gcsUtil.delegate.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new
RetryHttpRequestInitializer()));
gcsUtil.remove(Arrays.asList("gs://some-bucket/already-deleted"));
}
@@ -737,7 +740,7 @@ public class GcsUtilTest {
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets.Insert mockStorageInsert =
Mockito.mock(Storage.Buckets.Insert.class);
@@ -750,7 +753,8 @@ public class GcsUtilTest {
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new
FastNanoClockAndSleeper()::sleep);
+ gcsUtil.delegate.createBucket(
+ "a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
}
@Test
@@ -759,7 +763,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Insert mockStorageInsert =
Mockito.mock(Storage.Buckets.Insert.class);
@@ -778,7 +782,8 @@ public class GcsUtilTest {
thrown.expect(AccessDeniedException.class);
- gcsUtil.createBucket("a", new Bucket(), mockBackOff, new
FastNanoClockAndSleeper()::sleep);
+ gcsUtil.delegate.createBucket(
+ "a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()::sleep);
}
@Test
@@ -787,7 +792,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -801,7 +806,7 @@ public class GcsUtilTest {
.thenReturn(new Bucket());
assertTrue(
- gcsUtil.bucketAccessible(
+ gcsUtil.delegate.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep));
@@ -813,7 +818,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -830,7 +835,7 @@ public class GcsUtilTest {
when(mockStorageGet.execute()).thenThrow(expectedException);
assertFalse(
- gcsUtil.bucketAccessible(
+ gcsUtil.delegate.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep));
@@ -842,7 +847,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -857,7 +862,7 @@ public class GcsUtilTest {
HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist",
"Nothing here to see"));
assertFalse(
- gcsUtil.bucketAccessible(
+ gcsUtil.delegate.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep));
@@ -869,7 +874,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -882,7 +887,7 @@ public class GcsUtilTest {
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
- gcsUtil.verifyBucketAccessible(
+ gcsUtil.delegate.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep);
@@ -894,7 +899,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -910,7 +915,7 @@ public class GcsUtilTest {
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute()).thenThrow(expectedException);
- gcsUtil.verifyBucketAccessible(
+ gcsUtil.delegate.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep);
@@ -922,7 +927,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -936,7 +941,7 @@ public class GcsUtilTest {
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist",
"Nothing here to see"));
- gcsUtil.verifyBucketAccessible(
+ gcsUtil.delegate.verifyBucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep);
@@ -948,7 +953,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -962,7 +967,7 @@ public class GcsUtilTest {
.thenReturn(new Bucket());
assertNotNull(
- gcsUtil.getBucket(
+ gcsUtil.delegate.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep));
@@ -974,7 +979,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet =
Mockito.mock(Storage.Buckets.Get.class);
@@ -990,7 +995,7 @@ public class GcsUtilTest {
thrown.expect(FileNotFoundException.class);
thrown.expectMessage("It don't exist");
- gcsUtil.getBucket(
+ gcsUtil.delegate.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()::sleep);
@@ -1147,7 +1152,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
LinkedList<RewriteOp> rewrites =
- gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1),
false, false, false);
+ gcsUtil.delegate.makeRewriteOps(
+ makeStrings("s", 1), makeStrings("d", 1), false, false, false);
assertEquals(1, rewrites.size());
RewriteOp rewrite = rewrites.pop();
@@ -1167,7 +1173,8 @@ public class GcsUtilTest {
gcsUtil.delegate.maxBytesRewrittenPerCall = 1337L;
LinkedList<RewriteOp> rewrites =
- gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1),
false, false, false);
+ gcsUtil.delegate.makeRewriteOps(
+ makeStrings("s", 1), makeStrings("d", 1), false, false, false);
assertEquals(1, rewrites.size());
RewriteOp rewrite = rewrites.pop();
@@ -1182,23 +1189,24 @@ public class GcsUtilTest {
// Small number of files fits in 1 batch
List<BatchInterface> batches =
- gcsUtil.makeRewriteBatches(
- gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3),
false, false, false));
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(
+ makeStrings("s", 3), makeStrings("d", 3), false, false,
false));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
// 1 batch of files fits in 1 batch
batches =
- gcsUtil.makeRewriteBatches(
- gcsUtil.makeRewriteOps(
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(
makeStrings("s", 100), makeStrings("d", 100), false, false,
false));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
// A little more than 5 batches of files fits in 6 batches
batches =
- gcsUtil.makeRewriteBatches(
- gcsUtil.makeRewriteOps(
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(
makeStrings("s", 501), makeStrings("d", 501), false, false,
false));
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
@@ -1212,15 +1220,16 @@ public class GcsUtilTest {
// Small number of files in same bucket fits in 1 batch
List<BatchInterface> batches =
- gcsUtil.makeRewriteBatches(
- gcsUtil.makeRewriteOps(makeStrings("s", 5), makeStrings("d", 5),
false, false, false));
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(
+ makeStrings("s", 5), makeStrings("d", 5), false, false,
false));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(5));
// Files copying between buckets use smaller batch size
batches =
- gcsUtil.makeRewriteBatches(
- gcsUtil.makeRewriteOps(
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(
makeStrings("bucket1", "s", 5),
makeStrings("bucket2", "d", 5),
false,
@@ -1240,7 +1249,8 @@ public class GcsUtilTest {
toFiles.addAll(makeStrings("bucket5", "g", 1));
batches =
- gcsUtil.makeRewriteBatches(gcsUtil.makeRewriteOps(fromFiles, toFiles,
false, false, false));
+ gcsUtil.delegate.makeRewriteBatches(
+ gcsUtil.delegate.makeRewriteOps(fromFiles, toFiles, false, false,
false));
assertThat(batches.size(), equalTo(4));
assertThat(batches.get(0).size(), equalTo(91));
assertThat(sumBatchSizes(batches), equalTo(97));
@@ -1252,7 +1262,7 @@ public class GcsUtilTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Number of source files 3");
- gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1), false,
false, false);
+ gcsUtil.delegate.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1),
false, false, false);
}
private class FakeBatcher implements BatchInterface {
@@ -1318,8 +1328,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1350,8 +1360,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite1 =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1380,8 +1390,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1401,8 +1411,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Rewrite mockStorageRewrite =
Mockito.mock(Storage.Objects.Rewrite.class);
@@ -1426,7 +1436,7 @@ public class GcsUtilTest {
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
+ gcsUtil.delegate.setStorageClient(mockStorage);
assertThrows(
UnsupportedOperationException.class,
@@ -1444,8 +1454,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest1 =
Mockito.mock(Storage.Objects.Get.class);
@@ -1476,8 +1486,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1518,17 +1528,17 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
// Small number of files fits in 1 batch
- List<BatchInterface> batches = gcsUtil.makeRemoveBatches(makeStrings("s",
3));
+ List<BatchInterface> batches =
gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 3));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
// 1 batch of files fits in 1 batch
- batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
+ batches = gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 100));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
// A little more than 5 batches of files fits in 6 batches
- batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
+ batches = gcsUtil.delegate.makeRemoveBatches(makeStrings("s", 501));
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
}
@@ -1538,22 +1548,22 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
// Small number of files fits in 1 batch
- List<StorageObjectOrIOException[]> results = Lists.newArrayList();
- List<BatchInterface> batches = gcsUtil.makeGetBatches(makeGcsPaths("s",
3), results);
+ List<GcsUtilV1.StorageObjectOrIOException[]> results =
Lists.newArrayList();
+ List<BatchInterface> batches =
gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 3), results);
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
assertEquals(3, results.size());
// 1 batch of files fits in 1 batch
results = Lists.newArrayList();
- batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results);
+ batches = gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 100), results);
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
assertEquals(100, results.size());
// A little more than 5 batches of files fits in 6 batches
results = Lists.newArrayList();
- batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results);
+ batches = gcsUtil.delegate.makeGetBatches(makeGcsPaths("s", 501), results);
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
assertEquals(501, results.size());
@@ -1564,8 +1574,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1584,8 +1594,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockGetRequest =
Mockito.mock(Storage.Objects.Get.class);
@@ -1610,8 +1620,8 @@ public class GcsUtilTest {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
- gcsUtil.setStorageClient(mockStorage);
- gcsUtil.setBatchRequestSupplier(FakeBatcher::new);
+ gcsUtil.delegate.setStorageClient(mockStorage);
+ gcsUtil.delegate.setBatchRequestSupplier(FakeBatcher::new);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
when(mockStorage.objects()).thenReturn(mockStorageObjects);
@@ -1638,46 +1648,11 @@ public class GcsUtilTest {
}
public static GcsUtilMock createMock(PipelineOptions options) {
- GcsOptions gcsOptions = options.as(GcsOptions.class);
- Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
- return new GcsUtilMock(
- storageBuilder.build(),
- storageBuilder.getHttpRequestInitializer(),
- gcsOptions.getExecutorService(),
- hasExperiment(options, "use_grpc_for_gcs"),
- gcsOptions.getGcpCredential(),
- gcsOptions.getGcsUploadBufferSizeBytes(),
- gcsOptions.getGcsRewriteDataOpBatchLimit(),
- GcsUtil.GcsCountersOptions.create(
- gcsOptions.getEnableBucketReadMetricCounter()
- ? gcsOptions.getGcsReadCounterPrefix()
- : null,
- gcsOptions.getEnableBucketWriteMetricCounter()
- ? gcsOptions.getGcsWriteCounterPrefix()
- : null),
- gcsOptions);
+ return new GcsUtilMock(options);
}
- private GcsUtilMock(
- Storage storageClient,
- HttpRequestInitializer httpRequestInitializer,
- ExecutorService executorService,
- Boolean shouldUseGrpc,
- Credentials credentials,
- @Nullable Integer uploadBufferSizeBytes,
- @Nullable Integer rewriteDataOpBatchLimit,
- GcsUtil.GcsCountersOptions gcsCountersOptions,
- GcsOptions gcsOptions) {
- super(
- storageClient,
- httpRequestInitializer,
- executorService,
- shouldUseGrpc,
- credentials,
- uploadBufferSizeBytes,
- rewriteDataOpBatchLimit,
- gcsCountersOptions,
- gcsOptions);
+ private GcsUtilMock(PipelineOptions options) {
+ super(options);
}
}