This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch revert-31451-expansion_service_upgrade_2_dependencies in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5f1b33b5fa0acc078ccfd0e3c1f92c62f133b6e0 Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Wed Jun 12 15:30:03 2024 +0200 Revert "Updates Expansion Service Container to support upgrading using the sc…" This reverts commit 7b6f9415c108924732f10b4645cd2ecc2a81482a. --- .../IO_Iceberg_Integration_Tests.json | 3 - .../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 - .../construction/ExternalTranslationOptions.java | 4 +- .../util/construction/PTransformTranslation.java | 2 +- .../sdk/util/construction/TransformUpgrader.java | 56 +++------ .../util/construction/TransformUpgraderTest.java | 139 +++------------------ .../container/expansion_service_config.yml | 3 - .../sdk/expansion/service/ExpansionService.java | 67 +++------- sdks/java/io/expansion-service/build.gradle | 5 - sdks/java/io/iceberg/build.gradle | 4 +- .../managed/ManagedSchemaTransformProvider.java | 63 ++++------ 11 files changed, 78 insertions(+), 270 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json deleted file mode 100644 index a03c067d2c4..00000000000 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "comment": "Modify this file in a trivial way to cause this test suite to run" -} diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 65fcf733340..677e1763144 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -693,9 +693,7 @@ class BeamModulePlugin implements Plugin<Project> { aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version", azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14", bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", - bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version", - bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version", byte_buddy : "net.bytebuddy:byte-buddy:1.14.12", cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version", cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java index f444d2e3e30..5ed14faf31e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java @@ -25,9 +25,7 @@ import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions.EmptyLi public interface ExternalTranslationOptions extends PipelineOptions { - @Description( - "Set of URNs of transforms to be overriden using the transform service. The provided strings " - + "can be transform URNs of schema-transform IDs") + @Description("Set of URNs of transforms to be overriden using the transform service.") @Default.InstanceFactory(EmptyListDefault.class) List<String> getTransformsToOverride(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index 3d914272373..e2b6d95057f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -535,7 +535,7 @@ public class PTransformTranslation { if (underlyingIdentifier == null) { throw new IllegalStateException( String.format( - "Encountered a Managed Transform that has an empty \"transform_identifier\": %n%s", + "Encountered a Managed Transform that has an empty \"transform_identifier\": \n%s", configRow)); } transformBuilder.putAnnotations( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index f88d55a1f5c..941a5daf689 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.util.construction; -import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; - import com.fasterxml.jackson.core.Version; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -53,7 +51,6 @@ import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayl import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannelBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; @@ -116,22 +113,6 @@ public class TransformUpgrader implements AutoCloseable { if (urn != null && urnsToOverride.contains(urn)) { return true; } - - // Also check if the URN is a schema-transform ID. - if (urn.equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { - try { - ExternalTransforms.SchemaTransformPayload schemaTransformPayload = - ExternalTransforms.SchemaTransformPayload.parseFrom( - entry.getValue().getSpec().getPayload()); - String schemaTransformId = schemaTransformPayload.getIdentifier(); - if (urnsToOverride.contains(schemaTransformId)) { - return true; - } - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - return false; }) .map( @@ -203,27 +184,20 @@ public class TransformUpgrader implements AutoCloseable { if (transformToUpgrade == null) { throw new IllegalArgumentException("Could not find a transform with the ID " + transformId); } - - byte[] payloadBytes = null; - - if (!transformToUpgrade.getSpec().getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { - ByteString configRowBytes = - transformToUpgrade.getAnnotationsOrThrow( - BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY)); - ByteString configRowSchemaBytes = - transformToUpgrade.getAnnotationsOrThrow( - BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY)); - SchemaApi.Schema configRowSchemaProto = - SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray()); - payloadBytes = - ExternalTransforms.ExternalConfigurationPayload.newBuilder() - .setSchema(configRowSchemaProto) - .setPayload(configRowBytes) - .build() - .toByteArray(); - } else { - payloadBytes = transformToUpgrade.getSpec().getPayload().toByteArray(); - } + ByteString configRowBytes = + transformToUpgrade.getAnnotationsOrThrow( + BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY)); + ByteString configRowSchemaBytes = + transformToUpgrade.getAnnotationsOrThrow( + BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY)); + SchemaApi.Schema configRowSchemaProto = + SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray()); + + ExternalTransforms.ExternalConfigurationPayload payload = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(configRowSchemaProto) + .setPayload(configRowBytes) + .build(); RunnerApi.PTransform.Builder ptransformBuilder = RunnerApi.PTransform.newBuilder() @@ -231,7 +205,7 @@ public class TransformUpgrader implements AutoCloseable { .setSpec( RunnerApi.FunctionSpec.newBuilder() .setUrn(transformToUpgrade.getSpec().getUrn()) - .setPayload(ByteString.copyFrom(payloadBytes)) + .setPayload(ByteString.copyFrom(payload.toByteArray())) .build()); for (Map.Entry<String, String> entry : transformToUpgrade.getInputsMap().entrySet()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java index a54383b46e7..65b3e8e89ca 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java @@ -39,22 +39,16 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.transforms.SchemaTransform; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; -import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -164,60 +158,6 @@ public class TransformUpgraderTest { } } - public static class TestSchemaTransformProvider implements SchemaTransformProvider { - - @Override - public String identifier() { - return "dummy_schema_transform"; - } - - @Override - public Schema configurationSchema() { - return Schema.builder().build(); - } - - @Override - public SchemaTransform from(Row configuration) { - return new TestSchemaTransform(); - } - } - - public static class TestSchemaTransform extends SchemaTransform { - - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - return input; - } - } - - static class TestSchemaTransformTranslator - extends SchemaTransformPayloadTranslator<TestSchemaTransform> { - @Override - public SchemaTransformProvider provider() { - return new TestSchemaTransformProvider(); - } - - @Override - public Row toConfigRow(TestSchemaTransform transform) { - return Row.withSchema(Schema.builder().build()).build(); - } - } - - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class TestSchemaTransformPayloadTranslatorRegistrar - implements TransformPayloadTranslatorRegistrar { - @Override - @SuppressWarnings({ - "rawtypes", - }) - public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder() - .put(TestSchemaTransform.class, new TestSchemaTransformTranslator()) - .build(); - } - } - static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory { ExpansionApi.ExpansionResponse response; @@ -243,18 +183,6 @@ public class TransformUpgraderTest { .getTransformsMap() .get("TransformUpgraderTest-TestTransform2"); } - - boolean schemaTransformTest = false; - if (transformToUpgrade == null) { - // This is running a schema-transform test. - transformToUpgrade = - request - .getComponents() - .getTransformsMap() - .get("TransformUpgraderTest-TestSchemaTransform"); - schemaTransformTest = true; - } - if (!transformToUpgrade .getSpec() .getUrn() @@ -262,30 +190,27 @@ public class TransformUpgraderTest { throw new RuntimeException("Could not find a valid transform to upgrade"); } + Integer oldParam; + try { + ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray()); + ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + oldParam = (Integer) objectInputStream.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + RunnerApi.PTransform.Builder upgradedTransform = transformToUpgrade.toBuilder(); FunctionSpec.Builder specBuilder = upgradedTransform.getSpecBuilder(); - if (!schemaTransformTest) { - Integer oldParam; - try { - ByteArrayInputStream byteArrayInputStream = - new ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray()); - ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); - oldParam = (Integer) objectInputStream.readObject(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream(); - try { - ObjectOutputStream objectOutputStream = - new ObjectOutputStream(byteStringOutputStream); - objectOutputStream.writeObject(oldParam * 2); - objectOutputStream.flush(); - specBuilder.setPayload(byteStringOutputStream.toByteString()); - } catch (IOException e) { - throw new RuntimeException(e); - } + ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream(); + try { + ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteStringOutputStream); + objectOutputStream.writeObject(oldParam * 2); + objectOutputStream.flush(); + specBuilder.setPayload(byteStringOutputStream.toByteString()); + } catch (IOException e) { + throw new RuntimeException(e); } upgradedTransform.setSpec(specBuilder.build()); @@ -366,34 +291,6 @@ public class TransformUpgraderTest { assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY)); } - @Test - public void testTransformUpgradeSchemaTransform() throws Exception { - Pipeline pipeline = Pipeline.create(); - - // Build the pipeline - PCollectionRowTuple.empty(pipeline).apply(new TestSchemaTransform()); - - RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false); - ExternalTranslationOptions options = - PipelineOptionsFactory.create().as(ExternalTranslationOptions.class); - List<String> urnsToOverride = ImmutableList.of("dummy_schema_transform"); - options.setTransformsToOverride(urnsToOverride); - options.setTransformServiceAddress("dummyaddress"); - - RunnerApi.Pipeline upgradedPipelineProto = - TransformUpgrader.of(new TestExpansionServiceClientFactory()) - .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options); - - RunnerApi.PTransform upgradedTransform = - upgradedPipelineProto - .getComponents() - .getTransformsMap() - .get("TransformUpgraderTest-TestSchemaTransform"); - - // Confirm that the upgraded transform includes the upgrade annotation. - assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY)); - } - @Test public void testTransformUpgradeMultipleOccurrences() throws Exception { Pipeline pipeline = Pipeline.create(); diff --git a/sdks/java/expansion-service/container/expansion_service_config.yml b/sdks/java/expansion-service/container/expansion_service_config.yml index 4f48efd5947..653629aa153 100644 --- a/sdks/java/expansion-service/container/expansion_service_config.yml +++ b/sdks/java/expansion-service/container/expansion_service_config.yml @@ -17,9 +17,6 @@ allowlist: - "beam:transform:org.apache.beam:schemaio_jdbc_read:v1" - "beam:transform:org.apache.beam:schemaio_jdbc_write:v1" - "beam:schematransform:org.apache.beam:bigquery_storage_write:v1" -# By default, the Expansion Service container will include all dependencies in -# the classpath. Following config can be used to override this behavior per -# transform URN or schema-transform ID. dependencies: "beam:transform:org.apache.beam:kafka_read_with_metadata:v1": - path: "jars/beam-sdks-java-io-expansion-service.jar" diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 770da14fa1c..5d436451178 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -81,7 +81,6 @@ import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -247,11 +246,6 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB private final TransformPayloadTranslator<PTransform<InputT, OutputT>> payloadTranslator; - // Returns true if the underlying transform represented by this is a schema-aware transform. - private boolean isSchemaTransform() { - return (payloadTranslator instanceof SchemaTransformPayloadTranslator); - } - private TransformProviderForPayloadTranslator( TransformPayloadTranslator<PTransform<InputT, OutputT>> payloadTranslator) { this.payloadTranslator = payloadTranslator; @@ -260,51 +254,28 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB @Override public PTransform<InputT, OutputT> getTransform( RunnerApi.FunctionSpec spec, PipelineOptions options) { - if (isSchemaTransform()) { - return ExpansionServiceSchemaTransformProvider.of().getTransform(spec, options); - } else { - try { - ExternalConfigurationPayload payload = - ExternalConfigurationPayload.parseFrom(spec.getPayload()); - Row configRow = - RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema())) - .decode(new ByteArrayInputStream(payload.getPayload().toByteArray())); - PTransform transformFromRow = payloadTranslator.fromConfigRow(configRow, options); - if (transformFromRow != null) { - return transformFromRow; - } else { - throw new RuntimeException( - String.format( - "A transform cannot be initiated using the provided config row %s and the" - + " TransformPayloadTranslator %s", - configRow, payloadTranslator)); - } - } catch (Exception e) { + try { + ExternalConfigurationPayload payload = + ExternalConfigurationPayload.parseFrom(spec.getPayload()); + Row configRow = + RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema())) + .decode(new ByteArrayInputStream(payload.getPayload().toByteArray())); + PTransform transformFromRow = payloadTranslator.fromConfigRow(configRow, options); + if (transformFromRow != null) { + return transformFromRow; + } else { throw new RuntimeException( String.format( - "Failed to build transform %s from spec %s: %s", - spec.getUrn(), spec, e.getMessage()), - e); + "A transform cannot be initiated using the provided config row %s and the" + + " TransformPayloadTranslator %s", + configRow, payloadTranslator)); } - } - } - - @Override - public InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) { - if (isSchemaTransform()) { - return (InputT) ExpansionServiceSchemaTransformProvider.of().createInput(p, inputs); - } else { - return TransformProvider.super.createInput(p, inputs); - } - } - - @Override - public Map<String, PCollection<?>> extractOutputs(OutputT output) { - if (isSchemaTransform()) { - return ExpansionServiceSchemaTransformProvider.of() - .extractOutputs((PCollectionRowTuple) output); - } else { - return TransformProvider.super.extractOutputs(output); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to build transform %s from spec %s: %s", + spec.getUrn(), spec, e.getMessage()), + e); } } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 95a843e51fd..e9fe4c1fe63 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -43,11 +43,6 @@ dependencies { permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 - - // Needed by Iceberg I/O users that use GCS for the warehouse location. - implementation library.java.bigdataoss_gcs_connector - permitUnusedDeclared library.java.bigdataoss_gcs_connector - runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 7965cde86e7..765fa494875 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -57,8 +57,8 @@ dependencies { testImplementation library.java.hadoop_client testImplementation library.java.bigdataoss_gcsio - testImplementation library.java.bigdataoss_gcs_connector - testImplementation library.java.bigdataoss_util_hadoop + testImplementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16" + testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16" testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 6f97983d326..1060c542ac0 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -63,19 +63,28 @@ public class ManagedSchemaTransformProvider return "beam:transform:managed:v1"; } - // Use 'getAllProviders()' instead of this cache. - private final Map<String, SchemaTransformProvider> schemaTransformProvidersCache = - new HashMap<>(); - private boolean providersCached = false; + private final Map<String, SchemaTransformProvider> schemaTransformProviders = new HashMap<>(); - private @Nullable Collection<String> supportedIdentifiers; - - public ManagedSchemaTransformProvider() { - this(null); - } + public ManagedSchemaTransformProvider() {} ManagedSchemaTransformProvider(@Nullable Collection<String> supportedIdentifiers) { - this.supportedIdentifiers = supportedIdentifiers; + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); + } + if (supportedIdentifiers == null + || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { + schemaTransformProviders.put( + schemaTransformProvider.identifier(), schemaTransformProvider); + } + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } } @DefaultSchema(AutoValueSchema.class) @@ -140,7 +149,7 @@ public class ManagedSchemaTransformProvider managedConfig.validate(); SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( - getAllProviders().get(managedConfig.getTransformIdentifier()), + schemaTransformProviders.get(managedConfig.getTransformIdentifier()), "Could not find a transform with the identifier " + "%s. This could be either due to the dependency with the " + "transform not being available in the classpath or due to " @@ -227,35 +236,7 @@ public class ManagedSchemaTransformProvider return YamlUtils.toBeamRow(configMap, transformSchema, false); } - // We load providers seperately, after construction, to prevent the - // 'ManagedSchemaTransformProvider' from being initialized in a recursive loop - // when being loaded using 'AutoValue'. - synchronized Map<String, SchemaTransformProvider> getAllProviders() { - if (this.providersCached) { - return schemaTransformProvidersCache; - } - try { - for (SchemaTransformProvider schemaTransformProvider : - ServiceLoader.load(SchemaTransformProvider.class)) { - if (schemaTransformProvidersCache.containsKey(schemaTransformProvider.identifier())) { - throw new IllegalArgumentException( - "Found multiple SchemaTransformProvider implementations with the same identifier " - + schemaTransformProvider.identifier()); - } - if (supportedIdentifiers == null - || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { - if (schemaTransformProvider.identifier().equals("beam:transform:managed:v1")) { - // Prevent recursively adding the 'ManagedSchemaTransformProvider'. - continue; - } - schemaTransformProvidersCache.put( - schemaTransformProvider.identifier(), schemaTransformProvider); - } - } - this.providersCached = true; - return schemaTransformProvidersCache; - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } + Map<String, SchemaTransformProvider> getAllProviders() { + return schemaTransformProviders; } }