This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 527e821e3c9 Support managed jdbc io (SQLServer) (#36055)
527e821e3c9 is described below
commit 527e821e3c925d8252504e104fa67678514b1f01
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 24 15:40:10 2025 -0400
Support managed jdbc io (SQLServer) (#36055)
* Add sqlserver read and write to managed io
* Address reviewer's comments.
---
.../model/pipeline/v1/external_transforms.proto | 4 +
.../ReadFromSqlServerSchemaTransformProvider.java | 43 +++-
.../SqlServerSchemaTransformTranslation.java | 93 ++++++++
.../WriteToSqlServerSchemaTransformProvider.java | 43 +++-
.../SqlServerSchemaTransformTranslationTest.java | 235 +++++++++++++++++++++
.../java/org/apache/beam/sdk/managed/Managed.java | 3 +
sdks/python/apache_beam/transforms/managed.py | 3 +
7 files changed, 422 insertions(+), 2 deletions(-)
diff --git
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 31232eb6067..043a72dd34f 100644
---
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -84,6 +84,10 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:mysql_read:v1"];
MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:mysql_write:v1"];
+ SQL_SERVER_READ = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:sql_server_read:v1"];
+ SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:sql_server_write:v1"];
}
}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
index e4767177bb2..eec6660aa88 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
@@ -18,20 +18,30 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class ReadFromSqlServerSchemaTransformProvider extends
JdbcReadSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadFromSqlServerSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:sql_server_read:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ);
}
@Override
@@ -43,4 +53,35 @@ public class ReadFromSqlServerSchemaTransformProvider
extends JdbcReadSchemaTran
protected String jdbcType() {
return MSSQL;
}
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.isEmpty() &&
!jdbcType.equals(jdbcType())) {
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration =
configuration.toBuilder().setJdbcType(jdbcType()).build();
+ }
+
+ List<@org.checkerframework.checker.nullness.qual.Nullable String>
connectionInitSql =
+ configuration.getConnectionInitSql();
+ if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+ throw new IllegalArgumentException("SQL Server does not support
connectionInitSql.");
+ }
+
+ // Override "connectionInitSql" for sqlserver
+ configuration =
configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+ return new SqlServerReadSchemaTransform(configuration);
+ }
+
+ public static class SqlServerReadSchemaTransform extends
JdbcReadSchemaTransform {
+ public SqlServerReadSchemaTransform(JdbcReadSchemaTransformConfiguration
config) {
+ super(config, MSSQL);
+ config.validate(MSSQL);
+ }
+ }
}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
new file mode 100644
index 00000000000..cea52f8d962
--- /dev/null
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static
org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform;
+import static
org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform;
+import static
org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class SqlServerSchemaTransformTranslation {
+ static class SqlServerReadSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<SqlServerReadSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new ReadFromSqlServerSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(SqlServerReadSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class ReadRegistrar implements
TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes",
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>,
PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(SqlServerReadSchemaTransform.class, new
SqlServerReadSchemaTransformTranslator())
+ .build();
+ }
+ }
+
+ static class SqlServerWriteSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<SqlServerWriteSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new WriteToSqlServerSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(SqlServerWriteSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class WriteRegistrar implements
TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes",
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>,
PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(SqlServerWriteSchemaTransform.class, new
SqlServerWriteSchemaTransformTranslator())
+ .build();
+ }
+ }
+}
diff --git
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
index 9e849f4e49e..dc26c240958 100644
---
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
+++
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
@@ -18,20 +18,30 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class WriteToSqlServerSchemaTransformProvider extends
JdbcWriteSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WriteToSqlServerSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:sql_server_write:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE);
}
@Override
@@ -43,4 +53,35 @@ public class WriteToSqlServerSchemaTransformProvider extends
JdbcWriteSchemaTran
protected String jdbcType() {
return MSSQL;
}
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.isEmpty() &&
!jdbcType.equals(jdbcType())) {
+ LOG.warn(
+ "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+ jdbcType(),
+ jdbcType,
+ jdbcType());
+ configuration =
configuration.toBuilder().setJdbcType(jdbcType()).build();
+ }
+
+ List<@org.checkerframework.checker.nullness.qual.Nullable String>
connectionInitSql =
+ configuration.getConnectionInitSql();
+ if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+ throw new IllegalArgumentException("SQL Server does not support
connectionInitSql.");
+ }
+
+ // Override "connectionInitSql" for sqlserver
+ configuration =
configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+ return new SqlServerWriteSchemaTransform(configuration);
+ }
+
+ public static class SqlServerWriteSchemaTransform extends
JdbcWriteSchemaTransform {
+ public SqlServerWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration
config) {
+ super(config, MSSQL);
+ config.validate(MSSQL);
+ }
+ }
}
diff --git
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..d8890987fbf
--- /dev/null
+++
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static
org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform;
+import static
org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerReadSchemaTransformTranslator;
+import static
org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerWriteSchemaTransformTranslator;
+import static
org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class SqlServerSchemaTransformTranslationTest {
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
+
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ static final WriteToSqlServerSchemaTransformProvider WRITE_PROVIDER =
+ new WriteToSqlServerSchemaTransformProvider();
+ static final ReadFromSqlServerSchemaTransformProvider READ_PROVIDER =
+ new ReadFromSqlServerSchemaTransformProvider();
+
+ static final Row READ_CONFIG =
+ Row.withSchema(READ_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url",
"jdbc:sqlserver://host:port;databaseName=database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("disable_auto_commit", true)
+ .withFieldValue("fetch_size", 10)
+ .withFieldValue("num_partitions", 5)
+ .withFieldValue("output_parallelization", true)
+ .withFieldValue("partition_column", "col")
+ .withFieldValue("read_query", null)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .build();
+
+ static final Row WRITE_CONFIG =
+ Row.withSchema(WRITE_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url",
"jdbc:sqlserver://host:port;databaseName=database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("autosharding", true)
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("batch_size", 100L)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .withFieldValue("write_statement", null)
+ .build();
+
+ @Test
+ public void testRecreateWriteTransformFromRow() {
+ SqlServerWriteSchemaTransform writeTransform =
+ (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+ SqlServerWriteSchemaTransformTranslator translator =
+ new SqlServerWriteSchemaTransformTranslator();
+ Row translatedRow = translator.toConfigRow(writeTransform);
+
+ SqlServerWriteSchemaTransform writeTransformFromRow =
+ translator.fromConfigRow(translatedRow,
PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testWriteTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+ Schema inputSchema = Schema.builder().addStringField("name").build();
+ PCollection<Row> input =
+ p.apply(
+ Create.of(
+ Collections.singletonList(
+ Row.withSchema(inputSchema).addValue("test").build())))
+ .setRowSchema(inputSchema);
+
+ SqlServerWriteSchemaTransform writeTransform =
+ (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+ PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+ // Then translate the pipeline to a proto and extract
SqlServerWriteSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(WRITE_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, writeTransformProto.size());
+ RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload =
SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec =
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec =
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+ assertEquals(WRITE_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the
SqlServerWriteSchemaTransform
+ SqlServerWriteSchemaTransformTranslator translator =
+ new SqlServerWriteSchemaTransformTranslator();
+ SqlServerWriteSchemaTransform writeTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+ }
+
+ @Test
+ public void testReCreateReadTransformFromRow() {
+ // setting a subset of fields here.
+ SqlServerReadSchemaTransform readTransform =
+ (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ SqlServerReadSchemaTransformTranslator translator =
+ new SqlServerReadSchemaTransformTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ SqlServerReadSchemaTransform readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testReadTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+
+ SqlServerReadSchemaTransform readTransform =
+ (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ // Mock inferBeamSchema since it requires database connection.
+ Schema expectedSchema = Schema.builder().addStringField("name").build();
+ try (MockedStatic<JdbcIO.ReadRows> mock =
Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+ mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(),
Mockito.any()))
+ .thenReturn(expectedSchema);
+ PCollectionRowTuple.empty(p).apply(readTransform);
+ }
+
+ // Then translate the pipeline to a proto and extract
SqlServerReadSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> readTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(READ_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, readTransformProto.size());
+ RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload =
SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec =
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec =
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+ assertEquals(READ_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the
SqlServerReadSchemaTransform
+ SqlServerReadSchemaTransformTranslator translator =
+ new SqlServerReadSchemaTransformTranslator();
+ SqlServerReadSchemaTransform readTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+ }
+}
diff --git
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 4f45eeac861..a5e7d879b44 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -98,6 +98,7 @@ public class Managed {
public static final String BIGQUERY = "bigquery";
public static final String POSTGRES = "postgres";
public static final String MYSQL = "mysql";
+ public static final String SQL_SERVER = "sqlserver";
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
@@ -108,6 +109,7 @@ public class Managed {
.put(BIGQUERY,
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
.put(POSTGRES,
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
.put(MYSQL,
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ))
+ .put(SQL_SERVER,
getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
@@ -116,6 +118,7 @@ public class Managed {
.put(BIGQUERY,
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
.put(POSTGRES,
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
.put(MYSQL,
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE))
+ .put(SQL_SERVER,
getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE))
.build();
/**
diff --git a/sdks/python/apache_beam/transforms/managed.py
b/sdks/python/apache_beam/transforms/managed.py
index 03449236ac9..33ba8d41a99 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -87,6 +87,7 @@ KAFKA = "kafka"
BIGQUERY = "bigquery"
POSTGRES = "postgres"
MYSQL = "mysql"
+SQL_SERVER = "sqlserver"
__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
@@ -100,6 +101,7 @@ class Read(PTransform):
BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn,
+ SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn,
}
def __init__(
@@ -143,6 +145,7 @@ class Write(PTransform):
BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn,
+ SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_WRITE.urn
}
def __init__(