This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new be4fb977e18 Support managed jdbc io (Postgres) (#36034)
be4fb977e18 is described below
commit be4fb977e181e86fd68753218563c5884744df02
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Sep 8 15:35:39 2025 -0400
Support managed jdbc io (Postgres) (#36034)
* Add postgres read to managed io
* Add postgres write to managed io
* Add integration tests for both managed and unmanaged postgres read and write.
* Fix error in analyzeClassesDependencies gradle task
* Fix spotless failure.
* Fix python lint
* Add schema transform translation for postgres read and write.
* Add test for postgres schema transform translation.
* Address reviewer's feedback.
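
For context, a minimal end-to-end sketch of what this commit enables, mirroring the
new JdbcIOPostgresIT integration test below. The connection URL, credentials, and
table name are placeholders; any reachable Postgres instance works:

    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    public class ManagedPostgresSketch {
      public static void main(String[] args) {
        // Placeholder connection values.
        Map<String, Object> config =
            ImmutableMap.<String, Object>builder()
                .put("jdbc_url", "jdbc:postgresql://localhost:5432/mydb")
                .put("username", "postgres")
                .put("password", "mypass")
                .put("location", "my_table")
                .build();

        Schema schema =
            Schema.of(
                Schema.Field.of("id", Schema.FieldType.INT32),
                Schema.Field.of("name", Schema.FieldType.STRING));

        // Write rows through the Managed API (separate pipelines, like the IT,
        // so the write finishes before the read starts).
        Pipeline writePipeline = Pipeline.create();
        writePipeline
            .apply(Create.of(Row.withSchema(schema).addValues(1, "foo").build()))
            .setRowSchema(schema)
            .apply(Managed.write(Managed.POSTGRES).withConfig(config));
        writePipeline.run().waitUntilFinish();

        // Read them back; Managed reads tag their output PCollection "output".
        Pipeline readPipeline = Pipeline.create();
        PCollection<Row> rows =
            readPipeline.apply(Managed.read(Managed.POSTGRES).withConfig(config)).get("output");
        readPipeline.run().waitUntilFinish();
      }
    }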
---
.../model/pipeline/v1/external_transforms.proto | 4 +
sdks/java/io/jdbc/build.gradle | 3 +
.../io/jdbc/JdbcReadSchemaTransformProvider.java | 18 ++
.../io/jdbc/JdbcWriteSchemaTransformProvider.java | 18 ++
.../PostgresSchemaTransformTranslation.java | 93 ++++++++
.../ReadFromPostgresSchemaTransformProvider.java | 48 ++++-
.../WriteToPostgresSchemaTransformProvider.java | 38 +++-
.../apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java | 178 ++++++++++++++++
.../PostgresSchemaTransformTranslationTest.java | 233 +++++++++++++++++++++
.../java/org/apache/beam/sdk/managed/Managed.java | 3 +
sdks/python/apache_beam/transforms/external.py | 4 +-
sdks/python/apache_beam/transforms/managed.py | 7 +-
12 files changed, 642 insertions(+), 5 deletions(-)
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index add8a1999ca..02a5dd18e2c 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -76,6 +76,10 @@ message ManagedTransforms {
"beam:schematransform:org.apache.beam:bigquery_write:v1"];
ICEBERG_CDC_READ = 6 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:schematransform:org.apache.beam:iceberg_cdc_read:v1"];
+ POSTGRES_READ = 7 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:postgres_read:v1"];
+ POSTGRES_WRITE = 8 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+ "beam:schematransform:org.apache.beam:postgres_write:v1"];
}
}
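
These enum entries attach each schema-transform URN to the proto as an option, so SDK
code can resolve the string in one place instead of hard-coding it. A small sketch of
the lookup, using the same BeamUrns.getUrn call the providers later in this diff use:

    import org.apache.beam.model.pipeline.v1.ExternalTransforms;
    import org.apache.beam.sdk.util.construction.BeamUrns;

    class PostgresUrns {
      // Resolves to "beam:schematransform:org.apache.beam:postgres_read:v1"
      static final String READ_URN =
          BeamUrns.getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ);
      // Resolves to "beam:schematransform:org.apache.beam:postgres_write:v1"
      static final String WRITE_URN =
          BeamUrns.getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE);
    }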
diff --git a/sdks/java/io/jdbc/build.gradle b/sdks/java/io/jdbc/build.gradle
index 8c5fa685fda..87a231a5a42 100644
--- a/sdks/java/io/jdbc/build.gradle
+++ b/sdks/java/io/jdbc/build.gradle
@@ -29,6 +29,7 @@ ext.summary = "IO to read and write on JDBC datasource."
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation project(path: ":model:pipeline", configuration: "shadow")
implementation library.java.dbcp2
implementation library.java.joda_time
implementation "org.apache.commons:commons-pool2:2.11.1"
@@ -39,8 +40,10 @@ dependencies {
testImplementation project(path: ":sdks:java:core", configuration:
"shadowTest")
testImplementation project(path: ":sdks:java:extensions:avro",
configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:io:common")
+ testImplementation project(path: ":sdks:java:managed")
testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation library.java.junit
+ testImplementation library.java.mockito_inline
testImplementation library.java.slf4j_api
testImplementation library.java.postgres
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index 6777be50ab5..da75c9baaa4 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcReadSchemaTransformProvider
}
return PCollectionRowTuple.of("output",
input.getPipeline().apply(readRows));
}
+
+ public Row getConfigurationRow() {
+ try {
+ // To stay consistent with our SchemaTransform configuration naming conventions,
+ // we sort lexicographically
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(JdbcReadSchemaTransformConfiguration.class)
+ .apply(config)
+ .sorted()
+ .toSnakeCase();
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -401,6 +417,8 @@ public class JdbcReadSchemaTransformProvider
.Builder();
}
+ public abstract Builder toBuilder();
+
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDriverClassName(String value);
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
index 6f10df56aab..4dbb9b396f0 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java
@@ -27,7 +27,9 @@ import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -265,6 +267,20 @@ public class JdbcWriteSchemaTransformProvider
.setRowSchema(Schema.of());
return PCollectionRowTuple.of("post_write", postWrite);
}
+
+ public Row getConfigurationRow() {
+ try {
+ // To stay consistent with our SchemaTransform configuration naming conventions,
+ // we sort lexicographically
+ return SchemaRegistry.createDefault()
+ .getToRowFunction(JdbcWriteSchemaTransformConfiguration.class)
+ .apply(config)
+ .sorted()
+ .toSnakeCase();
+ } catch (NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
@Override
@@ -382,6 +398,8 @@ public class JdbcWriteSchemaTransformProvider
.Builder();
}
+ public abstract Builder toBuilder();
+
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDriverClassName(String value);
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
new file mode 100644
index 00000000000..288b29642c5
--- /dev/null
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslation.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+public class PostgresSchemaTransformTranslation {
+ static class PostgresReadSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<PostgresReadSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new ReadFromPostgresSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(PostgresReadSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes",
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(PostgresReadSchemaTransform.class, new PostgresReadSchemaTransformTranslator())
+ .build();
+ }
+ }
+
+ static class PostgresWriteSchemaTransformTranslator
+ extends SchemaTransformPayloadTranslator<PostgresWriteSchemaTransform> {
+ @Override
+ public SchemaTransformProvider provider() {
+ return new WriteToPostgresSchemaTransformProvider();
+ }
+
+ @Override
+ public Row toConfigRow(PostgresWriteSchemaTransform transform) {
+ return transform.getConfigurationRow();
+ }
+ }
+
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ @SuppressWarnings({
+ "rawtypes",
+ })
+ public Map<
+ ? extends Class<? extends PTransform>,
+ ? extends PTransformTranslation.TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return ImmutableMap
+ .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
+ .put(PostgresWriteSchemaTransform.class, new PostgresWriteSchemaTransformTranslator())
+ .build();
+ }
+ }
+}
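
These payload translators are what let a runner serialize a configured Postgres
transform into its portable representation and rebuild it later (e.g. for pipeline
update). A round-trip sketch, where readConfigRow stands in for a configuration Row
like the READ_CONFIG used by the new unit test further down:

    // Build a transform from a configuration Row (readConfigRow is assumed).
    PostgresReadSchemaTransform readTransform =
        (PostgresReadSchemaTransform)
            new ReadFromPostgresSchemaTransformProvider().from(readConfigRow);

    PostgresReadSchemaTransformTranslator translator =
        new PostgresReadSchemaTransformTranslator();
    // Serialize the transform's configuration into a Beam Row ...
    Row configRow = translator.toConfigRow(readTransform);
    // ... and reconstruct an equivalent transform from that Row.
    PostgresReadSchemaTransform rebuilt =
        translator.fromConfigRow(configRow, PipelineOptionsFactory.create());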
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
index 62ff14c23e0..8755ce0ecca 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/ReadFromPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcReadSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReadFromPostgresSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:postgres_read:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ);
}
@Override
@@ -43,4 +53,40 @@ public class ReadFromPostgresSchemaTransformProvider extends JdbcReadSchemaTrans
protected String jdbcType() {
return POSTGRES;
}
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcReadSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+ throw new IllegalArgumentException(
+ String.format("Wrong JDBC type. Expected '%s' but got '%s'",
jdbcType(), jdbcType));
+ }
+
+ List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
+ configuration.getConnectionInitSql();
+ if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+ LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+ }
+
+ Boolean disableAutoCommit = configuration.getDisableAutoCommit();
+ if (disableAutoCommit != null && !disableAutoCommit) {
+ LOG.warn("Postgres reads require disableAutoCommit to be true,
overriding to true.");
+ }
+
+ // Override "connectionInitSql" and "disableAutoCommit" for postgres
+ configuration =
+ configuration
+ .toBuilder()
+ .setConnectionInitSql(Collections.emptyList())
+ .setDisableAutoCommit(true)
+ .build();
+ return new PostgresReadSchemaTransform(configuration);
+ }
+
+ public static class PostgresReadSchemaTransform extends JdbcReadSchemaTransform {
+ public PostgresReadSchemaTransform(JdbcReadSchemaTransformConfiguration config) {
+ super(config, POSTGRES);
+ }
+ }
}
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
index c50b8431163..411e1ff2c47 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/providers/WriteToPostgresSchemaTransformProvider.java
@@ -18,20 +18,30 @@
package org.apache.beam.sdk.io.jdbc.providers;
import static org.apache.beam.sdk.io.jdbc.JdbcUtil.POSTGRES;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.io.jdbc.JdbcWriteSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@AutoService(SchemaTransformProvider.class)
public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTransformProvider {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WriteToPostgresSchemaTransformProvider.class);
+
@Override
public @UnknownKeyFor @NonNull @Initialized String identifier() {
- return "beam:schematransform:org.apache.beam:postgres_write:v1";
+ return getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE);
}
@Override
@@ -43,4 +53,30 @@ public class WriteToPostgresSchemaTransformProvider extends JdbcWriteSchemaTrans
protected String jdbcType() {
return POSTGRES;
}
+
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
+ JdbcWriteSchemaTransformConfiguration configuration) {
+ String jdbcType = configuration.getJdbcType();
+ if (jdbcType != null && !jdbcType.equals(jdbcType())) {
+ throw new IllegalArgumentException(
+ String.format("Wrong JDBC type. Expected '%s' but got '%s'",
jdbcType(), jdbcType));
+ }
+
+ List<@org.checkerframework.checker.nullness.qual.Nullable String> connectionInitSql =
+ configuration.getConnectionInitSql();
+ if (connectionInitSql != null && !connectionInitSql.isEmpty()) {
+ LOG.warn("Postgres does not support connectionInitSql, ignoring.");
+ }
+
+ // Override "connectionInitSql" for postgres
+ configuration =
+ configuration.toBuilder().setConnectionInitSql(Collections.emptyList()).build();
+ return new PostgresWriteSchemaTransform(configuration);
+ }
+
+ public static class PostgresWriteSchemaTransform extends JdbcWriteSchemaTransform {
+ public PostgresWriteSchemaTransform(JdbcWriteSchemaTransformConfiguration config) {
+ super(config, POSTGRES);
+ }
+ }
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
new file mode 100644
index 00000000000..d5878309692
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOPostgresIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.common.DatabaseTestHelper;
+import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
+import org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider;
+import org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.postgresql.ds.PGSimpleDataSource;
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.jdbc.JdbcIO} on an independent Postgres instance.
+ *
+ * <p>Similar to JdbcIOIT, this test requires a running instance of Postgres. Pass in connection
+ * information using PipelineOptions:
+ *
+ * <pre>
+ * ./gradlew integrationTest -p sdks/java/io/jdbc -DintegrationTestPipelineOptions='[
+ * "--postgresServerName=1.2.3.4",
+ * "--postgresUsername=postgres",
+ * "--postgresDatabaseName=myfancydb",
+ * "--postgresPassword=mypass",
+ * "--postgresSsl=false" ]'
+ * --tests org.apache.beam.sdk.io.jdbc.JdbcIOPostgresIT
+ * -DintegrationTestRunner=direct
+ * </pre>
+ */
+@RunWith(JUnit4.class)
+public class JdbcIOPostgresIT {
+ private static final Schema INPUT_SCHEMA =
+ Schema.of(
+ Schema.Field.of("id", Schema.FieldType.INT32),
+ Schema.Field.of("name", Schema.FieldType.STRING));
+
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(INPUT_SCHEMA)
+ .withFieldValue("id", 1)
+ .withFieldValue("name", "foo")
+ .build(),
+ Row.withSchema(INPUT_SCHEMA)
+ .withFieldValue("id", 2)
+ .withFieldValue("name", "bar")
+ .build(),
+ Row.withSchema(INPUT_SCHEMA)
+ .withFieldValue("id", 3)
+ .withFieldValue("name", "baz")
+ .build());
+
+ private static PGSimpleDataSource dataSource;
+ private static String jdbcUrl;
+
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ @BeforeClass
+ public static void setup() {
+ PostgresIOTestPipelineOptions options;
+ try {
+ options = readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
+ } catch (IllegalArgumentException e) {
+ options = null;
+ }
+ org.junit.Assume.assumeNotNull(options);
+ dataSource = DatabaseTestHelper.getPostgresDataSource(options);
+ jdbcUrl = DatabaseTestHelper.getPostgresDBUrl(options);
+ }
+
+ @Test
+ public void testWriteThenRead() throws SQLException {
+ String tableName = DatabaseTestHelper.getTestTableName("JdbcIOPostgresIT");
+ DatabaseTestHelper.createTable(dataSource, tableName);
+
+ JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration writeConfig =
+ JdbcWriteSchemaTransformProvider.JdbcWriteSchemaTransformConfiguration.builder()
+ .setJdbcUrl(jdbcUrl)
+ .setUsername(dataSource.getUser())
+ .setPassword(dataSource.getPassword())
+ .setLocation(tableName)
+ .build();
+
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration readConfig =
+ JdbcReadSchemaTransformProvider.JdbcReadSchemaTransformConfiguration.builder()
+ .setJdbcUrl(jdbcUrl)
+ .setUsername(dataSource.getUser())
+ .setPassword(dataSource.getPassword())
+ .setLocation(tableName)
+ .build();
+
+ try {
+ PCollection<Row> input = writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA);
+ PCollectionRowTuple inputTuple = PCollectionRowTuple.of("input", input);
+ inputTuple.apply(
+ new WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform(writeConfig));
+ writePipeline.run().waitUntilFinish();
+
+ PCollectionRowTuple pbeginTuple = PCollectionRowTuple.empty(readPipeline);
+ PCollectionRowTuple outputTuple =
+ pbeginTuple.apply(
+ new ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform(readConfig));
+ PCollection<Row> output = outputTuple.get("output");
+ PAssert.that(output).containsInAnyOrder(ROWS);
+ readPipeline.run().waitUntilFinish();
+ } finally {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+ }
+
+ @Test
+ public void testManagedWriteThenManagedRead() throws SQLException {
+ String tableName = DatabaseTestHelper.getTestTableName("ManagedJdbcIOPostgresIT");
+ DatabaseTestHelper.createTable(dataSource, tableName);
+
+ Map<String, Object> writeConfig =
+ ImmutableMap.<String, Object>builder()
+ .put("jdbc_url", jdbcUrl)
+ .put("username", dataSource.getUser())
+ .put("password", dataSource.getPassword())
+ .put("location", tableName)
+ .build();
+
+ Map<String, Object> readConfig =
+ ImmutableMap.<String, Object>builder()
+ .put("jdbc_url", jdbcUrl)
+ .put("username", dataSource.getUser())
+ .put("password", dataSource.getPassword())
+ .put("location", tableName)
+ .build();
+
+ try {
+ PCollection<Row> input = writePipeline.apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA);
+ input.apply(Managed.write(Managed.POSTGRES).withConfig(writeConfig));
+ writePipeline.run().waitUntilFinish();
+
+ PCollectionRowTuple output =
+ readPipeline.apply(Managed.read(Managed.POSTGRES).withConfig(readConfig));
+ PAssert.that(output.get("output")).containsInAnyOrder(ROWS);
+ readPipeline.run().waitUntilFinish();
+ } finally {
+ DatabaseTestHelper.deleteTable(dataSource, tableName);
+ }
+ }
+}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java
new file mode 100644
index 00000000000..503baaefc33
--- /dev/null
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/providers/PostgresSchemaTransformTranslationTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc.providers;
+
+import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
+import static org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresReadSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.PostgresSchemaTransformTranslation.PostgresWriteSchemaTransformTranslator;
+import static org.apache.beam.sdk.io.jdbc.providers.ReadFromPostgresSchemaTransformProvider.PostgresReadSchemaTransform;
+import static org.apache.beam.sdk.io.jdbc.providers.WriteToPostgresSchemaTransformProvider.PostgresWriteSchemaTransform;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaTranslation;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.construction.BeamUrns;
+import org.apache.beam.sdk.util.construction.PipelineTranslation;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+public class PostgresSchemaTransformTranslationTest {
+ @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ static final WriteToPostgresSchemaTransformProvider WRITE_PROVIDER =
+ new WriteToPostgresSchemaTransformProvider();
+ static final ReadFromPostgresSchemaTransformProvider READ_PROVIDER =
+ new ReadFromPostgresSchemaTransformProvider();
+
+ static final Row READ_CONFIG =
+ Row.withSchema(READ_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("disable_auto_commit", true)
+ .withFieldValue("fetch_size", 10)
+ .withFieldValue("num_partitions", 5)
+ .withFieldValue("output_parallelization", true)
+ .withFieldValue("partition_column", "col")
+ .withFieldValue("read_query", null)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .build();
+
+ static final Row WRITE_CONFIG =
+ Row.withSchema(WRITE_PROVIDER.configurationSchema())
+ .withFieldValue("jdbc_url", "jdbc:postgresql://host:port/database")
+ .withFieldValue("location", "test_table")
+ .withFieldValue("autosharding", true)
+ .withFieldValue("connection_init_sql",
ImmutableList.<String>builder().build())
+ .withFieldValue("connection_properties", "some_property")
+ .withFieldValue("driver_class_name", null)
+ .withFieldValue("driver_jars", null)
+ .withFieldValue("batch_size", 100L)
+ .withFieldValue("username", "my_user")
+ .withFieldValue("password", "my_pass")
+ .withFieldValue("write_statement", null)
+ .build();
+
+ @Test
+ public void testRecreateWriteTransformFromRow() {
+ PostgresWriteSchemaTransform writeTransform =
+ (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+
+ PostgresWriteSchemaTransformTranslator translator =
+ new PostgresWriteSchemaTransformTranslator();
+ Row translatedRow = translator.toConfigRow(writeTransform);
+
+ PostgresWriteSchemaTransform writeTransformFromRow =
+ translator.fromConfigRow(translatedRow, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testWriteTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+ Schema inputSchema = Schema.builder().addStringField("name").build();
+ PCollection<Row> input =
+ p.apply(
+ Create.of(
+ Collections.singletonList(
+ Row.withSchema(inputSchema).addValue("test").build())))
+ .setRowSchema(inputSchema);
+
+ PostgresWriteSchemaTransform writeTransform =
+ (PostgresWriteSchemaTransform) WRITE_PROVIDER.from(WRITE_CONFIG);
+ PCollectionRowTuple.of("input", input).apply(writeTransform);
+
+ // Then translate the pipeline to a proto and extract PostgresWriteSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> writeTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(WRITE_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, writeTransformProto.size());
+ RunnerApi.FunctionSpec spec = writeTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(WRITE_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+
+ assertEquals(WRITE_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the PostgresWriteSchemaTransform
+ PostgresWriteSchemaTransformTranslator translator =
+ new PostgresWriteSchemaTransformTranslator();
+ PostgresWriteSchemaTransform writeTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(WRITE_CONFIG, writeTransformFromSpec.getConfigurationRow());
+ }
+
+ @Test
+ public void testReCreateReadTransformFromRow() {
+ // setting a subset of fields here.
+ PostgresReadSchemaTransform readTransform =
+ (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ PostgresReadSchemaTransformTranslator translator = new PostgresReadSchemaTransformTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ PostgresReadSchemaTransform readTransformFromRow =
+ translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromRow.getConfigurationRow());
+ }
+
+ @Test
+ public void testReadTransformProtoTranslation()
+ throws InvalidProtocolBufferException, IOException {
+ // First build a pipeline
+ Pipeline p = Pipeline.create();
+
+ PostgresReadSchemaTransform readTransform =
+ (PostgresReadSchemaTransform) READ_PROVIDER.from(READ_CONFIG);
+
+ // Mock inferBeamSchema since it requires database connection.
+ Schema expectedSchema = Schema.builder().addStringField("name").build();
+ try (MockedStatic<JdbcIO.ReadRows> mock = Mockito.mockStatic(JdbcIO.ReadRows.class)) {
+ mock.when(() -> JdbcIO.ReadRows.inferBeamSchema(Mockito.any(), Mockito.any()))
+ .thenReturn(expectedSchema);
+ PCollectionRowTuple.empty(p).apply(readTransform);
+ }
+
+ // Then translate the pipeline to a proto and extract PostgresReadSchemaTransform proto
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+ List<RunnerApi.PTransform> readTransformProto =
+ pipelineProto.getComponents().getTransformsMap().values().stream()
+ .filter(
+ tr -> {
+ RunnerApi.FunctionSpec spec = tr.getSpec();
+ try {
+ return spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))
+ && SchemaTransformPayload.parseFrom(spec.getPayload())
+ .getIdentifier()
+ .equals(READ_PROVIDER.identifier());
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ assertEquals(1, readTransformProto.size());
+ RunnerApi.FunctionSpec spec = readTransformProto.get(0).getSpec();
+
+ // Check that the proto contains correct values
+ SchemaTransformPayload payload = SchemaTransformPayload.parseFrom(spec.getPayload());
+ Schema schemaFromSpec = SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
+ assertEquals(READ_PROVIDER.configurationSchema(), schemaFromSpec);
+ Row rowFromSpec = RowCoder.of(schemaFromSpec).decode(payload.getConfigurationRow().newInput());
+ assertEquals(READ_CONFIG, rowFromSpec);
+
+ // Use the information in the proto to recreate the PostgresReadSchemaTransform
+ PostgresReadSchemaTransformTranslator translator = new PostgresReadSchemaTransformTranslator();
+ PostgresReadSchemaTransform readTransformFromSpec =
+ translator.fromConfigRow(rowFromSpec, PipelineOptionsFactory.create());
+
+ assertEquals(READ_CONFIG, readTransformFromSpec.getConfigurationRow());
+ }
+}
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 06aed06c71c..cda84629a7d 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -96,6 +96,7 @@ public class Managed {
public static final String ICEBERG_CDC = "iceberg_cdc";
public static final String KAFKA = "kafka";
public static final String BIGQUERY = "bigquery";
+ public static final String POSTGRES = "postgres";
// Supported SchemaTransforms
public static final Map<String, String> READ_TRANSFORMS =
@@ -104,12 +105,14 @@ public class Managed {
.put(ICEBERG_CDC, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_CDC_READ))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
.put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_READ))
+ .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_READ))
.build();
public static final Map<String, String> WRITE_TRANSFORMS =
ImmutableMap.<String, String>builder()
.put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
.put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
.put(BIGQUERY, getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE))
+ .put(POSTGRES, getUrn(ExternalTransforms.ManagedTransforms.Urns.POSTGRES_WRITE))
.build();
/**
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index 782fa3d030b..b22ed6e0c64 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -80,7 +80,9 @@ MANAGED_TRANSFORM_URN_TO_JAR_TARGET_MAPPING = {
ManagedTransforms.Urns.KAFKA_READ.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
ManagedTransforms.Urns.KAFKA_WRITE.urn: _IO_EXPANSION_SERVICE_JAR_TARGET,
ManagedTransforms.Urns.BIGQUERY_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
- ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET
+ ManagedTransforms.Urns.BIGQUERY_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
+ ManagedTransforms.Urns.POSTGRES_READ.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET,
+ ManagedTransforms.Urns.POSTGRES_WRITE.urn: _GCP_EXPANSION_SERVICE_JAR_TARGET, # pylint: disable=line-too-long
}
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
index bf680d5fd35..72dfb6fd9a0 100644
--- a/sdks/python/apache_beam/transforms/managed.py
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -85,6 +85,7 @@ ICEBERG = "iceberg"
_ICEBERG_CDC = "iceberg_cdc"
KAFKA = "kafka"
BIGQUERY = "bigquery"
+POSTGRES = "postgres"
__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"]
@@ -95,7 +96,8 @@ class Read(PTransform):
ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
_ICEBERG_CDC: ManagedTransforms.Urns.ICEBERG_CDC_READ.urn,
KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
- BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn
+ BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn,
+ POSTGRES: ManagedTransforms.Urns.POSTGRES_READ.urn,
}
def __init__(
@@ -136,7 +138,8 @@ class Write(PTransform):
_WRITE_TRANSFORMS = {
ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
- BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn
+ BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn,
+ POSTGRES: ManagedTransforms.Urns.POSTGRES_WRITE.urn,
}
def __init__(