This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c34a4f288deb7dd349c6e52f674d7fa95aa013a1 Author: Flavio Pompermaier <[email protected]> AuthorDate: Mon May 4 18:15:38 2020 +0200 [FLINK-17361] Add custom query on JDBC tables --- docs/dev/table/connect.md | 6 ++++- .../jdbc/internal/options/JdbcReadOptions.java | 29 ++++++++++++++++----- .../connector/jdbc/table/JdbcTableSource.java | 9 +++++-- .../jdbc/table/JdbcTableSourceSinkFactory.java | 6 +++++ .../flink/table/descriptors/JdbcValidator.java | 2 ++ .../jdbc/table/JdbcTableSourceITCase.java | 30 ++++++++++++++++++++++ .../jdbc/table/JdbcTableSourceSinkFactoryTest.java | 2 ++ 7 files changed, 75 insertions(+), 9 deletions(-) diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 4a1e83a..ac2646c 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -1307,7 +1307,11 @@ CREATE TABLE MyUserTable ( 'connector.username' = 'name', 'connector.password' = 'password', - -- **followings are scan options, optional, used when reading from table** + -- **followings are scan options, optional, used when reading from a table** + + -- optional: SQL query / prepared statement. + -- If set, this will take precedence over the 'connector.table' setting + 'connector.read.query' = 'SELECT * FROM sometable', -- These options must all be specified if any of them is specified. In addition, -- partition.num must be specified. They describe how to partition the table when diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java index a1350ab..65b5729 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcReadOptions.java @@ -27,6 +27,7 @@ import java.util.Optional; */ public class JdbcReadOptions implements Serializable { + private final String query; private final String partitionColumnName; private final Long partitionLowerBound; private final Long partitionUpperBound; @@ -35,11 +36,13 @@ public class JdbcReadOptions implements Serializable { private final int fetchSize; private JdbcReadOptions( + String query, String partitionColumnName, Long partitionLowerBound, Long partitionUpperBound, Integer numPartitions, int fetchSize) { + this.query = query; this.partitionColumnName = partitionColumnName; this.partitionLowerBound = partitionLowerBound; this.partitionUpperBound = partitionUpperBound; @@ -48,6 +51,10 @@ public class JdbcReadOptions implements Serializable { this.fetchSize = fetchSize; } + public Optional<String> getQuery() { + return Optional.ofNullable(query); + } + public Optional<String> getPartitionColumnName() { return Optional.ofNullable(partitionColumnName); } @@ -76,11 +83,12 @@ public class JdbcReadOptions implements Serializable { public boolean equals(Object o) { if (o instanceof JdbcReadOptions) { JdbcReadOptions options = (JdbcReadOptions) o; - return Objects.equals(partitionColumnName, options.partitionColumnName) && - Objects.equals(partitionLowerBound, options.partitionLowerBound) && - Objects.equals(partitionUpperBound, options.partitionUpperBound) && - Objects.equals(numPartitions, options.numPartitions) && - Objects.equals(fetchSize, options.fetchSize); + return Objects.equals(query, options.query) && + Objects.equals(partitionColumnName, options.partitionColumnName) && + Objects.equals(partitionLowerBound, options.partitionLowerBound) && + Objects.equals(partitionUpperBound, options.partitionUpperBound) && + Objects.equals(numPartitions, options.numPartitions) && + Objects.equals(fetchSize, options.fetchSize); } else { return false; } @@ -90,6 +98,7 @@ public class JdbcReadOptions implements Serializable { * Builder of {@link JdbcReadOptions}. */ public static class Builder { + protected String query; protected String partitionColumnName; protected Long partitionLowerBound; protected Long partitionUpperBound; @@ -98,6 +107,14 @@ public class JdbcReadOptions implements Serializable { protected int fetchSize = 0; /** + * optional, SQL query statement for this JDBC source. + */ + public Builder setQuery(String query) { + this.query = query; + return this; + } + + /** * optional, name of the column used for partitioning the input. */ public Builder setPartitionColumnName(String partitionColumnName) { @@ -140,7 +157,7 @@ public class JdbcReadOptions implements Serializable { public JdbcReadOptions build() { return new JdbcReadOptions( - partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize); + query, partitionColumnName, partitionLowerBound, partitionUpperBound, numPartitions, fetchSize); } } } diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java index a599f2f..ff21aae 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSource.java @@ -168,8 +168,7 @@ public class JdbcTableSource implements } final JdbcDialect dialect = options.getDialect(); - String query = dialect.getSelectFromStatement( - options.getTableName(), rowTypeInfo.getFieldNames(), new String[0]); + String query = getBaseQueryStatement(rowTypeInfo); if (readOptions.getPartitionColumnName().isPresent()) { long lowerBound = readOptions.getPartitionLowerBound().get(); long upperBound = readOptions.getPartitionUpperBound().get(); @@ -185,6 +184,12 @@ public class JdbcTableSource implements return builder.finish(); } + private String getBaseQueryStatement(RowTypeInfo rowTypeInfo) { + return readOptions.getQuery().orElseGet(() -> + options.getDialect().getSelectFromStatement( + options.getTableName(), rowTypeInfo.getFieldNames(), new String[0])); + } + @Override public boolean equals(Object o) { if (o instanceof JdbcTableSource) { diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java index bdc8642..438779f 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactory.java @@ -57,6 +57,7 @@ import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PA import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_LOWER_BOUND; import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_NUM; import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_PARTITION_UPPER_BOUND; +import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_READ_QUERY; import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TABLE; import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_TYPE_VALUE_JDBC; import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_URL; @@ -96,6 +97,7 @@ public class JdbcTableSourceSinkFactory implements properties.add(CONNECTOR_PASSWORD); // scan options + properties.add(CONNECTOR_READ_QUERY); properties.add(CONNECTOR_READ_PARTITION_COLUMN); properties.add(CONNECTOR_READ_PARTITION_NUM); properties.add(CONNECTOR_READ_PARTITION_LOWER_BOUND); @@ -184,6 +186,7 @@ public class JdbcTableSourceSinkFactory implements } private JdbcReadOptions getJdbcReadOptions(DescriptorProperties descriptorProperties) { + final Optional<String> query = descriptorProperties.getOptionalString(CONNECTOR_READ_QUERY); final Optional<String> partitionColumnName = descriptorProperties.getOptionalString(CONNECTOR_READ_PARTITION_COLUMN); final Optional<Long> partitionLower = descriptorProperties.getOptionalLong(CONNECTOR_READ_PARTITION_LOWER_BOUND); @@ -191,6 +194,9 @@ public class JdbcTableSourceSinkFactory implements final Optional<Integer> numPartitions = descriptorProperties.getOptionalInt(CONNECTOR_READ_PARTITION_NUM); final JdbcReadOptions.Builder builder = JdbcReadOptions.builder(); + if (query.isPresent()) { + builder.setQuery(query.get()); + } if (partitionColumnName.isPresent()) { builder.setPartitionColumnName(partitionColumnName.get()); builder.setPartitionLowerBound(partitionLower.get()); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java index e8b0fe5..218759e 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/table/descriptors/JdbcValidator.java @@ -43,6 +43,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator { public static final String CONNECTOR_USERNAME = "connector.username"; public static final String CONNECTOR_PASSWORD = "connector.password"; + public static final String CONNECTOR_READ_QUERY = "connector.read.query"; public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column"; public static final String CONNECTOR_READ_PARTITION_LOWER_BOUND = "connector.read.partition.lower-bound"; public static final String CONNECTOR_READ_PARTITION_UPPER_BOUND = "connector.read.partition.upper-bound"; @@ -89,6 +90,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator { } private void validateReadProperties(DescriptorProperties properties) { + properties.validateString(CONNECTOR_READ_QUERY, true); properties.validateString(CONNECTOR_READ_PARTITION_COLUMN, true); properties.validateLong(CONNECTOR_READ_PARTITION_LOWER_BOUND, true); properties.validateLong(CONNECTOR_READ_PARTITION_UPPER_BOUND, true); diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java index 74e90b2..fa8d98a 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java @@ -154,4 +154,34 @@ public class JdbcTableSourceITCase extends AbstractTestBase { "2020-01-01T15:36:01.123456,101.1234"); StreamITCase.compareWithList(expected); } + + @Test + public void testScanQueryJDBCSource() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + EnvironmentSettings envSettings = EnvironmentSettings.newInstance() + .useBlinkPlanner() + .inStreamingMode() + .build(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings); + + final String testQuery = "SELECT id FROM " + INPUT_TABLE; + tEnv.sqlUpdate( + "CREATE TABLE test(" + + "id BIGINT" + + ") WITH (" + + " 'connector.type'='jdbc'," + + " 'connector.url'='" + DB_URL + "'," + + " 'connector.table'='whatever'," + + " 'connector.read.query'='" + testQuery + "'" + + ")" + ); + + StreamITCase.clear(); + tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class) + .addSink(new StreamITCase.StringSink<>()); + env.execute(); + + List<String> expected = Arrays.asList("1", "2"); + StreamITCase.compareWithList(expected); + } } diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java index 38cb29c..7f15565 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceSinkFactoryTest.java @@ -88,6 +88,7 @@ public class JdbcTableSourceSinkFactoryTest { @Test public void testJdbcReadProperties() { Map<String, String> properties = getBasicProperties(); + properties.put("connector.read.query", "SELECT aaa FROM mytable"); properties.put("connector.read.partition.column", "aaa"); properties.put("connector.read.partition.lower-bound", "-10"); properties.put("connector.read.partition.upper-bound", "100"); @@ -102,6 +103,7 @@ public class JdbcTableSourceSinkFactoryTest { .setTableName("mytable") .build(); final JdbcReadOptions readOptions = JdbcReadOptions.builder() + .setQuery("SELECT aaa FROM mytable") .setPartitionColumnName("aaa") .setPartitionLowerBound(-10) .setPartitionUpperBound(100)
