This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch 
revert-31451-expansion_service_upgrade_2_dependencies
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5f1b33b5fa0acc078ccfd0e3c1f92c62f133b6e0
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Wed Jun 12 15:30:03 2024 +0200

    Revert "Updates Expansion Service Container to support upgrading using the 
sc…"
    
    This reverts commit 7b6f9415c108924732f10b4645cd2ecc2a81482a.
---
 .../IO_Iceberg_Integration_Tests.json              |   3 -
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 -
 .../construction/ExternalTranslationOptions.java   |   4 +-
 .../util/construction/PTransformTranslation.java   |   2 +-
 .../sdk/util/construction/TransformUpgrader.java   |  56 +++------
 .../util/construction/TransformUpgraderTest.java   | 139 +++------------------
 .../container/expansion_service_config.yml         |   3 -
 .../sdk/expansion/service/ExpansionService.java    |  67 +++-------
 sdks/java/io/expansion-service/build.gradle        |   5 -
 sdks/java/io/iceberg/build.gradle                  |   4 +-
 .../managed/ManagedSchemaTransformProvider.java    |  63 ++++------
 11 files changed, 78 insertions(+), 270 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
deleted file mode 100644
index a03c067d2c4..00000000000
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
-    "comment": "Modify this file in a trivial way to cause this test suite to 
run"
-}
diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 65fcf733340..677e1763144 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -693,9 +693,7 @@ class BeamModulePlugin implements Plugin<Project> {
         aws_java_sdk2_profiles                      : 
"software.amazon.awssdk:profiles:$aws_java_sdk2_version",
         azure_sdk_bom                               : 
"com.azure:azure-sdk-bom:1.2.14",
         bigdataoss_gcsio                            : 
"com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
-        bigdataoss_gcs_connector                    : 
"com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version",
         bigdataoss_util                             : 
"com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
-        bigdataoss_util_hadoop                      : 
"com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version",
         byte_buddy                                  : 
"net.bytebuddy:byte-buddy:1.14.12",
         cassandra_driver_core                       : 
"com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
         cassandra_driver_mapping                    : 
"com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java
index f444d2e3e30..5ed14faf31e 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java
@@ -25,9 +25,7 @@ import 
org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions.EmptyLi
 
 public interface ExternalTranslationOptions extends PipelineOptions {
 
-  @Description(
-      "Set of URNs of transforms to be overriden using the transform service. 
The provided strings "
-          + "can be transform URNs of schema-transform IDs")
+  @Description("Set of URNs of transforms to be overriden using the transform 
service.")
   @Default.InstanceFactory(EmptyListDefault.class)
   List<String> getTransformsToOverride();
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
index 3d914272373..e2b6d95057f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java
@@ -535,7 +535,7 @@ public class PTransformTranslation {
             if (underlyingIdentifier == null) {
               throw new IllegalStateException(
                   String.format(
-                      "Encountered a Managed Transform that has an empty 
\"transform_identifier\": %n%s",
+                      "Encountered a Managed Transform that has an empty 
\"transform_identifier\": \n%s",
                       configRow));
             }
             transformBuilder.putAnnotations(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
index f88d55a1f5c..941a5daf689 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.util.construction;
 
-import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
-
 import com.fasterxml.jackson.core.Version;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -53,7 +51,6 @@ import 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayl
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
-import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannelBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
@@ -116,22 +113,6 @@ public class TransformUpgrader implements AutoCloseable {
                   if (urn != null && urnsToOverride.contains(urn)) {
                     return true;
                   }
-
-                  // Also check if the URN is a schema-transform ID.
-                  if (urn.equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
-                    try {
-                      ExternalTransforms.SchemaTransformPayload 
schemaTransformPayload =
-                          ExternalTransforms.SchemaTransformPayload.parseFrom(
-                              entry.getValue().getSpec().getPayload());
-                      String schemaTransformId = 
schemaTransformPayload.getIdentifier();
-                      if (urnsToOverride.contains(schemaTransformId)) {
-                        return true;
-                      }
-                    } catch (InvalidProtocolBufferException e) {
-                      throw new RuntimeException(e);
-                    }
-                  }
-
                   return false;
                 })
             .map(
@@ -203,27 +184,20 @@ public class TransformUpgrader implements AutoCloseable {
     if (transformToUpgrade == null) {
       throw new IllegalArgumentException("Could not find a transform with the 
ID " + transformId);
     }
-
-    byte[] payloadBytes = null;
-
-    if 
(!transformToUpgrade.getSpec().getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM)))
 {
-      ByteString configRowBytes =
-          transformToUpgrade.getAnnotationsOrThrow(
-              
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY));
-      ByteString configRowSchemaBytes =
-          transformToUpgrade.getAnnotationsOrThrow(
-              
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY));
-      SchemaApi.Schema configRowSchemaProto =
-          SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
-      payloadBytes =
-          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
-              .setSchema(configRowSchemaProto)
-              .setPayload(configRowBytes)
-              .build()
-              .toByteArray();
-    } else {
-      payloadBytes = transformToUpgrade.getSpec().getPayload().toByteArray();
-    }
+    ByteString configRowBytes =
+        transformToUpgrade.getAnnotationsOrThrow(
+            
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY));
+    ByteString configRowSchemaBytes =
+        transformToUpgrade.getAnnotationsOrThrow(
+            
BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY));
+    SchemaApi.Schema configRowSchemaProto =
+        SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray());
+
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(configRowSchemaProto)
+            .setPayload(configRowBytes)
+            .build();
 
     RunnerApi.PTransform.Builder ptransformBuilder =
         RunnerApi.PTransform.newBuilder()
@@ -231,7 +205,7 @@ public class TransformUpgrader implements AutoCloseable {
             .setSpec(
                 RunnerApi.FunctionSpec.newBuilder()
                     .setUrn(transformToUpgrade.getSpec().getUrn())
-                    .setPayload(ByteString.copyFrom(payloadBytes))
+                    .setPayload(ByteString.copyFrom(payload.toByteArray()))
                     .build());
 
     for (Map.Entry<String, String> entry : 
transformToUpgrade.getInputsMap().entrySet()) {
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java
index a54383b46e7..65b3e8e89ca 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java
@@ -39,22 +39,16 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
-import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
-import 
org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
-import 
org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -164,60 +158,6 @@ public class TransformUpgraderTest {
     }
   }
 
-  public static class TestSchemaTransformProvider implements 
SchemaTransformProvider {
-
-    @Override
-    public String identifier() {
-      return "dummy_schema_transform";
-    }
-
-    @Override
-    public Schema configurationSchema() {
-      return Schema.builder().build();
-    }
-
-    @Override
-    public SchemaTransform from(Row configuration) {
-      return new TestSchemaTransform();
-    }
-  }
-
-  public static class TestSchemaTransform extends SchemaTransform {
-
-    @Override
-    public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      return input;
-    }
-  }
-
-  static class TestSchemaTransformTranslator
-      extends SchemaTransformPayloadTranslator<TestSchemaTransform> {
-    @Override
-    public SchemaTransformProvider provider() {
-      return new TestSchemaTransformProvider();
-    }
-
-    @Override
-    public Row toConfigRow(TestSchemaTransform transform) {
-      return Row.withSchema(Schema.builder().build()).build();
-    }
-  }
-
-  @AutoService(TransformPayloadTranslatorRegistrar.class)
-  public static class TestSchemaTransformPayloadTranslatorRegistrar
-      implements TransformPayloadTranslatorRegistrar {
-    @Override
-    @SuppressWarnings({
-      "rawtypes",
-    })
-    public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
-        getTransformPayloadTranslators() {
-      return ImmutableMap.<Class<? extends PTransform>, 
TransformPayloadTranslator>builder()
-          .put(TestSchemaTransform.class, new TestSchemaTransformTranslator())
-          .build();
-    }
-  }
-
   static class TestExpansionServiceClientFactory implements 
ExpansionServiceClientFactory {
     ExpansionApi.ExpansionResponse response;
 
@@ -243,18 +183,6 @@ public class TransformUpgraderTest {
                     .getTransformsMap()
                     .get("TransformUpgraderTest-TestTransform2");
           }
-
-          boolean schemaTransformTest = false;
-          if (transformToUpgrade == null) {
-            // This is running a schema-transform test.
-            transformToUpgrade =
-                request
-                    .getComponents()
-                    .getTransformsMap()
-                    .get("TransformUpgraderTest-TestSchemaTransform");
-            schemaTransformTest = true;
-          }
-
           if (!transformToUpgrade
               .getSpec()
               .getUrn()
@@ -262,30 +190,27 @@ public class TransformUpgraderTest {
             throw new RuntimeException("Could not find a valid transform to 
upgrade");
           }
 
+          Integer oldParam;
+          try {
+            ByteArrayInputStream byteArrayInputStream =
+                new 
ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray());
+            ObjectInputStream objectInputStream = new 
ObjectInputStream(byteArrayInputStream);
+            oldParam = (Integer) objectInputStream.readObject();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+
           RunnerApi.PTransform.Builder upgradedTransform = 
transformToUpgrade.toBuilder();
           FunctionSpec.Builder specBuilder = 
upgradedTransform.getSpecBuilder();
 
-          if (!schemaTransformTest) {
-            Integer oldParam;
-            try {
-              ByteArrayInputStream byteArrayInputStream =
-                  new 
ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray());
-              ObjectInputStream objectInputStream = new 
ObjectInputStream(byteArrayInputStream);
-              oldParam = (Integer) objectInputStream.readObject();
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-
-            ByteStringOutputStream byteStringOutputStream = new 
ByteStringOutputStream();
-            try {
-              ObjectOutputStream objectOutputStream =
-                  new ObjectOutputStream(byteStringOutputStream);
-              objectOutputStream.writeObject(oldParam * 2);
-              objectOutputStream.flush();
-              specBuilder.setPayload(byteStringOutputStream.toByteString());
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
+          ByteStringOutputStream byteStringOutputStream = new 
ByteStringOutputStream();
+          try {
+            ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(byteStringOutputStream);
+            objectOutputStream.writeObject(oldParam * 2);
+            objectOutputStream.flush();
+            specBuilder.setPayload(byteStringOutputStream.toByteString());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
           }
 
           upgradedTransform.setSpec(specBuilder.build());
@@ -366,34 +291,6 @@ public class TransformUpgraderTest {
     
assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY));
   }
 
-  @Test
-  public void testTransformUpgradeSchemaTransform() throws Exception {
-    Pipeline pipeline = Pipeline.create();
-
-    // Build the pipeline
-    PCollectionRowTuple.empty(pipeline).apply(new TestSchemaTransform());
-
-    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, 
false);
-    ExternalTranslationOptions options =
-        PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
-    List<String> urnsToOverride = ImmutableList.of("dummy_schema_transform");
-    options.setTransformsToOverride(urnsToOverride);
-    options.setTransformServiceAddress("dummyaddress");
-
-    RunnerApi.Pipeline upgradedPipelineProto =
-        TransformUpgrader.of(new TestExpansionServiceClientFactory())
-            .upgradeTransformsViaTransformService(pipelineProto, 
urnsToOverride, options);
-
-    RunnerApi.PTransform upgradedTransform =
-        upgradedPipelineProto
-            .getComponents()
-            .getTransformsMap()
-            .get("TransformUpgraderTest-TestSchemaTransform");
-
-    // Confirm that the upgraded transform includes the upgrade annotation.
-    
assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY));
-  }
-
   @Test
   public void testTransformUpgradeMultipleOccurrences() throws Exception {
     Pipeline pipeline = Pipeline.create();
diff --git a/sdks/java/expansion-service/container/expansion_service_config.yml 
b/sdks/java/expansion-service/container/expansion_service_config.yml
index 4f48efd5947..653629aa153 100644
--- a/sdks/java/expansion-service/container/expansion_service_config.yml
+++ b/sdks/java/expansion-service/container/expansion_service_config.yml
@@ -17,9 +17,6 @@ allowlist:
   - "beam:transform:org.apache.beam:schemaio_jdbc_read:v1"
   - "beam:transform:org.apache.beam:schemaio_jdbc_write:v1"
   - "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
-# By default, the Expansion Service container will include all dependencies in
-# the classpath. Following config can be used to override this behavior per
-# transform URN or schema-transform ID.
 dependencies:
   "beam:transform:org.apache.beam:kafka_read_with_metadata:v1":
     - path: "jars/beam-sdks-java-io-expansion-service.jar"
diff --git 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
index 770da14fa1c..5d436451178 100644
--- 
a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
+++ 
b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
@@ -81,7 +81,6 @@ import 
org.apache.beam.sdk.util.construction.RehydratedComponents;
 import org.apache.beam.sdk.util.construction.SdkComponents;
 import org.apache.beam.sdk.util.construction.SplittableParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
@@ -247,11 +246,6 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
 
     private final TransformPayloadTranslator<PTransform<InputT, OutputT>> 
payloadTranslator;
 
-    // Returns true if the underlying transform represented by this is a 
schema-aware transform.
-    private boolean isSchemaTransform() {
-      return (payloadTranslator instanceof SchemaTransformPayloadTranslator);
-    }
-
     private TransformProviderForPayloadTranslator(
         TransformPayloadTranslator<PTransform<InputT, OutputT>> 
payloadTranslator) {
       this.payloadTranslator = payloadTranslator;
@@ -260,51 +254,28 @@ public class ExpansionService extends 
ExpansionServiceGrpc.ExpansionServiceImplB
     @Override
     public PTransform<InputT, OutputT> getTransform(
         RunnerApi.FunctionSpec spec, PipelineOptions options) {
-      if (isSchemaTransform()) {
-        return ExpansionServiceSchemaTransformProvider.of().getTransform(spec, 
options);
-      } else {
-        try {
-          ExternalConfigurationPayload payload =
-              ExternalConfigurationPayload.parseFrom(spec.getPayload());
-          Row configRow =
-              
RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
-                  .decode(new 
ByteArrayInputStream(payload.getPayload().toByteArray()));
-          PTransform transformFromRow = 
payloadTranslator.fromConfigRow(configRow, options);
-          if (transformFromRow != null) {
-            return transformFromRow;
-          } else {
-            throw new RuntimeException(
-                String.format(
-                    "A transform cannot be initiated using the provided config 
row %s and the"
-                        + " TransformPayloadTranslator %s",
-                    configRow, payloadTranslator));
-          }
-        } catch (Exception e) {
+      try {
+        ExternalConfigurationPayload payload =
+            ExternalConfigurationPayload.parseFrom(spec.getPayload());
+        Row configRow =
+            RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
+                .decode(new 
ByteArrayInputStream(payload.getPayload().toByteArray()));
+        PTransform transformFromRow = 
payloadTranslator.fromConfigRow(configRow, options);
+        if (transformFromRow != null) {
+          return transformFromRow;
+        } else {
           throw new RuntimeException(
               String.format(
-                  "Failed to build transform %s from spec %s: %s",
-                  spec.getUrn(), spec, e.getMessage()),
-              e);
+                  "A transform cannot be initiated using the provided config 
row %s and the"
+                      + " TransformPayloadTranslator %s",
+                  configRow, payloadTranslator));
         }
-      }
-    }
-
-    @Override
-    public InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
-      if (isSchemaTransform()) {
-        return (InputT) 
ExpansionServiceSchemaTransformProvider.of().createInput(p, inputs);
-      } else {
-        return TransformProvider.super.createInput(p, inputs);
-      }
-    }
-
-    @Override
-    public Map<String, PCollection<?>> extractOutputs(OutputT output) {
-      if (isSchemaTransform()) {
-        return ExpansionServiceSchemaTransformProvider.of()
-            .extractOutputs((PCollectionRowTuple) output);
-      } else {
-        return TransformProvider.super.extractOutputs(output);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format(
+                "Failed to build transform %s from spec %s: %s",
+                spec.getUrn(), spec, e.getMessage()),
+            e);
       }
     }
 
diff --git a/sdks/java/io/expansion-service/build.gradle 
b/sdks/java/io/expansion-service/build.gradle
index 95a843e51fd..e9fe4c1fe63 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -43,11 +43,6 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
-
-  // Needed by Iceberg I/O users that use GCS for the warehouse location.
-  implementation library.java.bigdataoss_gcs_connector
-  permitUnusedDeclared library.java.bigdataoss_gcs_connector
-
   runtimeOnly library.java.kafka_clients
   runtimeOnly library.java.slf4j_jdk14
 }
diff --git a/sdks/java/io/iceberg/build.gradle 
b/sdks/java/io/iceberg/build.gradle
index 7965cde86e7..765fa494875 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -57,8 +57,8 @@ dependencies {
 
     testImplementation library.java.hadoop_client
     testImplementation library.java.bigdataoss_gcsio
-    testImplementation library.java.bigdataoss_gcs_connector
-    testImplementation library.java.bigdataoss_util_hadoop
+    testImplementation 
"com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16"
+    testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16"
     testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
     testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
     testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index 6f97983d326..1060c542ac0 100644
--- 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -63,19 +63,28 @@ public class ManagedSchemaTransformProvider
     return "beam:transform:managed:v1";
   }
 
-  // Use 'getAllProviders()' instead of this cache.
-  private final Map<String, SchemaTransformProvider> 
schemaTransformProvidersCache =
-      new HashMap<>();
-  private boolean providersCached = false;
+  private final Map<String, SchemaTransformProvider> schemaTransformProviders 
= new HashMap<>();
 
-  private @Nullable Collection<String> supportedIdentifiers;
-
-  public ManagedSchemaTransformProvider() {
-    this(null);
-  }
+  public ManagedSchemaTransformProvider() {}
 
   ManagedSchemaTransformProvider(@Nullable Collection<String> 
supportedIdentifiers) {
-    this.supportedIdentifiers = supportedIdentifiers;
+    try {
+      for (SchemaTransformProvider schemaTransformProvider :
+          ServiceLoader.load(SchemaTransformProvider.class)) {
+        if 
(schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) {
+          throw new IllegalArgumentException(
+              "Found multiple SchemaTransformProvider implementations with the 
same identifier "
+                  + schemaTransformProvider.identifier());
+        }
+        if (supportedIdentifiers == null
+            || 
supportedIdentifiers.contains(schemaTransformProvider.identifier())) {
+          schemaTransformProviders.put(
+              schemaTransformProvider.identifier(), schemaTransformProvider);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e.getMessage());
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
@@ -140,7 +149,7 @@ public class ManagedSchemaTransformProvider
     managedConfig.validate();
     SchemaTransformProvider schemaTransformProvider =
         Preconditions.checkNotNull(
-            getAllProviders().get(managedConfig.getTransformIdentifier()),
+            
schemaTransformProviders.get(managedConfig.getTransformIdentifier()),
             "Could not find a transform with the identifier "
                 + "%s. This could be either due to the dependency with the "
                 + "transform not being available in the classpath or due to "
@@ -227,35 +236,7 @@ public class ManagedSchemaTransformProvider
     return YamlUtils.toBeamRow(configMap, transformSchema, false);
   }
 
-  // We load providers seperately, after construction, to prevent the
-  // 'ManagedSchemaTransformProvider' from being initialized in a recursive 
loop
-  // when being loaded using 'AutoValue'.
-  synchronized Map<String, SchemaTransformProvider> getAllProviders() {
-    if (this.providersCached) {
-      return schemaTransformProvidersCache;
-    }
-    try {
-      for (SchemaTransformProvider schemaTransformProvider :
-          ServiceLoader.load(SchemaTransformProvider.class)) {
-        if 
(schemaTransformProvidersCache.containsKey(schemaTransformProvider.identifier()))
 {
-          throw new IllegalArgumentException(
-              "Found multiple SchemaTransformProvider implementations with the 
same identifier "
-                  + schemaTransformProvider.identifier());
-        }
-        if (supportedIdentifiers == null
-            || 
supportedIdentifiers.contains(schemaTransformProvider.identifier())) {
-          if 
(schemaTransformProvider.identifier().equals("beam:transform:managed:v1")) {
-            // Prevent recursively adding the 'ManagedSchemaTransformProvider'.
-            continue;
-          }
-          schemaTransformProvidersCache.put(
-              schemaTransformProvider.identifier(), schemaTransformProvider);
-        }
-      }
-      this.providersCached = true;
-      return schemaTransformProvidersCache;
-    } catch (Exception e) {
-      throw new RuntimeException(e.getMessage());
-    }
+  Map<String, SchemaTransformProvider> getAllProviders() {
+    return schemaTransformProviders;
   }
 }

Reply via email to