apilloud commented on a change in pull request #15373:
URL: https://github.com/apache/beam/pull/15373#discussion_r695992271



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -148,6 +132,95 @@ public PDone expand(PCollection<Row> input) {
       };
     }
 
+    @AutoValue
+    public abstract static class Reader extends PTransform<PBegin, 
PCollection<Row>>
+        implements PushdownProjector<PBegin> {
+      abstract String getLocation();
+
+      abstract Row getConfig();
+
+      abstract JdbcIO.DataSourceConfiguration getDataSourceConfiguration();
+
+      abstract FieldAccessDescriptor getFieldAccessDescriptor();
+
+      abstract Builder toBuilder();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setLocation(String location);
+
+        abstract Builder setConfig(Row config);
+
+        abstract Builder setDataSourceConfiguration(
+            JdbcIO.DataSourceConfiguration dataSourceConfiguration);
+
+        abstract Builder setFieldAccessDescriptor(FieldAccessDescriptor 
fieldAccessDescriptor);
+
+        abstract Reader build();
+      }
+
+      static Builder builder() {
+        return new 
AutoValue_JdbcSchemaIOProvider_JdbcSchemaIO_Reader.Builder();
+      }
+
+      @Override
+      public PCollection<Row> expand(PBegin input) {
+        String readQuery;
+
+        JdbcIO.ReadRows readRows =
+            JdbcIO.readRows()
+                .withDataSourceConfiguration(getDataSourceConfiguration())
+                .withQuery(getReadQuery());
+
+        if (getConfig().getInt16("fetchSize") != null) {
+          readRows = readRows.withFetchSize(getConfig().getInt16("fetchSize"));
+        }
+        if (getConfig().getBoolean("outputParallelization") != null) {
+          readRows =
+              
readRows.withOutputParallelization(getConfig().getBoolean("outputParallelization"));
+        }
+        return input.apply(readRows);
+      }
+
+      @Override
+      public PTransform<PBegin, PCollection<Row>> withProjectionPushdown(
+          FieldAccessDescriptor fieldAccessDescriptor) {
+        return 
toBuilder().setFieldAccessDescriptor(fieldAccessDescriptor).build();
+      }
+
+      @Override
+      public boolean supportsFieldReordering() {
+        return true;
+      }
+
+      private String getReadQuery() {
+        if (getConfig().getString("readQuery") != null) {
+          return getConfig().getString("readQuery");
+        }
+        if (getFieldAccessDescriptor().getAllFields()) {
+          return String.format("SELECT * FROM %s", getLocation());
+        }
+
+        // Build query from field access descriptor.
+        StringBuilder query = new StringBuilder("SELECT ");
+        List<FieldAccessDescriptor.FieldDescriptor> fieldsAccessed =
+            getFieldAccessDescriptor().getFieldsAccessed();
+        for (int i = 0; i < fieldsAccessed.size(); i++) {
+          if (i > 0) {
+            query.append(", ");
+          }
+          query.append(fieldsAccessed.get(i).getFieldName());
+          if (fieldsAccessed.get(i).getFieldRename() != null) {
+            query.append(" AS ");
+            query.append(fieldsAccessed.get(i).getFieldRename());
+          }
+        }
+        query.append(" FROM ");
+        query.append(getLocation());

Review comment:
       This is a pre-existing bug, but what happens when the table name is `WHERE` (or another reserved keyword)? The location is concatenated into the query unquoted.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -148,6 +132,95 @@ public PDone expand(PCollection<Row> input) {
       };
     }
 
+    @AutoValue
+    public abstract static class Reader extends PTransform<PBegin, 
PCollection<Row>>
+        implements PushdownProjector<PBegin> {
+      abstract String getLocation();
+
+      abstract Row getConfig();
+
+      abstract JdbcIO.DataSourceConfiguration getDataSourceConfiguration();
+
+      abstract FieldAccessDescriptor getFieldAccessDescriptor();
+
+      abstract Builder toBuilder();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setLocation(String location);
+
+        abstract Builder setConfig(Row config);
+
+        abstract Builder setDataSourceConfiguration(
+            JdbcIO.DataSourceConfiguration dataSourceConfiguration);
+
+        abstract Builder setFieldAccessDescriptor(FieldAccessDescriptor 
fieldAccessDescriptor);
+
+        abstract Reader build();
+      }
+
+      static Builder builder() {
+        return new 
AutoValue_JdbcSchemaIOProvider_JdbcSchemaIO_Reader.Builder();
+      }
+
+      @Override
+      public PCollection<Row> expand(PBegin input) {
+        String readQuery;
+
+        JdbcIO.ReadRows readRows =
+            JdbcIO.readRows()
+                .withDataSourceConfiguration(getDataSourceConfiguration())
+                .withQuery(getReadQuery());
+
+        if (getConfig().getInt16("fetchSize") != null) {
+          readRows = readRows.withFetchSize(getConfig().getInt16("fetchSize"));
+        }
+        if (getConfig().getBoolean("outputParallelization") != null) {
+          readRows =
+              
readRows.withOutputParallelization(getConfig().getBoolean("outputParallelization"));
+        }
+        return input.apply(readRows);
+      }
+
+      @Override
+      public PTransform<PBegin, PCollection<Row>> withProjectionPushdown(
+          FieldAccessDescriptor fieldAccessDescriptor) {
+        return 
toBuilder().setFieldAccessDescriptor(fieldAccessDescriptor).build();
+      }
+
+      @Override
+      public boolean supportsFieldReordering() {
+        return true;
+      }
+
+      private String getReadQuery() {
+        if (getConfig().getString("readQuery") != null) {
+          return getConfig().getString("readQuery");
+        }
+        if (getFieldAccessDescriptor().getAllFields()) {
+          return String.format("SELECT * FROM %s", getLocation());
+        }
+
+        // Build query from field access descriptor.

Review comment:
       `JdbcUtil.generateStatement` does something similar for `INSERT INTO`, and there is also a `generateWriteStatement` method below in this file. They all seem to have the same bugs (unquoted table and column identifiers). It would be nice if there were some code reuse across these.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcSchemaIOProvider.java
##########
@@ -148,6 +132,95 @@ public PDone expand(PCollection<Row> input) {
       };
     }
 
+    @AutoValue
+    public abstract static class Reader extends PTransform<PBegin, 
PCollection<Row>>
+        implements PushdownProjector<PBegin> {
+      abstract String getLocation();
+
+      abstract Row getConfig();
+
+      abstract JdbcIO.DataSourceConfiguration getDataSourceConfiguration();
+
+      abstract FieldAccessDescriptor getFieldAccessDescriptor();
+
+      abstract Builder toBuilder();
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setLocation(String location);
+
+        abstract Builder setConfig(Row config);
+
+        abstract Builder setDataSourceConfiguration(
+            JdbcIO.DataSourceConfiguration dataSourceConfiguration);
+
+        abstract Builder setFieldAccessDescriptor(FieldAccessDescriptor 
fieldAccessDescriptor);
+
+        abstract Reader build();
+      }
+
+      static Builder builder() {
+        return new 
AutoValue_JdbcSchemaIOProvider_JdbcSchemaIO_Reader.Builder();
+      }
+
+      @Override
+      public PCollection<Row> expand(PBegin input) {
+        String readQuery;
+
+        JdbcIO.ReadRows readRows =
+            JdbcIO.readRows()
+                .withDataSourceConfiguration(getDataSourceConfiguration())
+                .withQuery(getReadQuery());
+
+        if (getConfig().getInt16("fetchSize") != null) {
+          readRows = readRows.withFetchSize(getConfig().getInt16("fetchSize"));
+        }
+        if (getConfig().getBoolean("outputParallelization") != null) {
+          readRows =
+              
readRows.withOutputParallelization(getConfig().getBoolean("outputParallelization"));
+        }
+        return input.apply(readRows);
+      }
+
+      @Override
+      public PTransform<PBegin, PCollection<Row>> withProjectionPushdown(
+          FieldAccessDescriptor fieldAccessDescriptor) {
+        return 
toBuilder().setFieldAccessDescriptor(fieldAccessDescriptor).build();
+      }
+
+      @Override
+      public boolean supportsFieldReordering() {
+        return true;
+      }
+
+      private String getReadQuery() {
+        if (getConfig().getString("readQuery") != null) {
+          return getConfig().getString("readQuery");
+        }
+        if (getFieldAccessDescriptor().getAllFields()) {
+          return String.format("SELECT * FROM %s", getLocation());
+        }
+
+        // Build query from field access descriptor.
+        StringBuilder query = new StringBuilder("SELECT ");
+        List<FieldAccessDescriptor.FieldDescriptor> fieldsAccessed =
+            getFieldAccessDescriptor().getFieldsAccessed();
+        for (int i = 0; i < fieldsAccessed.size(); i++) {
+          if (i > 0) {
+            query.append(", ");
+          }
+          query.append(fieldsAccessed.get(i).getFieldName());

Review comment:
       What happens when the field name is `FROM` (or another reserved keyword)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to