Abacn commented on code in PR #36876:
URL: https://github.com/apache/beam/pull/36876#discussion_r2593325020


##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java:
##########
@@ -49,7 +50,7 @@ public interface GcsOptions extends ApplicationNameOptions, GcpOptions, Pipeline
   @JsonIgnore
   @Description(
       "The GoogleCloudStorageReadOptions instance that should be used to read 
from Google Cloud Storage.")
-  @Default.InstanceFactory(GcsUtil.GcsReadOptionsFactory.class)
+  @Default.InstanceFactory(GcsUtilLegacy.GcsReadOptionsFactory.class)

Review Comment:
   How about "GcsUtilV1" (and latest version of the class still as `GcsUtil`)? 
this follows other branched classes naming convention, and in case they both 
exist in Beam code base for extended time and a "GcsUtilV3" appear in the 
future (hopefuly won't be the case).
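   
   Roughly, a hypothetical sketch of that layout (not what the PR currently does; the names here are only for illustration):
   
   ```java
   // The branched legacy implementation carries the version suffix...
   class GcsUtilV1 {
     // body of today's GcsUtilLegacy, unchanged
   }
   
   // ...while the public entry point keeps its original name and delegates,
   // mirroring the wrapper pattern already introduced in this PR.
   public class GcsUtil {
     GcsUtilV1 delegate;
   }
   ```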



##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -17,143 +17,70 @@
  */
 package org.apache.beam.sdk.extensions.gcp.util;
 
-import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest;
-import com.google.api.client.http.HttpHeaders;
 import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.client.http.HttpStatusCodes;
-import com.google.api.client.http.HttpTransport;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
 import com.google.api.services.storage.model.Bucket;
 import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.RewriteResponse;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.auth.Credentials;
-import com.google.auto.value.AutoValue;
-import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
-import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
-import com.google.cloud.hadoop.gcsio.StorageResourceId;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.AccessDeniedException;
-import java.nio.file.FileAlreadyExistsException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
-import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
-import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.extensions.gcp.util.channels.CountingSeekableByteChannel;
-import org.apache.beam.sdk.extensions.gcp.util.channels.CountingWritableByteChannel;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.fs.MoveOptions;
-import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
-import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.FluentBackoff;
-import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Provides operations on GCS. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+
 public class GcsUtil {
+  @VisibleForTesting GcsUtilLegacy delegate;
+
+  public static class GcsCountersOptions {
+    final GcsUtilLegacy.GcsCountersOptions delegate;
+
+    private GcsCountersOptions(GcsUtilLegacy.GcsCountersOptions delegate) {
+      this.delegate = delegate;
+    }
 
-  @AutoValue
-  public abstract static class GcsCountersOptions {
-    public abstract @Nullable String getReadCounterPrefix();
+    public @Nullable String getReadCounterPrefix() {
+      return delegate.getReadCounterPrefix();
+    }
 
-    public abstract @Nullable String getWriteCounterPrefix();
+    public @Nullable String getWriteCounterPrefix() {
+      return delegate.getWriteCounterPrefix();
+    }
 
     public boolean hasAnyPrefix() {
-      return getWriteCounterPrefix() != null || getReadCounterPrefix() != null;
+      return delegate.hasAnyPrefix();
     }
 
     public static GcsCountersOptions create(
         @Nullable String readCounterPrefix, @Nullable String writeCounterPrefix) {
-      return new AutoValue_GcsUtil_GcsCountersOptions(readCounterPrefix, writeCounterPrefix);
+      return new GcsCountersOptions(
+          GcsUtilLegacy.GcsCountersOptions.create(readCounterPrefix, writeCounterPrefix));
     }
   }
 
-  public static class GcsReadOptionsFactory
-      implements DefaultValueFactory<GoogleCloudStorageReadOptions> {
-    @Override
-    public GoogleCloudStorageReadOptions create(PipelineOptions options) {
-      return GoogleCloudStorageReadOptions.DEFAULT;
-    }
-  }
-
-  /**
-   * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport
-   * flags specified on the {@link PipelineOptions}.
-   */
   public static class GcsUtilFactory implements DefaultValueFactory<GcsUtil> {
-    /**
-     * Returns an instance of {@link GcsUtil} based on the {@link PipelineOptions}.
-     *
-     * <p>If no instance has previously been created, one is created and the value stored in {@code
-     * options}.
-     */
     @Override
     public GcsUtil create(PipelineOptions options) {
-      LOG.debug("Creating new GcsUtil");
       GcsOptions gcsOptions = options.as(GcsOptions.class);
       Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions);
       return new GcsUtil(
           storageBuilder.build(),
           storageBuilder.getHttpRequestInitializer(),
           gcsOptions.getExecutorService(),
-          hasExperiment(options, "use_grpc_for_gcs"),
+          org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment(

Review Comment:
   nit: statically import `hasExperiment` instead of spelling out the fully qualified name.
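   
   For reference, a minimal sketch of the nit (the static import already exists in the old version of this file, so this only restores it):
   
   ```java
   // At the top of GcsUtil.java:
   import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
   
   // ...then the call site in GcsUtilFactory.create can stay short:
   //     hasExperiment(options, "use_grpc_for_gcs"),
   ```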



##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilLegacy.java:
##########
@@ -0,0 +1,1448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.client.googleapis.batch.BatchRequest;
+import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest;
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpStatusCodes;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.RewriteResponse;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.auth.Credentials;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorage;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions;
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
+import com.google.cloud.hadoop.gcsio.StorageResourceId;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.cloud.hadoop.util.AsyncWriteChannelOptions;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.channels.CountingSeekableByteChannel;
+import org.apache.beam.sdk.extensions.gcp.util.channels.CountingWritableByteChannel;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Provides operations on GCS. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class GcsUtilLegacy {

Review Comment:
   Shall we make it package-private?
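   
   For illustration, a minimal sketch of that change (assuming nothing outside the package needs to reference the class directly):
   
   ```java
   package org.apache.beam.sdk.extensions.gcp.util;
   
   /** Provides operations on GCS. Package-private: external callers go through GcsUtil. */
   class GcsUtilLegacy {
     // existing implementation unchanged
   }
   ```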



##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java:
##########
@@ -276,1177 +113,283 @@ public static boolean isWildcard(GcsPath spec) {
       @Nullable Integer uploadBufferSizeBytes,
       @Nullable Integer rewriteDataOpBatchLimit,
       GcsCountersOptions gcsCountersOptions,
-      GoogleCloudStorageReadOptions gcsReadOptions) {
-    this.storageClient = storageClient;
-    this.httpRequestInitializer = httpRequestInitializer;
-    this.uploadBufferSizeBytes = uploadBufferSizeBytes;
-    this.executorService = executorService;
-    this.credentials = credentials;
-    this.maxBytesRewrittenPerCall = null;
-    this.numRewriteTokensUsed = null;
-    googleCloudStorageOptions =
-        GoogleCloudStorageOptions.builder()
-            .setAppName("Beam")
-            .setReadChannelOptions(gcsReadOptions)
-            .setGrpcEnabled(shouldUseGrpc)
-            .build();
-    googleCloudStorage =
-        createGoogleCloudStorage(googleCloudStorageOptions, storageClient, 
credentials);
-    this.batchRequestSupplier =
-        () -> {
-          // Capture reference to this so that the most recent storageClient 
and initializer
-          // are used.
-          GcsUtil util = this;
-          return new BatchInterface() {
-            final BatchRequest batch = 
util.storageClient.batch(util.httpRequestInitializer);
-
-            @Override
-            public <T> void queue(
-                AbstractGoogleJsonClientRequest<T> request, 
JsonBatchCallback<T> cb)
-                throws IOException {
-              request.queue(batch, cb);
-            }
-
-            @Override
-            public void execute() throws IOException {
-              batch.execute();
-            }
-
-            @Override
-            public int size() {
-              return batch.size();
-            }
-          };
-        };
-    this.rewriteDataOpBatchLimit =
-        rewriteDataOpBatchLimit == null ? MAX_REQUESTS_PER_COPY_BATCH : 
rewriteDataOpBatchLimit;
-    this.gcsCountersOptions = gcsCountersOptions;
+      GcsOptions gcsOptions) {
+    this.delegate =
+        new GcsUtilLegacy(
+            storageClient,
+            httpRequestInitializer,
+            executorService,
+            shouldUseGrpc,
+            credentials,
+            uploadBufferSizeBytes,
+            rewriteDataOpBatchLimit,
+            gcsCountersOptions.delegate,
+            gcsOptions);
   }
 
-  // Use this only for testing purposes.
   protected void setStorageClient(Storage storageClient) {
-    this.storageClient = storageClient;
+    delegate.setStorageClient(storageClient);
   }
 
-  // Use this only for testing purposes.
-  protected void setBatchRequestSupplier(Supplier<BatchInterface> supplier) {
-    this.batchRequestSupplier = supplier;
+  protected void setBatchRequestSupplier(Supplier<GcsUtilLegacy.BatchInterface> supplier) {
+    delegate.setBatchRequestSupplier(supplier);
   }
 
-  /**
-   * Expands a pattern into matched paths. The pattern path may contain globs, 
which are expanded in
-   * the result. For patterns that only match a single object, we ensure that 
the object exists.
-   */
   public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
-    Pattern p = null;
-    String prefix = null;
-    if (isWildcard(gcsPattern)) {
-      // Part before the first wildcard character.
-      prefix = getNonWildcardPrefix(gcsPattern.getObject());
-      p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject()));
-    } else {
-      // Not a wildcard.
-      try {
-        // Use a get request to fetch the metadata of the object, and ignore 
the return value.
-        // The request has strong global consistency.
-        getObject(gcsPattern);
-        return ImmutableList.of(gcsPattern);
-      } catch (FileNotFoundException e) {
-        // If the path was not found, return an empty list.
-        return ImmutableList.of();
-      }
-    }
-
-    LOG.debug(
-        "matching files in bucket {}, prefix {} against pattern {}",
-        gcsPattern.getBucket(),
-        prefix,
-        p.toString());
-
-    String pageToken = null;
-    List<GcsPath> results = new ArrayList<>();
-    do {
-      Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken);
-      if (objects.getItems() == null) {
-        break;
-      }
-
-      // Filter objects based on the regex.
-      for (StorageObject o : objects.getItems()) {
-        String name = o.getName();
-        // Skip directories, which end with a slash.
-        if (p.matcher(name).matches() && !name.endsWith("/")) {
-          LOG.debug("Matched object: {}", name);
-          results.add(GcsPath.fromObject(o));
-        }
-      }
-      pageToken = objects.getNextPageToken();
-    } while (pageToken != null);
-
-    return results;
+    return delegate.expand(gcsPattern);
   }
 
   @VisibleForTesting
   @Nullable
   Integer getUploadBufferSizeBytes() {
-    return uploadBufferSizeBytes;
-  }
-
-  private static BackOff createBackOff() {
-    return BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
+    return delegate.getUploadBufferSizeBytes();
   }
 
-  /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException} if 
the resource does not
-   * exist.
-   */
   public long fileSize(GcsPath path) throws IOException {
-    return getObject(path).getSize().longValue();
+    return delegate.fileSize(path);
   }
 
-  /** Returns the {@link StorageObject} for the given {@link GcsPath}. */
   public StorageObject getObject(GcsPath gcsPath) throws IOException {
-    return getObject(gcsPath, createBackOff(), Sleeper.DEFAULT);
+    return delegate.getObject(gcsPath);
   }
 
   @VisibleForTesting
   StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) 
throws IOException {
-    Storage.Objects.Get getObject =
-        storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject());
-    try {
-      return ResilientOperation.retry(
-          getObject::execute, backoff, RetryDeterminer.SOCKET_ERRORS, 
IOException.class, sleeper);
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      if (e instanceof IOException && 
errorExtractor.itemNotFound((IOException) e)) {
-        throw new FileNotFoundException(gcsPath.toString());
-      }
-      throw new IOException(
-          String.format("Unable to get the file object for path %s.", 
gcsPath), e);
-    }
+    return delegate.getObject(gcsPath, backoff, sleeper);
   }
 
-  /**
-   * Returns {@link StorageObjectOrIOException StorageObjectOrIOExceptions} 
for the given {@link
-   * GcsPath GcsPaths}.
-   */
   public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) 
throws IOException {
-    if (gcsPaths.isEmpty()) {
-      return ImmutableList.of();
-    } else if (gcsPaths.size() == 1) {
-      GcsPath path = gcsPaths.get(0);
-      try {
-        StorageObject object = getObject(path);
-        return ImmutableList.of(StorageObjectOrIOException.create(object));
-      } catch (IOException e) {
-        return ImmutableList.of(StorageObjectOrIOException.create(e));
-      } catch (Exception e) {
-        IOException ioException =
-            new IOException(String.format("Error trying to get %s: %s", path, 
e));
-        return 
ImmutableList.of(StorageObjectOrIOException.create(ioException));
-      }
-    }
-
-    List<StorageObjectOrIOException[]> results = new ArrayList<>();
-    executeBatches(makeGetBatches(gcsPaths, results));
-    ImmutableList.Builder<StorageObjectOrIOException> ret = 
ImmutableList.builder();
-    for (StorageObjectOrIOException[] result : results) {
-      ret.add(result[0]);
-    }
-    return ret.build();
+    List<GcsUtilLegacy.StorageObjectOrIOException> legacy = delegate.getObjects(gcsPaths);
+    return legacy.stream()
+        .map(StorageObjectOrIOException::fromLegacy)
+        .collect(java.util.stream.Collectors.toList());
   }
 
   public Objects listObjects(String bucket, String prefix, @Nullable String 
pageToken)
       throws IOException {
-    return listObjects(bucket, prefix, pageToken, null);
+    return delegate.listObjects(bucket, prefix, pageToken);
   }
 
-  /**
-   * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code 
pageToken}.
-   *
-   * <p>For more details, see 
https://cloud.google.com/storage/docs/json_api/v1/objects/list.
-   */
   public Objects listObjects(
       String bucket, String prefix, @Nullable String pageToken, @Nullable 
String delimiter)
       throws IOException {
-    // List all objects that start with the prefix (including objects in 
sub-directories).
-    Storage.Objects.List listObject = storageClient.objects().list(bucket);
-    listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL);
-    listObject.setPrefix(prefix);
-    listObject.setDelimiter(delimiter);
-
-    if (pageToken != null) {
-      listObject.setPageToken(pageToken);
-    }
-
-    try {
-      return ResilientOperation.retry(
-          listObject::execute, createBackOff(), RetryDeterminer.SOCKET_ERRORS, 
IOException.class);
-    } catch (Exception e) {
-      throw new IOException(
-          String.format("Unable to match files in bucket %s, prefix %s.", 
bucket, prefix), e);
-    }
+    return delegate.listObjects(bucket, prefix, pageToken, delimiter);
   }
 
-  /**
-   * Returns the file size from GCS or throws {@link FileNotFoundException} if 
the resource does not
-   * exist.
-   */
   @VisibleForTesting
   List<Long> fileSizes(List<GcsPath> paths) throws IOException {
-    List<StorageObjectOrIOException> results = getObjects(paths);
-
-    ImmutableList.Builder<Long> ret = ImmutableList.builder();
-    for (StorageObjectOrIOException result : results) {
-      ret.add(toFileSize(result));
-    }
-    return ret.build();
-  }
-
-  private Long toFileSize(StorageObjectOrIOException 
storageObjectOrIOException)
-      throws IOException {
-    if (storageObjectOrIOException.ioException() != null) {
-      throw storageObjectOrIOException.ioException();
-    } else {
-      return storageObjectOrIOException.storageObject().getSize().longValue();
-    }
-  }
-
-  @VisibleForTesting
-  void setCloudStorageImpl(GoogleCloudStorage g) {
-    googleCloudStorage = g;
+    return delegate.fileSizes(paths);
   }
 
-  @VisibleForTesting
-  void setCloudStorageImpl(GoogleCloudStorageOptions g) {
-    googleCloudStorageOptions = g;
-  }
-
-  /**
-   * Create an integer consumer that updates the counter identified by a 
prefix and a bucket name.
-   */
-  private static Consumer<Integer> createCounterConsumer(String 
counterNamePrefix, String bucket) {
-    return Metrics.counter(GcsUtil.class, String.format("%s_%s", 
counterNamePrefix, bucket))::inc;
-  }
-
-  private WritableByteChannel wrapInCounting(
-      WritableByteChannel writableByteChannel, String bucket) {
-    if (writableByteChannel instanceof CountingWritableByteChannel) {
-      return writableByteChannel;
-    }
-    return Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix())
-        .<WritableByteChannel>map(
-            prefix -> {
-              LOG.debug(
-                  "wrapping writable byte channel using counter name prefix {} 
and bucket {}",
-                  prefix,
-                  bucket);
-              return new CountingWritableByteChannel(
-                  writableByteChannel, createCounterConsumer(prefix, bucket));
-            })
-        .orElse(writableByteChannel);
-  }
-
-  private SeekableByteChannel wrapInCounting(
-      SeekableByteChannel seekableByteChannel, String bucket) {
-    if (seekableByteChannel instanceof CountingSeekableByteChannel
-        || !gcsCountersOptions.hasAnyPrefix()) {
-      return seekableByteChannel;
-    }
-
-    return new CountingSeekableByteChannel(
-        seekableByteChannel,
-        Optional.ofNullable(gcsCountersOptions.getReadCounterPrefix())
-            .map(
-                prefix -> {
-                  LOG.debug(
-                      "wrapping seekable byte channel with \"bytes read\" 
counter name prefix {}"
-                          + " and bucket {}",
-                      prefix,
-                      bucket);
-                  return createCounterConsumer(prefix, bucket);
-                })
-            .orElse(null),
-        Optional.ofNullable(gcsCountersOptions.getWriteCounterPrefix())
-            .map(
-                prefix -> {
-                  LOG.debug(
-                      "wrapping seekable byte channel with \"bytes written\" 
counter name prefix {}"
-                          + " and bucket {}",
-                      prefix,
-                      bucket);
-                  return createCounterConsumer(prefix, bucket);
-                })
-            .orElse(null));
-  }
-
-  /**
-   * Opens an object in GCS.
-   *
-   * <p>Returns a SeekableByteChannel that provides access to data in the 
bucket.
-   *
-   * @param path the GCS filename to read from
-   * @return a SeekableByteChannel that can read the object data
-   */
   public SeekableByteChannel open(GcsPath path) throws IOException {
-    String bucket = path.getBucket();
-    SeekableByteChannel channel =
-        googleCloudStorage.open(
-            new StorageResourceId(path.getBucket(), path.getObject()),
-            this.googleCloudStorageOptions.getReadChannelOptions());
-    return wrapInCounting(channel, bucket);
-  }
-
-  /**
-   * Opens an object in GCS.
-   *
-   * <p>Returns a SeekableByteChannel that provides access to data in the 
bucket.
-   *
-   * @param path the GCS filename to read from
-   * @param readOptions Fine-grained options for behaviors of retries, 
buffering, etc.
-   * @return a SeekableByteChannel that can read the object data
-   */
-  @VisibleForTesting
-  SeekableByteChannel open(GcsPath path, GoogleCloudStorageReadOptions 
readOptions)
-      throws IOException {
-    HashMap<String, String> baseLabels = new HashMap<>();
-    baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-    baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
-    baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsGet");
-    baseLabels.put(
-        MonitoringInfoConstants.Labels.RESOURCE,
-        GcpResourceIdentifiers.cloudStorageBucket(path.getBucket()));
-    baseLabels.put(
-        MonitoringInfoConstants.Labels.GCS_PROJECT_ID,
-        String.valueOf(googleCloudStorageOptions.getProjectId()));
-    baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, 
path.getBucket());
-
-    ServiceCallMetric serviceCallMetric =
-        new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, 
baseLabels);
-    try {
-      SeekableByteChannel channel =
-          googleCloudStorage.open(
-              new StorageResourceId(path.getBucket(), path.getObject()), 
readOptions);
-      serviceCallMetric.call("ok");
-      return wrapInCounting(channel, path.getBucket());
-    } catch (IOException e) {
-      if (e.getCause() instanceof GoogleJsonResponseException) {
-        serviceCallMetric.call(((GoogleJsonResponseException) 
e.getCause()).getDetails().getCode());
-      }
-      throw e;
-    }
+    return delegate.open(path);
   }
 
   /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
   @Deprecated
   public WritableByteChannel create(GcsPath path, String type) throws 
IOException {
-    CreateOptions.Builder builder = 
CreateOptions.builder().setContentType(type);
-    return create(path, builder.build());
+    return delegate.create(path, type);
   }
 
   /** @deprecated Use {@link #create(GcsPath, CreateOptions)} instead. */
   @Deprecated
   public WritableByteChannel create(GcsPath path, String type, Integer 
uploadBufferSizeBytes)
       throws IOException {
-    CreateOptions.Builder builder =
-        CreateOptions.builder()
-            .setContentType(type)
-            .setUploadBufferSizeBytes(uploadBufferSizeBytes);
-    return create(path, builder.build());
+    return delegate.create(path, type, uploadBufferSizeBytes);
   }
 
-  @AutoValue
-  public abstract static class CreateOptions {
-    /**
-     * If true, the created file is expected to not exist. Instead of checking 
for file presence
-     * before writing a write exception may occur if the file does exist.
-     */
-    public abstract boolean getExpectFileToNotExist();
+  public static class CreateOptions {
+    final GcsUtilLegacy.CreateOptions delegate;
 
-    /**
-     * If non-null, the upload buffer size to be used. If null, the buffer 
size corresponds to {code
-     * GCSUtil.getUploadBufferSizeBytes}
-     */
-    public abstract @Nullable Integer getUploadBufferSizeBytes();
+    private CreateOptions(GcsUtilLegacy.CreateOptions delegate) {
+      this.delegate = delegate;
+    }
+
+    public boolean getExpectFileToNotExist() {
+      return delegate.getExpectFileToNotExist();
+    }
+
+    public @Nullable Integer getUploadBufferSizeBytes() {
+      return delegate.getUploadBufferSizeBytes();
+    }
 
-    /** The content type for the created file, eg "text/plain". */
-    public abstract @Nullable String getContentType();
+    public @Nullable String getContentType() {
+      return delegate.getContentType();
+    }
 
     public static Builder builder() {
-      return new 
AutoValue_GcsUtil_CreateOptions.Builder().setExpectFileToNotExist(false);
+      return new Builder(GcsUtilLegacy.CreateOptions.builder());
     }
 
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setContentType(String value);
+    public static class Builder {
+      private final GcsUtilLegacy.CreateOptions.Builder delegateBuilder;
 
-      public abstract Builder setUploadBufferSizeBytes(int value);
+      private Builder(GcsUtilLegacy.CreateOptions.Builder delegateBuilder) {
+        this.delegateBuilder = delegateBuilder;
+      }
 
-      public abstract Builder setExpectFileToNotExist(boolean value);
+      public Builder setContentType(String value) {
+        delegateBuilder.setContentType(value);
+        return this;
+      }
 
-      public abstract CreateOptions build();
-    }
-  }
+      public Builder setUploadBufferSizeBytes(int value) {
+        delegateBuilder.setUploadBufferSizeBytes(value);
+        return this;
+      }
 
-  /**
-   * Creates an object in GCS and prepares for uploading its contents.
-   *
-   * @param path the GCS file to write to
-   * @param options to be used for creating and configuring file upload
-   * @return a WritableByteChannel that can be used to write data to the 
object.
-   */
-  public WritableByteChannel create(GcsPath path, CreateOptions options) 
throws IOException {
-    AsyncWriteChannelOptions wcOptions = 
googleCloudStorageOptions.getWriteChannelOptions();
-    @Nullable
-    Integer uploadBufferSizeBytes =
-        options.getUploadBufferSizeBytes() != null
-            ? options.getUploadBufferSizeBytes()
-            : getUploadBufferSizeBytes();
-    if (uploadBufferSizeBytes != null) {
-      wcOptions = 
wcOptions.toBuilder().setUploadChunkSize(uploadBufferSizeBytes).build();
-    }
-    GoogleCloudStorageOptions newGoogleCloudStorageOptions =
-        
googleCloudStorageOptions.toBuilder().setWriteChannelOptions(wcOptions).build();
-    GoogleCloudStorage gcpStorage =
-        createGoogleCloudStorage(
-            newGoogleCloudStorageOptions, this.storageClient, 
this.credentials);
-    StorageResourceId resourceId =
-        new StorageResourceId(
-            path.getBucket(),
-            path.getObject(),
-            // If we expect the file not to exist, we set a generation id of 
0. This avoids a read
-            // to identify the object exists already and should be overwritten.
-            // See {@link GoogleCloudStorage#create(StorageResourceId, 
GoogleCloudStorageOptions)}
-            options.getExpectFileToNotExist() ? 0L : 
StorageResourceId.UNKNOWN_GENERATION_ID);
-    CreateObjectOptions.Builder createBuilder =
-        CreateObjectOptions.builder().setOverwriteExisting(true);
-    if (options.getContentType() != null) {
-      createBuilder = createBuilder.setContentType(options.getContentType());
-    }
+      public Builder setExpectFileToNotExist(boolean value) {
+        delegateBuilder.setExpectFileToNotExist(value);
+        return this;
+      }
 
-    HashMap<String, String> baseLabels = new HashMap<>();
-    baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-    baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Storage");
-    baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "GcsInsert");
-    baseLabels.put(
-        MonitoringInfoConstants.Labels.RESOURCE,
-        GcpResourceIdentifiers.cloudStorageBucket(path.getBucket()));
-    baseLabels.put(
-        MonitoringInfoConstants.Labels.GCS_PROJECT_ID,
-        String.valueOf(googleCloudStorageOptions.getProjectId()));
-    baseLabels.put(MonitoringInfoConstants.Labels.GCS_BUCKET, 
path.getBucket());
-
-    ServiceCallMetric serviceCallMetric =
-        new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, 
baseLabels);
-    try {
-      WritableByteChannel channel = gcpStorage.create(resourceId, 
createBuilder.build());
-      serviceCallMetric.call("ok");
-      return wrapInCounting(channel, path.getBucket());
-    } catch (IOException e) {
-      if (e.getCause() instanceof GoogleJsonResponseException) {
-        serviceCallMetric.call(((GoogleJsonResponseException) 
e.getCause()).getDetails().getCode());
+      public CreateOptions build() {
+        return new CreateOptions(delegateBuilder.build());
       }
-      throw e;
     }
   }
 
-  GoogleCloudStorage createGoogleCloudStorage(
-      GoogleCloudStorageOptions options, Storage storage, Credentials 
credentials) {
-    try {
-      return new GoogleCloudStorageImpl(options, storage, credentials);
-    } catch (NoSuchMethodError e) {
-      // gcs-connector 3.x drops the direct constructor and exclusively uses 
Builder
-      // TODO eliminate reflection once Beam drops Java 8 support and upgrades 
to gcsio 3.x
-      try {
-        final Method builderMethod = 
GoogleCloudStorageImpl.class.getMethod("builder");
-        Object builder = builderMethod.invoke(null);
-        final Class<?> builderClass =
-            Class.forName(
-                
"com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder");
-
-        final Method setOptionsMethod =
-            builderClass.getMethod("setOptions", 
GoogleCloudStorageOptions.class);
-        setOptionsMethod.setAccessible(true);
-        builder = setOptionsMethod.invoke(builder, options);
-
-        final Method setHttpTransportMethod =
-            builderClass.getMethod("setHttpTransport", HttpTransport.class);
-        setHttpTransportMethod.setAccessible(true);
-        builder =
-            setHttpTransportMethod.invoke(builder, 
storage.getRequestFactory().getTransport());
-
-        final Method setCredentialsMethod =
-            builderClass.getMethod("setCredentials", Credentials.class);
-        setCredentialsMethod.setAccessible(true);
-        builder = setCredentialsMethod.invoke(builder, credentials);
-
-        final Method setHttpRequestInitializerMethod =
-            builderClass.getMethod("setHttpRequestInitializer", 
HttpRequestInitializer.class);
-        setHttpRequestInitializerMethod.setAccessible(true);
-        builder = setHttpRequestInitializerMethod.invoke(builder, 
httpRequestInitializer);
-
-        final Method buildMethod = builderClass.getMethod("build");
-        buildMethod.setAccessible(true);
-        return (GoogleCloudStorage) buildMethod.invoke(builder);
-      } catch (Exception reflectionError) {
-        throw new RuntimeException(
-            "Failed to construct GoogleCloudStorageImpl from gcsio 3.x 
Builder", reflectionError);
-      }
-    }
+  public WritableByteChannel create(GcsPath path, CreateOptions options) throws IOException {
+    return delegate.create(path, options.delegate);
   }
 
-  /**
-   * Checks whether the GCS bucket exists. Similar to {@link 
#bucketAccessible(GcsPath)}, but throws
-   * exception if the bucket is inaccessible due to permissions or does not 
exist.
-   */
   public void verifyBucketAccessible(GcsPath path) throws IOException {
-    verifyBucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
+    delegate.verifyBucketAccessible(path);
   }
 
-  /** Returns whether the GCS bucket exists and is accessible. */
   public boolean bucketAccessible(GcsPath path) throws IOException {
-    return bucketAccessible(path, createBackOff(), Sleeper.DEFAULT);
+    return delegate.bucketAccessible(path);
   }
 
-  /**
-   * Returns the project number of the project which owns this bucket. If the 
bucket exists, it must
-   * be accessible otherwise the permissions exception will be propagated. If 
the bucket does not
-   * exist, an exception will be thrown.
-   */
   public long bucketOwner(GcsPath path) throws IOException {
-    return getBucket(path, createBackOff(), 
Sleeper.DEFAULT).getProjectNumber().longValue();
+    return delegate.bucketOwner(path);
   }
 
-  /**
-   * Creates a {@link Bucket} under the specified project in Cloud Storage or 
propagates an
-   * exception.
-   */
   public void createBucket(String projectId, Bucket bucket) throws IOException 
{
-    createBucket(projectId, bucket, createBackOff(), Sleeper.DEFAULT);
+    delegate.createBucket(projectId, bucket);
   }
 
-  /** Get the {@link Bucket} from Cloud Storage path or propagates an 
exception. */
-  @Nullable
-  public Bucket getBucket(GcsPath path) throws IOException {
-    return getBucket(path, createBackOff(), Sleeper.DEFAULT);
+  public @Nullable Bucket getBucket(GcsPath path) throws IOException {
+    return delegate.getBucket(path);
   }
 
-  /** Remove an empty {@link Bucket} in Cloud Storage or propagates an 
exception. */
   public void removeBucket(Bucket bucket) throws IOException {
-    removeBucket(bucket, createBackOff(), Sleeper.DEFAULT);
+    delegate.removeBucket(bucket);
   }
 
-  /**
-   * Returns whether the GCS bucket exists. This will return false if the 
bucket is inaccessible due
-   * to permissions.
-   */
   @VisibleForTesting
   boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) 
throws IOException {
-    try {
-      return getBucket(path, backoff, sleeper) != null;
-    } catch (AccessDeniedException | FileNotFoundException e) {
-      return false;
-    }
+    return delegate.bucketAccessible(path, backoff, sleeper);
   }
 
-  /**
-   * Checks whether the GCS bucket exists. Similar to {@link 
#bucketAccessible(GcsPath, BackOff,
-   * Sleeper)}, but throws exception if the bucket is inaccessible due to 
permissions or does not
-   * exist.
-   */
   @VisibleForTesting
   void verifyBucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) 
throws IOException {
-    getBucket(path, backoff, sleeper);
+    delegate.verifyBucketAccessible(path, backoff, sleeper);
   }
 
   @VisibleForTesting
   @Nullable
   Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws 
IOException {
-    Storage.Buckets.Get getBucket = 
storageClient.buckets().get(path.getBucket());
-
-    try {
-      return ResilientOperation.retry(
-          getBucket::execute,
-          backoff,
-          new RetryDeterminer<IOException>() {
-            @Override
-            public boolean shouldRetry(IOException e) {
-              if (errorExtractor.itemNotFound(e) || 
errorExtractor.accessDenied(e)) {
-                return false;
-              }
-              return RETRY_DETERMINER.shouldRetry(e);
-            }
-          },
-          IOException.class,
-          sleeper);
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(path.toString(), null, e.getMessage());
-      }
-      if (errorExtractor.itemNotFound(e)) {
-        throw new FileNotFoundException(e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          String.format(
-              "Error while attempting to verify existence of bucket gs://%s", 
path.getBucket()),
-          e);
-    }
+    return delegate.getBucket(path, backoff, sleeper);
   }
 
   @VisibleForTesting
   void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper 
sleeper)
       throws IOException {
-    Storage.Buckets.Insert insertBucket = 
storageClient.buckets().insert(projectId, bucket);
-    insertBucket.setPredefinedAcl("projectPrivate");
-    insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
-
-    try {
-      ResilientOperation.retry(
-          insertBucket::execute,
-          backoff,
-          new RetryDeterminer<IOException>() {
-            @Override
-            public boolean shouldRetry(IOException e) {
-              if (errorExtractor.itemAlreadyExists(e) || 
errorExtractor.accessDenied(e)) {
-                return false;
-              }
-              return RETRY_DETERMINER.shouldRetry(e);
-            }
-          },
-          IOException.class,
-          sleeper);
-      return;
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(bucket.getName(), null, 
e.getMessage());
-      }
-      if (errorExtractor.itemAlreadyExists(e)) {
-        throw new FileAlreadyExistsException(bucket.getName(), null, 
e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          String.format(
-              "Error while attempting to create bucket gs://%s for project %s",
-              bucket.getName(), projectId),
-          e);
-    }
+    delegate.createBucket(projectId, bucket, backoff, sleeper);
   }
 
   @VisibleForTesting
   void removeBucket(Bucket bucket, BackOff backoff, Sleeper sleeper) throws 
IOException {
-    Storage.Buckets.Delete getBucket = 
storageClient.buckets().delete(bucket.getName());
-
-    try {
-      ResilientOperation.retry(
-          getBucket::execute,
-          backoff,
-          new RetryDeterminer<IOException>() {
-            @Override
-            public boolean shouldRetry(IOException e) {
-              if (errorExtractor.itemNotFound(e) || 
errorExtractor.accessDenied(e)) {
-                return false;
-              }
-              return RETRY_DETERMINER.shouldRetry(e);
-            }
-          },
-          IOException.class,
-          sleeper);
-    } catch (GoogleJsonResponseException e) {
-      if (errorExtractor.accessDenied(e)) {
-        throw new AccessDeniedException(bucket.getName(), null, 
e.getMessage());
-      }
-      if (errorExtractor.itemNotFound(e)) {
-        throw new FileNotFoundException(e.getMessage());
-      }
-      throw e;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          String.format("Error while attempting to remove bucket gs://%s", 
bucket.getName()), e);
-    }
+    delegate.removeBucket(bucket, backoff, sleeper);
   }
 
-  private static void executeBatches(List<BatchInterface> batches) throws 
IOException {
-    ExecutorService executor =
-        MoreExecutors.listeningDecorator(
-            new ThreadPoolExecutor(
-                MAX_CONCURRENT_BATCHES,
-                MAX_CONCURRENT_BATCHES,
-                0L,
-                TimeUnit.MILLISECONDS,
-                new LinkedBlockingQueue<>()));
-
-    List<CompletionStage<Void>> futures = new ArrayList<>();
-    for (final BatchInterface batch : batches) {
-      futures.add(MoreFutures.runAsync(batch::execute, executor));
-    }
-
-    try {
-      try {
-        MoreFutures.get(MoreFutures.allOf(futures));
-      } catch (ExecutionException e) {
-        if (e.getCause() instanceof FileNotFoundException) {
-          throw (FileNotFoundException) e.getCause();
-        }
-        throw new IOException("Error executing batch GCS request", e);
-      } finally {
-        // Give the other batches a chance to complete in error cases.
-        executor.shutdown();
-        if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
-          LOG.warn("Taking over 5 minutes to flush gcs op batches after 
error");
-          executor.shutdownNow();
-          if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
-            LOG.warn("Took over 10 minutes to flush gcs op batches after error 
and interruption.");
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while executing batch GCS request", 
e);
-    }
-  }
-
-  /**
-   * Makes get {@link BatchInterface BatchInterfaces}.
-   *
-   * @param paths {@link GcsPath GcsPaths}.
-   * @param results mutable {@link List} for return values.
-   * @return {@link BatchInterface BatchInterfaces} to execute.
-   * @throws IOException
-   */
   @VisibleForTesting
-  List<BatchInterface> makeGetBatches(
+  List<GcsUtilLegacy.BatchInterface> makeGetBatches(
       Collection<GcsPath> paths, List<StorageObjectOrIOException[]> results) throws IOException {
-    List<BatchInterface> batches = new ArrayList<>();
-    for (List<GcsPath> filesToGet :
-        Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {
-      BatchInterface batch = batchRequestSupplier.get();
-      for (GcsPath path : filesToGet) {
-        results.add(enqueueGetFileSize(path, batch));
-      }
-      batches.add(batch);
-    }
-    return batches;
-  }
-
-  /**
-   * Wrapper for rewriting that supports multiple calls as well as possibly 
deleting the source
-   * file.
-   *
-   * <p>Usage: create, enqueue(), and execute batch. Then, check 
getReadyToEnqueue() if another
-   * round of enqueue() and execute is required. Repeat until 
getReadyToEnqueue() returns false.
-   */
-  class RewriteOp extends JsonBatchCallback<RewriteResponse> {
-    private final GcsPath from;
-    private final GcsPath to;
-    private final boolean deleteSource;
-    private final boolean ignoreMissingSource;
-    private boolean readyToEnqueue;
-    private boolean performDelete;
-    private @Nullable GoogleJsonError lastError;
-    @VisibleForTesting Storage.Objects.Rewrite rewriteRequest;
-
-    public boolean getReadyToEnqueue() {
-      return readyToEnqueue;
-    }
-
-    public @Nullable GoogleJsonError getLastError() {
-      return lastError;
-    }
-
-    public GcsPath getFrom() {
-      return from;
-    }
-
-    public GcsPath getTo() {
-      return to;
-    }
-
-    public boolean isMetadataOperation() {
-      return performDelete || from.getBucket().equals(to.getBucket());
-    }
-
-    public void enqueue(BatchInterface batch) throws IOException {
-      if (!readyToEnqueue) {
-        throw new IOException(
-            String.format(
-                "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s",
-                from, to, readyToEnqueue));
-      }
-      if (!performDelete) {
-        batch.queue(rewriteRequest, this);
-        return;
-      }
-      Storage.Objects.Delete deleteRequest =
-          storageClient.objects().delete(from.getBucket(), from.getObject());
-      batch.queue(
-          deleteRequest,
-          new JsonBatchCallback<Void>() {
-            @Override
-            public void onSuccess(Void obj, HttpHeaders responseHeaders) {
-              LOG.debug("Successfully deleted {} after moving to {}", from, 
to);
-              readyToEnqueue = false;
-              lastError = null;
-            }
-
-            @Override
-            public void onFailure(GoogleJsonError e, HttpHeaders 
responseHeaders)
-                throws IOException {
-              if (e.getCode() == 404) {
-                LOG.info(
-                    "Ignoring failed deletion of moved file {} which already 
does not exist: {}",
-                    from,
-                    e);
-                readyToEnqueue = false;
-                lastError = null;
-              } else {
-                readyToEnqueue = true;
-                lastError = e;
-              }
-            }
-          });
-    }
-
-    public RewriteOp(GcsPath from, GcsPath to, boolean deleteSource, boolean 
ignoreMissingSource)
-        throws IOException {
-      this.from = from;
-      this.to = to;
-      this.deleteSource = deleteSource;
-      this.ignoreMissingSource = ignoreMissingSource;
-      rewriteRequest =
-          storageClient
-              .objects()
-              .rewrite(from.getBucket(), from.getObject(), to.getBucket(), 
to.getObject(), null);
-      if (maxBytesRewrittenPerCall != null) {
-        rewriteRequest.setMaxBytesRewrittenPerCall(maxBytesRewrittenPerCall);
-      }
-      readyToEnqueue = true;
-    }
+    List<GcsUtilLegacy.StorageObjectOrIOException[]> legacyResults = new java.util.ArrayList<>();
+    List<GcsUtilLegacy.BatchInterface> legacyBatch = delegate.makeGetBatches(paths, legacyResults);
 
-    @Override
-    public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders 
responseHeaders)
-        throws IOException {
-      lastError = null;
-      if (rewriteResponse.getDone()) {
-        if (deleteSource) {
-          readyToEnqueue = true;
-          performDelete = true;
-        } else {
-          readyToEnqueue = false;
-        }
-      } else {
-        LOG.debug(
-            "Rewrite progress: {} of {} bytes, {} to {}",
-            rewriteResponse.getTotalBytesRewritten(),
-            rewriteResponse.getObjectSize(),
-            from,
-            to);
-        rewriteRequest.setRewriteToken(rewriteResponse.getRewriteToken());
-        readyToEnqueue = true;
-        if (numRewriteTokensUsed != null) {
-          numRewriteTokensUsed.incrementAndGet();
-        }
+    for (GcsUtilLegacy.StorageObjectOrIOException[] legacyResult : legacyResults) {
+      StorageObjectOrIOException[] result = new StorageObjectOrIOException[legacyResult.length];
+      for (int i = 0; i < legacyResult.length; ++i) {
+        result[i] = StorageObjectOrIOException.fromLegacy(legacyResult[i]);
       }
+      results.add(result);
     }
 
-    @Override
-    public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) 
throws IOException {
-      if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
-        if (ignoreMissingSource) {
-          // Treat a missing source as a successful rewrite.
-          readyToEnqueue = false;
-          lastError = null;
-        } else {
-          throw new FileNotFoundException(
-              String.format(
-                  "Rewrite from %s to %s has failed. Either source or sink not 
found. "
-                      + "Failed with error: %s",
-                  from.toString(), to.toString(), e.getMessage()));
-        }
-      } else if (e.getCode() == 403
-          && e.getErrors().size() == 1
-          && e.getErrors().get(0).getReason().equals("retentionPolicyNotMet")) 
{
-        List<StorageObjectOrIOException> srcAndDestObjects = 
getObjects(Arrays.asList(from, to));
-        String srcHash = srcAndDestObjects.get(0).storageObject().getMd5Hash();
-        String destHash = 
srcAndDestObjects.get(1).storageObject().getMd5Hash();
-        if (srcHash != null && srcHash.equals(destHash)) {
-          // Source and destination are identical. Treat this as a successful 
rewrite
-          LOG.warn(
-              "Caught retentionPolicyNotMet error while rewriting to a bucket 
with retention "
-                  + "policy. Skipping because destination {} and source {} are 
considered identical "
-                  + "because their MD5 Hashes are equal.",
-              getFrom(),
-              getTo());
-
-          if (deleteSource) {
-            readyToEnqueue = true;
-            performDelete = true;
-          } else {
-            readyToEnqueue = false;
-          }
-          lastError = null;
-        } else {
-          // User is attempting to write to a file that hasn't met its 
retention policy yet.
-          // Not a transient error so likely will not be fixed by a retry
-          throw new IOException(e.getMessage());
-        }
-      } else {
-        lastError = e;
-        readyToEnqueue = true;
-      }
-    }
+    return legacyBatch;
   }
 
   public void copy(Iterable<String> srcFilenames, Iterable<String> 
destFilenames)
       throws IOException {
-    rewriteHelper(
-        srcFilenames,
-        destFilenames,
-        /*deleteSource=*/ false,
-        /*ignoreMissingSource=*/ false,
-        /*ignoreExistingDest=*/ false);
+    delegate.copy(srcFilenames, destFilenames);
   }
 
   public void rename(
       Iterable<String> srcFilenames, Iterable<String> destFilenames, 
MoveOptions... moveOptions)
       throws IOException {
-    // Rename is implemented as a rewrite followed by deleting the source. If 
the new object is in
-    // the same location, the copy is a metadata-only operation.
-    Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
-    final boolean ignoreMissingSrc =
-        moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES);
-    final boolean ignoreExistingDest =
-        moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
-    rewriteHelper(
-        srcFilenames, destFilenames, /*deleteSource=*/ true, ignoreMissingSrc, 
ignoreExistingDest);
-  }
-
-  private void rewriteHelper(
-      Iterable<String> srcFilenames,
-      Iterable<String> destFilenames,
-      boolean deleteSource,
-      boolean ignoreMissingSource,
-      boolean ignoreExistingDest)
-      throws IOException {
-    LinkedList<RewriteOp> rewrites =
-        makeRewriteOps(
-            srcFilenames, destFilenames, deleteSource, ignoreMissingSource, 
ignoreExistingDest);
-    org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff();
-    while (true) {
-      List<BatchInterface> batches = makeRewriteBatches(rewrites); // Removes 
completed rewrite ops.
-      if (batches.isEmpty()) {
-        break;
-      }
-      Preconditions.checkState(!rewrites.isEmpty());
-      RewriteOp sampleErrorOp =
-          rewrites.stream().filter(op -> op.getLastError() != 
null).findFirst().orElse(null);
-      if (sampleErrorOp != null) {
-        long backOffMillis = backoff.nextBackOffMillis();
-        if (backOffMillis == org.apache.beam.sdk.util.BackOff.STOP) {
-          throw new IOException(
-              String.format(
-                  "Error completing file copies with retries, sample: from %s 
to %s due to %s",
-                  sampleErrorOp.getFrom().toString(),
-                  sampleErrorOp.getTo().toString(),
-                  sampleErrorOp.getLastError()));
-        }
-        LOG.warn(
-            "Retrying with backoff unsuccessful copy requests, sample request: 
from {} to {} due to {}",
-            sampleErrorOp.getFrom(),
-            sampleErrorOp.getTo(),
-            sampleErrorOp.getLastError());
-        try {
-          Thread.sleep(backOffMillis);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException(
-              String.format(
-                  "Interrupted backoff of file copies with retries, sample: 
from %s to %s due to %s",
-                  sampleErrorOp.getFrom().toString(),
-                  sampleErrorOp.getTo().toString(),
-                  sampleErrorOp.getLastError()));
-        }
-      }
-      executeBatches(batches);
-    }
+    delegate.rename(srcFilenames, destFilenames, moveOptions);
   }
 
-  LinkedList<RewriteOp> makeRewriteOps(
+  @VisibleForTesting
+  @SuppressWarnings("JdkObsolete")

Review Comment:
   please add comments for "SuppressWarnings". Here it is // for LinkedList
   
   Also this appears only in higher version of Java. That's likely why it is 
not captured before
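   
   Something like this, with the comment wording being just a suggestion:
   
   ```java
   @VisibleForTesting
   @SuppressWarnings("JdkObsolete") // for LinkedList
   LinkedList<RewriteOp> makeRewriteOps(
   ```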



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
