This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c34a4f288deb7dd349c6e52f674d7fa95aa013a1
Author: Flavio Pompermaier <[email protected]>
AuthorDate: Mon May 4 18:15:38 2020 +0200

    [FLINK-17361] Add custom query on JDBC tables
---
 docs/dev/table/connect.md                          |  6 ++++-
 .../jdbc/internal/options/JdbcReadOptions.java     | 29 ++++++++++++++++-----
 .../connector/jdbc/table/JdbcTableSource.java      |  9 +++++--
 .../jdbc/table/JdbcTableSourceSinkFactory.java     |  6 +++++
 .../flink/table/descriptors/JdbcValidator.java     |  2 ++
 .../jdbc/table/JdbcTableSourceITCase.java          | 30 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceSinkFactoryTest.java |  2 ++
 7 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 4a1e83a..ac2646c 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1307,7 +1307,11 @@ CREATE TABLE MyUserTable (
   'connector.username' = 'name',
   'connector.password' = 'password',
   
-  -- **followings are scan options, optional, used when reading from table**
+  -- **the following are scan options, optional, used when reading from a table**
+
+  -- optional: SQL query / prepared statement.
+  -- If set, this will take precedence over the 'connector.table' setting
+  'connector.read.query' = 'SELECT * FROM sometable',
 
  -- These options must all be specified if any of them is specified. In addition,
  -- partition.num must be specified. They describe how to partition the table when
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
index a1350ab..65b5729 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java
@@ -27,6 +27,7 @@ import java.util.Optional;
  */
 public class JdbcReadOptions implements Serializable {
 
+       private final String query;
        private final String partitionColumnName;
        private final Long partitionLowerBound;
        private final Long partitionUpperBound;
@@ -35,11 +36,13 @@ public class JdbcReadOptions implements Serializable {
        private final int fetchSize;
 
        private JdbcReadOptions(
+                       String query,
                        String partitionColumnName,
                        Long partitionLowerBound,
                        Long partitionUpperBound,
                        Integer numPartitions,
                        int fetchSize) {
+               this.query = query;
                this.partitionColumnName = partitionColumnName;
                this.partitionLowerBound = partitionLowerBound;
                this.partitionUpperBound = partitionUpperBound;
@@ -48,6 +51,10 @@ public class JdbcReadOptions implements Serializable {
                this.fetchSize = fetchSize;
        }
 
+       public Optional<String> getQuery() {
+               return Optional.ofNullable(query);
+       }
+
        public Optional<String> getPartitionColumnName() {
                return Optional.ofNullable(partitionColumnName);
        }
@@ -76,11 +83,12 @@ public class JdbcReadOptions implements Serializable {
        public boolean equals(Object o) {
                if (o instanceof JdbcReadOptions) {
                        JdbcReadOptions options = (JdbcReadOptions) o;
-                       return Objects.equals(partitionColumnName, options.partitionColumnName) &&
-                               Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
-                               Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
-                               Objects.equals(numPartitions, options.numPartitions) &&
-                               Objects.equals(fetchSize, options.fetchSize);
+                       return Objects.equals(query, options.query) &&
+                                       Objects.equals(partitionColumnName, options.partitionColumnName) &&
+                                       Objects.equals(partitionLowerBound, options.partitionLowerBound) &&
+                                       Objects.equals(partitionUpperBound, options.partitionUpperBound) &&
+                                       Objects.equals(numPartitions, options.numPartitions) &&
+                                       Objects.equals(fetchSize, options.fetchSize);
                } else {
                        return false;
                }
@@ -90,6 +98,7 @@ public class JdbcReadOptions implements Serializable {
         * Builder of {@link JdbcReadOptions}.
         */
        public static class Builder {
+               protected String query;
                protected String partitionColumnName;
                protected Long partitionLowerBound;
                protected Long partitionUpperBound;
@@ -98,6 +107,14 @@ public class JdbcReadOptions implements Serializable {
                protected int fetchSize = 0;
 
                /**
+                * optional, SQL query statement for this JDBC source.
+                */
+               public Builder setQuery(String query) {
+                       this.query = query;
+                       return this;
+               }
+
+               /**
                 * optional, name of the column used for partitioning the input.
                 */
                public Builder setPartitionColumnName(String partitionColumnName) {
@@ -140,7 +157,7 @@ public class JdbcReadOptions implements Serializable {
 
                public JdbcReadOptions build() {
                        return new JdbcReadOptions(
-                               partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
+                               query, partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize);
                }
        }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
index a599f2f..ff21aae 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java
@@ -168,8 +168,7 @@ public class JdbcTableSource implements
                }
 
                final JdbcDialect dialect = options.getDialect();
-               String query = dialect.getSelectFromStatement(
-                       options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]);
+               String query = getBaseQueryStatement(rowTypeInfo);
                if (readOptions.getPartitionColumnName().isPresent()) {
                        long lowerBound = readOptions.getPartitionLowerBound().get();
                        long upperBound = readOptions.getPartitionUpperBound().get();
@@ -185,6 +184,12 @@ public class JdbcTableSource implements
                return builder.finish();
        }
 
+       private String getBaseQueryStatement(RowTypeInfo rowTypeInfo) {
+               return readOptions.getQuery().orElseGet(() ->
+                       options.getDialect().getSelectFromStatement(
+                               options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]));
+       }
+
        @Override
        public boolean equals(Object o) {
                if (o instanceof JdbcTableSource) {
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
index bdc8642..438779f 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java
@@ -57,6 +57,7 @@ import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PA
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_NUM;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND;
+import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_QUERY;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TABLE;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TYPE_VALUE_JDBC;
 import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_URL;
@@ -96,6 +97,7 @@ public class JdbcTableSourceSinkFactory implements
                properties.add(CONNECTOR_PASSWORD);
 
                // scan options
+               properties.add(CONNECTOR_READ_QUERY);
                properties.add(CONNECTOR_READ_PARTITION_COLUMN);
                properties.add(CONNECTOR_READ_PARTITION_NUM);
                properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -184,6 +186,7 @@ public class JdbcTableSourceSinkFactory implements
        }
 
        private JdbcReadOptions getJdbcReadOptions(DescriptorProperties descriptorProperties) {
+               final Optional<String> query = descriptorProperties.getOptionalString(CONNECTOR_READ_QUERY);
                final Optional<String> partitionColumnName =
                        descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN);
                final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND);
@@ -191,6 +194,9 @@ public class JdbcTableSourceSinkFactory implements
                final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM);

                final JdbcReadOptions.Builder builder = JdbcReadOptions.builder();
+               if (query.isPresent()) {
+                       builder.setQuery(query.get());
+               }
                if (partitionColumnName.isPresent()) {
                        builder.setPartitionColumnName(partitionColumnName.get());
                        builder.setPartitionLowerBound(partitionLower.get());
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
index e8b0fe5..218759e 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java
@@ -43,6 +43,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
        public static final String CONNECTOR_USERNAME = "connector.username";
        public static final String CONNECTOR_PASSWORD = "connector.password";

+       public static final String CONNECTOR_READ_QUERY = "connector.read.query";
        public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
        public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound";
        public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound";
@@ -89,6 +90,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
        }

        private void validateReadProperties(DescriptorProperties properties) {
+               properties.validateString(CONNECTOR_READ_QUERY, true);
                properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true);
                properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true);
                properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true);
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 74e90b2..fa8d98a 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -154,4 +154,34 @@ public class JdbcTableSourceITCase extends AbstractTestBase {
                                "2020-01-01T15:36:01.123456,101.1234");
                StreamITCase.compareWithList(expected);
        }
+
+       @Test
+       public void testScanQueryJDBCSource() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build();
+               StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
+
+               final String testQuery = "SELECT id FROM " + INPUT_TABLE;
+               tEnv.sqlUpdate(
+                       "CREATE TABLE test(" +
+                               "id BIGINT" +
+                               ") WITH (" +
+                               "  'connector.type'='jdbc'," +
+                               "  'connector.url'='" + DB_URL + "'," +
+                               "  'connector.table'='whatever'," +
+                               "  'connector.read.query'='" + testQuery + "'" +
+                               ")"
+               );
+
+               StreamITCase.clear();
+               tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class)
+                       .addSink(new StreamITCase.StringSink<>());
+               env.execute();
+
+               List<String> expected = Arrays.asList("1", "2");
+               StreamITCase.compareWithList(expected);
+       }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
index 38cb29c..7f15565 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java
@@ -88,6 +88,7 @@ public class JdbcTableSourceSinkFactoryTest {
        @Test
        public void testJdbcReadProperties() {
                Map<String, String> properties = getBasicProperties();
+               properties.put("connector.read.query", "SELECT aaa FROM mytable");
                properties.put("connector.read.partition.column", "aaa");
                properties.put("connector.read.partition.lower-bound", "-10");
                properties.put("connector.read.partition.upper-bound", "100");
@@ -102,6 +103,7 @@ public class JdbcTableSourceSinkFactoryTest {
                        .setTableName("mytable")
                        .build();
                final JdbcReadOptions readOptions = JdbcReadOptions.builder()
+                       .setQuery("SELECT aaa FROM mytable")
                        .setPartitionColumnName("aaa")
                        .setPartitionLowerBound(-10)
                        .setPartitionUpperBound(100)

Reply via email to the mailing list.