This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 527e821e3c9 Support managed jdbc io (SQLServer) (#36055)
527e821e3c9 is described below

commit 527e821e3c925d8252504e104fa67678514b1f01
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Sep 24 15:40:10 2025 -0400

    Support managed jdbc io (SQLServer) (#36055)
    
    * Add sqlserver read and write to managed io
    
    * Address reviewer's comment.
---
 .../model/pipeline/v1/external_transforms.proto    |   4 +
 .../ReadFromSqlServerSchemaTransformProvider.java  |  43 +++-
 .../SqlServerSchemaTransformTranslation.java       |  93 ++++++++
 .../WriteToSqlServerSchemaTransformProvider.java   |  43 +++-
 .../SqlServerSchemaTransformTranslationTest.java   | 235 +++++++++++++++++++++
 .../java/org/apache/beam/sdk/managed/Managed.java  |   3 +
 sdks/python/apache_beam/transforms/managed.py      |   3 +
 7 files changed, 422 insertions(+), 2 deletions(-)

diff --git 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 31232eb6067..043a72dd34f 100644
--- 
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ 
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -84,6 +84,10 @@ message ManagedTransforms {
       "beam:schematransform:org.apache.beam:mysql_read:v1"];
     MYSQL_WRITE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
       "beam:schematransform:org.apache.beam:mysql_write:v1"];
+    SQL_SERVER_READ = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:sql_server_read:v1"];
+    SQL_SERVER_WRITE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:sql_server_write:v1"];
   }
 }
 
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
index e4767177bb2..eec6660aa88 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromSqlServerSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class ReadFromSqlServerSchemaTransformProvider extends 
JdbcReadSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReadFromSqlServerSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:sql_server_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ);
   }
 
   @Override
@@ -43,4 +53,35 @@ public class ReadFromSqlServerSchemaTransformProvider 
extends JdbcReadSchemaTran
   protected String jdbcType() {
     return MSSQL;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcReadSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.isEmpty() && 
!jdbcType.equals(jdbcType())) {
+      LOG.warn(
+          "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+          jdbcType(),
+          jdbcType,
+          jdbcType());
+      configuration = 
configuration.toBuilder().setJdbcType(jdbcType()).build();
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> 
connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      throw new IllegalArgumentException("SQL Server does not support 
connectionInitSql.");
+    }
+
+    // Override "connectionInitSql" for sqlserver
+    configuration = 
configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+    return new SqlServerReadSchemaTransform(configuration);
+  }
+
+  public static class SqlServerReadSchemaTransform extends 
JdbcReadSchemaTransform {
+    public SqlServerReadSchemaTransform(JdbcReadSchemaTransformConfiguration 
config) {
+      super(config, MSSQL);
+      config.validate(MSSQL);
+    }
+  }
 }
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
new file mode 100644
index 00000000000..cea52f8d962
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static 
org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform;
+import static 
org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import 
org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class SqlServerSchemaTransformTranslation {
+  static class SqlServerReadSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<SqlServerReadSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new ReadFromSqlServerSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(SqlServerReadSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class ReadRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(SqlServerReadSchemaTransform.class, new 
SqlServerReadSchemaTransformTranslator())
+          .build();
+    }
+  }
+
+  static class SqlServerWriteSchemaTransformTranslator
+      extends SchemaTransformPayloadTranslator<SqlServerWriteSchemaTransform> {
+    @Override
+    public SchemaTransformProvider provider() {
+      return new WriteToSqlServerSchemaTransformProvider();
+    }
+
+    @Override
+    public Row toConfigRow(SqlServerWriteSchemaTransform transform) {
+      return transform.getConfigurationRow();
+    }
+  }
+
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class WriteRegistrar implements 
TransformPayloadTranslatorRegistrar {
+    @Override
+    @SuppressWarnings({
+      "rawtypes",
+    })
+    public Map<
+            ? extends Class<? extends PTransform>,
+            ? extends PTransformTranslation.TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap
+          .<Class<? extends PTransform>, 
PTransformTranslation.TransformPayloadTranslator>builder()
+          .put(SqlServerWriteSchemaTransform.class, new 
SqlServerWriteSchemaTransformTranslator())
+          .build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
index 9e849f4e49e..dc26c240958 100644
--- 
a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
+++ 
b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToSqlServerSchemaTransformProvider.java
@@ -18,20 +18,30 @@
 package org.apache.beam.sdk.io.jdbc.providers;
 
 import static org.apache.beam.sdk.io.jdbc.JdbcUtil.MSSQL;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @AutoService(SchemaTransformProvider.class)
 public class WriteToSqlServerSchemaTransformProvider extends 
JdbcWriteSchemaTransformProvider {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(WriteToSqlServerSchemaTransformProvider.class);
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:sql_server_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE);
   }
 
   @Override
@@ -43,4 +53,35 @@ public class WriteToSqlServerSchemaTransformProvider extends 
JdbcWriteSchemaTran
   protected String jdbcType() {
     return MSSQL;
   }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+      JdbcWriteSchemaTransformConfiguration configuration) {
+    String jdbcType = configuration.getJdbcType();
+    if (jdbcType != null && !jdbcType.isEmpty() && 
!jdbcType.equals(jdbcType())) {
+      LOG.warn(
+          "Wrong JDBC type. Expected '{}' but got '{}'. Overriding with '{}'.",
+          jdbcType(),
+          jdbcType,
+          jdbcType());
+      configuration = 
configuration.toBuilder().setJdbcType(jdbcType()).build();
+    }
+
+    List<@org.checkerframework.checker.nullness.qual.Nullable String> 
connectionInitSql =
+        configuration.getConnectionInitSql();
+    if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+      throw new IllegalArgumentException("SQL Server does not support 
connectionInitSql.");
+    }
+
+    // Override "connectionInitSql" for sqlserver
+    configuration = 
configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+    return new SqlServerWriteSchemaTransform(configuration);
+  }
+
+  public static class SqlServerWriteSchemaTransform extends 
JdbcWriteSchemaTransform {
+    public SqlServerWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration 
config) {
+      super(config, MSSQL);
+      config.validate(MSSQL);
+    }
+  }
 }
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..d8890987fbf
--- /dev/null
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/SqlServerSchemaTransformTranslationTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static 
org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static 
org.apache.beam.sdk.io.jdbc.providers.ReadFromSqlServerSchemaTransformProvider.SqlServerReadSchemaTransform;
+import static 
org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerReadSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.jdbc.providers.SqlServerSchemaTransformTranslation.SqlServerWriteSchemaTransformTranslator;
+import static 
org.apache.beam.sdk.io.jdbc.providers.WriteToSqlServerSchemaTransformProvider.SqlServerWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import 
org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class SqlServerSchemaTransformTranslationTest {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  static final WriteToSqlServerSchemaTransformProvider WRITE_PROVIDER =
+      new WriteToSqlServerSchemaTransformProvider();
+  static final ReadFromSqlServerSchemaTransformProvider READ_PROVIDER =
+      new ReadFromSqlServerSchemaTransformProvider();
+
+  static final Row READ_CONFIG =
+      Row.withSchema(READ_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", 
"jdbc:sqlserver://host:port;databaseName=database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("connection_init_sql", 
ImmutableList.<String>builder().build())
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("disable_auto_commit", true)
+          .withFieldValue("fetch_size", 10)
+          .withFieldValue("num_partitions", 5)
+          .withFieldValue("output_parallelization", true)
+          .withFieldValue("partition_column", "col")
+          .withFieldValue("read_query", null)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .build();
+
+  static final Row WRITE_CONFIG =
+      Row.withSchema(WRITE_PROVIDER.configurationSchema())
+          .withFieldValue("jdbc_url", 
"jdbc:sqlserver://host:port;databaseName=database")
+          .withFieldValue("location", "test_table")
+          .withFieldValue("autosharding", true)
+          .withFieldValue("connection_init_sql", 
ImmutableList.<String>builder().build())
+          .withFieldValue("connection_properties", "some_property")
+          .withFieldValue("driver_class_name", null)
+          .withFieldValue("driver_jars", null)
+          .withFieldValue("batch_size", 100L)
+          .withFieldValue("username", "my_user")
+          .withFieldValue("password", "my_pass")
+          .withFieldValue("write_statement", null)
+          .build();
+
+  @Test
+  public void testRecreateWriteTransformFromRow() {
+    SqlServerWriteSchemaTransform writeTransform =
+        (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+    SqlServerWriteSchemaTransformTranslator translator =
+        new SqlServerWriteSchemaTransformTranslator();
+    Row translatedRow = translator.toConfigRow(writeTransform);
+
+    SqlServerWriteSchemaTransform writeTransformFromRow =
+        translator.fromConfigRow(translatedRow, 
PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testWriteTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+    Schema inputSchema = Schema.builder().addStringField("name").build();
+    PCollection<Row> input =
+        p.apply(
+                Create.of(
+                    Collections.singletonList(
+                        Row.withSchema(inputSchema).addValue("test").build())))
+            .setRowSchema(inputSchema);
+
+    SqlServerWriteSchemaTransform writeTransform =
+        (SqlServerWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+    PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+    // Then translate the pipeline to a proto and extract 
SqlServerWriteSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> writeTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(WRITE_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, writeTransformProto.size());
+    RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+    assertEquals(WRITE_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the 
SqlServerWriteSchemaTransform
+    SqlServerWriteSchemaTransformTranslator translator =
+        new SqlServerWriteSchemaTransformTranslator();
+    SqlServerWriteSchemaTransform writeTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+  }
+
+  @Test
+  public void testReCreateReadTransformFromRow() {
+    // setting a subset of fields here.
+    SqlServerReadSchemaTransform readTransform =
+        (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    SqlServerReadSchemaTransformTranslator translator =
+        new SqlServerReadSchemaTransformTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    SqlServerReadSchemaTransform readTransformFromRow =
+        translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+  }
+
+  @Test
+  public void testReadTransformProtoTranslation()
+      throws InvalidProtocolBufferException, IOException {
+    // First build a pipeline
+    Pipeline p = Pipeline.create();
+
+    SqlServerReadSchemaTransform readTransform =
+        (SqlServerReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+    // Mock inferBeamSchema since it requires database connection.
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+    try (MockedStatic<JdbcIO.ReadRows> mock = 
Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+      mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), 
Mockito.any()))
+          .thenReturn(expectedSchema);
+      PCollectionRowTuple.empty(p).apply(readTransform);
+    }
+
+    // Then translate the pipeline to a proto and extract 
SqlServerReadSchemaTransform proto
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    List<RunnerApi.PTransform> readTransformProto =
+        pipelineProto.getComponents().getTransformsMap().values().stream()
+            .filter(
+                tr -> {
+                  RunnerApi.FunctionSpec spec = tr.getSpec();
+                  try {
+                    return 
spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+                        && SchemaTransformPayload.parseFrom(spec.getPayload())
+                            .getIdentifier()
+                            .equals(READ_PROVIDER.identifier());
+                  } catch (InvalidProtocolBufferException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .collect(Collectors.toList());
+    assertEquals(1, readTransformProto.size());
+    RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+    // Check that the proto contains correct values
+    SchemaTransformPayload payload = 
SchemaTransformPayload.parseFrom(spec.getPayload());
+    Schema schemaFromSpec = 
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+    assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+    Row rowFromSpec = 
RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+    assertEquals(READ_CONFIG, rowFromSpec);
+
+    // Use the information in the proto to recreate the 
SqlServerReadSchemaTransform
+    SqlServerReadSchemaTransformTranslator translator =
+        new SqlServerReadSchemaTransformTranslator();
+    SqlServerReadSchemaTransform readTransformFromSpec =
+        translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+    assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+  }
+}
diff --git 
a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java 
b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 4f45eeac861..a5e7d879b44 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -98,6 +98,7 @@ public class Managed {
   public static final String BIGQUERY = "bigquery";
   public static final String POSTGRES = "postgres";
   public static final String MYSQL = "mysql";
+  public static final String SQL_SERVER = "sqlserver";
 
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
@@ -108,6 +109,7 @@ public class Managed {
           .put(BIGQUERY, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
           .put(POSTGRES, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
           .put(MYSQL, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_READ))
+          .put(SQL_SERVER, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
@@ -116,6 +118,7 @@ public class Managed {
           .put(BIGQUERY, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
           .put(POSTGRES, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
           .put(MYSQL, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.MYSQL_WRITE))
+          .put(SQL_SERVER, 
getUrn(ExternalTransforms.ManagedTransforms.Urns.SQL_SERVER_WRITE))
           .build();
 
   /**
diff --git a/sdks/python/apache_beam/transforms/managed.py 
b/sdks/python/apache_beam/transforms/managed.py
index 03449236ac9..33ba8d41a99 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -87,6 +87,7 @@ KAFKA = "kafka"
 BIGQUERY = "bigquery"
 POSTGRES = "postgres"
 MYSQL = "mysql"
+SQL_SERVER = "sqlserver"
 
 __all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
 
@@ -100,6 +101,7 @@ class Read(PTransform):
       BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
       POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
       MYSQL: ManagedTransforms.Urns.MYSQL_READ.urn,
+      SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_READ.urn,
   }
 
   def __init__(
@@ -143,6 +145,7 @@ class Write(PTransform):
       BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
       POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
       MYSQL: ManagedTransforms.Urns.MYSQL_WRITE.urn,
+      SQL_SERVER: ManagedTransforms.Urns.SQL_SERVER_WRITE.urn
   }
 
   def __init__(

Reply via email to