svetakvsundhar commented on code in PR #34393:
URL: https://github.com/apache/beam/pull/34393#discussion_r2009269295
##########
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java:
##########
@@ -889,6 +927,305 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
}
+ /** Implementation of {@link #readRowsWithPartitions}. */
+ @AutoValue
+ public abstract static class ReadRowsWithPartitions<PartitionColumnT>
+ extends PTransform<PBegin, PCollection<Row>> {
+
+ @Pure
+ abstract @Nullable SerializableFunction<Void, DataSource>
getDataSourceProviderFn();
+
+ @Pure
+ abstract int getFetchSize();
+
+ @Pure
+ abstract boolean getDisableAutoCommit();
+
+ @Pure
+ abstract @Nullable Schema getSchema();
+
+ @Pure
+ abstract @Nullable Integer getNumPartitions();
+
+ @Pure
+ abstract @Nullable String getPartitionColumn();
+
+ @Pure
+ abstract @Nullable PartitionColumnT getLowerBound();
+
+ @Pure
+ abstract @Nullable PartitionColumnT getUpperBound();
+
+ @Pure
+ abstract @Nullable String getTable();
+
+ @Pure
+ abstract @Nullable TypeDescriptor<PartitionColumnT>
getPartitionColumnType();
+
+ @Pure
+ abstract @Nullable JdbcReadWithPartitionsHelper<PartitionColumnT>
getPartitionsHelper();
+
+ @Pure
+ abstract boolean getUseBeamSchema();
+
+ @Pure
+ abstract Builder<PartitionColumnT> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<PartitionColumnT> {
+
+ abstract Builder<PartitionColumnT> setDataSourceProviderFn(
+ SerializableFunction<Void, DataSource> dataSourceProviderFn);
+
+ abstract Builder<PartitionColumnT> setFetchSize(int fetchSize);
+
+ abstract Builder<PartitionColumnT> setNumPartitions(int numPartitions);
+
+ abstract Builder<PartitionColumnT> setPartitionColumn(String
partitionColumn);
+
+ abstract Builder<PartitionColumnT> setLowerBound(PartitionColumnT
lowerBound);
+
+ abstract Builder<PartitionColumnT> setUpperBound(PartitionColumnT
upperBound);
+
+ abstract Builder<PartitionColumnT> setUseBeamSchema(boolean
useBeamSchema);
+
+ abstract Builder<PartitionColumnT> setTable(String tableName);
+
+ abstract Builder<PartitionColumnT> setPartitionColumnType(
+ TypeDescriptor<PartitionColumnT> partitionColumnType);
+
+ abstract Builder<PartitionColumnT> setPartitionsHelper(
+ JdbcReadWithPartitionsHelper<PartitionColumnT> partitionsHelper);
+
+ abstract Builder<PartitionColumnT> setDisableAutoCommit(boolean
disableAutoCommit);
+
+ abstract Builder<PartitionColumnT> setSchema(@Nullable Schema schema);
+
+ abstract ReadRowsWithPartitions<PartitionColumnT> build();
+ }
+
+ public ReadRowsWithPartitions<PartitionColumnT>
withDataSourceConfiguration(
+ final DataSourceConfiguration config) {
+ return withDataSourceProviderFn(new
DataSourceProviderFromDataSourceConfiguration(config));
+ }
+
+ public ReadRowsWithPartitions<PartitionColumnT> withDataSourceProviderFn(
+ SerializableFunction<Void, DataSource> dataSourceProviderFn) {
+ return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
+ }
+
+ /**
+ * The number of partitions. Together with withLowerBound and
withUpperBound, this determines the partition
+ * strides for the generated WHERE clause expressions used to split the column
withPartitionColumn
+ * evenly. Values less than 1 are rejected with an {@link IllegalArgumentException}.
+ */
+ public ReadRowsWithPartitions<PartitionColumnT> withNumPartitions(int
numPartitions) {
+ checkArgument(numPartitions > 0, "numPartitions can not be less than 1");
+ return toBuilder().setNumPartitions(numPartitions).build();
+ }
+
+ /** The name of a column of numeric type that will be used for
partitioning. */
+ public ReadRowsWithPartitions<PartitionColumnT> withPartitionColumn(String
partitionColumn) {
+ checkNotNull(partitionColumn, "partitionColumn can not be null");
+ return toBuilder().setPartitionColumn(partitionColumn).build();
+ }
+
+ /** The number of rows to fetch from the database in the same {@link
ResultSet} round-trip. */
+ public ReadRowsWithPartitions<PartitionColumnT> withFetchSize(int
fetchSize) {
+ checkArgument(fetchSize > 0, "fetchSize can not be less than 1");
+ return toBuilder().setFetchSize(fetchSize).build();
+ }
+
+ /**
+ * Whether to disable auto commit on read. Defaults to true if not
provided. The need for this
+ * config varies depending on the database platform. Informix requires
this to be set to false
+ * while Postgres requires this to be set to true.
+ */
+ public ReadRowsWithPartitions<PartitionColumnT> withDisableAutoCommit(
+ boolean disableAutoCommit) {
+ return toBuilder().setDisableAutoCommit(disableAutoCommit).build();
+ }
+
+ /** Data output type is {@link Row}, and schema is auto-inferred from the
database. */
+ public ReadRowsWithPartitions<PartitionColumnT> withRowOutput() {
+ return toBuilder().setUseBeamSchema(true).build();
+ }
+
+ public ReadRowsWithPartitions<PartitionColumnT>
withLowerBound(PartitionColumnT lowerBound) {
+ return toBuilder().setLowerBound(lowerBound).build();
+ }
+
+ public ReadRowsWithPartitions<PartitionColumnT>
withUpperBound(PartitionColumnT upperBound) {
+ return toBuilder().setUpperBound(upperBound).build();
+ }
+
+ /** Name of the table in the external database. Can be used to pass a
user-defined subquery. */
+ public ReadRowsWithPartitions<PartitionColumnT> withTable(String
tableName) {
+ checkNotNull(tableName, "table can not be null");
+ return toBuilder().setTable(tableName).build();
+ }
+
+ public ReadRowsWithPartitions<PartitionColumnT> withSchema(Schema schema) {
+ return toBuilder().setSchema(schema).build();
+ }
+
+ private static final int EQUAL = 0;
Review Comment:
Can we add a comment explaining what this variable is for?
##########
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java:
##########
@@ -515,6 +515,110 @@ public void testIncompatibleSchemaThrowsError() {
assertThrows(PipelineExecutionException.class, () ->
pipeline.run().waitUntilFinish());
}
+ @Test
+ public void testReadRowsPartitions() {
+ PCollection<Row> rows =
+ pipeline.apply(
+ JdbcIO.readRowsWithPartitions()
+ .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+ .withTable(READ_TABLE_NAME)
+ .withNumPartitions(1)
+ .withPartitionColumn("id")
+ .withLowerBound(0L)
+ .withUpperBound(1000L));
+ PAssert.thatSingleton(rows.apply("Count All",
Count.globally())).isEqualTo(1000L);
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadRowsPartitionsWithExplicitSchema() {
+ Schema customSchema =
+ Schema.of(
+ Schema.Field.of("CUSTOMER_NAME",
Schema.FieldType.STRING).withNullable(true),
+ Schema.Field.of("CUSTOMER_ID",
Schema.FieldType.INT32).withNullable(true));
+ PCollection<Row> rows =
+ pipeline.apply(
+ JdbcIO.readRowsWithPartitions()
+ .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+ .withTable(String.format("(select name,id from %s) as subq",
READ_TABLE_NAME))
+ .withNumPartitions(5)
+ .withPartitionColumn("id")
+ .withLowerBound(0L)
+ .withUpperBound(1000L)
+ .withRowOutput()
+ .withSchema(customSchema));
+ assertEquals(customSchema, rows.getSchema());
+ PAssert.thatSingleton(rows.apply("Count All",
Count.globally())).isEqualTo(1000L);
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadRowsPartitionsBySubqery() {
+ PCollection<Row> rows =
+ pipeline.apply(
+ JdbcIO.readRowsWithPartitions()
+ .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+ .withTable(String.format("(select * from %s) as subq",
READ_TABLE_NAME))
+ .withNumPartitions(10)
+ .withPartitionColumn("id")
+ .withLowerBound(0L)
+ .withUpperBound(1000L));
+ PAssert.thatSingleton(rows.apply("Count All",
Count.globally())).isEqualTo(1000L);
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadRowsPartitionsIfNumPartitionsIsZero() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("numPartitions can not be less than 1");
+ pipeline.apply(
+ JdbcIO.readRowsWithPartitions()
+ .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+ .withTable(READ_TABLE_NAME)
+ .withNumPartitions(0)
+ .withPartitionColumn("id")
+ .withLowerBound(0L)
+ .withUpperBound(1000L));
+ pipeline.run();
+ }
+
+ @Test
+ public void testReadRowsPartitionsLowerBoundIsMoreThanUpperBound() {
Review Comment:
Can we also add tests for a null partition column and a null table name?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]