This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c61162b30f4b5567ecc2ee29481fcc87e5016428 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Thu Jan 6 15:01:39 2022 +0100 [FLINK-25391][connector-jdbc] Forward catalog table options --- docs/content/docs/connectors/table/jdbc.md | 24 +++++++++++++++++++++- .../jdbc/table/JdbcDynamicTableFactory.java | 23 +++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md index b289831..81a179a 100644 --- a/docs/content/docs/connectors/table/jdbc.md +++ b/docs/content/docs/connectors/table/jdbc.md @@ -93,15 +93,17 @@ Connector Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-left" style="width: 8%">Required</th> + <th class="text-left" style="width: 8%">Forwarded</th> <th class="text-left" style="width: 7%">Default</th> <th class="text-left" style="width: 10%">Type</th> - <th class="text-left" style="width: 50%">Description</th> + <th class="text-left" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>connector</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what connector to use, here should be <code>'jdbc'</code>.</td> @@ -109,6 +111,7 @@ Connector Options <tr> <td><h5>url</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The JDBC database url.</td> @@ -116,6 +119,7 @@ Connector Options <tr> <td><h5>table-name</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The name of JDBC table to connect.</td> @@ -123,6 +127,7 @@ Connector Options <tr> <td><h5>driver</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.</td> @@ -130,6 +135,7 @@ Connector Options <tr> <td><h5>username</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The JDBC user name. <code>'username'</code> and <code>'password'</code> must both be specified if any of them is specified.</td> @@ -137,6 +143,7 @@ Connector Options <tr> <td><h5>password</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The JDBC password.</td> @@ -144,6 +151,7 @@ Connector Options <tr> <td><h5>connection.max-retry-timeout</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">60s</td> <td>Duration</td> <td>Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.</td> @@ -151,6 +159,7 @@ Connector Options <tr> <td><h5>scan.partition.column</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The column name used for partitioning the input. See the following <a href="#partitioned-scan">Partitioned Scan</a> section for more details.</td> @@ -158,6 +167,7 @@ Connector Options <tr> <td><h5>scan.partition.num</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>The number of partitions.</td> @@ -165,6 +175,7 @@ Connector Options <tr> <td><h5>scan.partition.lower-bound</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>The smallest value of the first partition.</td> @@ -172,6 +183,7 @@ Connector Options <tr> <td><h5>scan.partition.upper-bound</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>The largest value of the last partition.</td> @@ -179,6 +191,7 @@ Connector Options <tr> <td><h5>scan.fetch-size</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">0</td> <td>Integer</td> <td>The number of rows that should be fetched from the database when reading per round trip. If the value specified is zero, then the hint is ignored.</td> @@ -186,6 +199,7 @@ Connector Options <tr> <td><h5>scan.auto-commit</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td>Sets the <a href="https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html#commit_transactions">auto-commit</a> flag on the JDBC driver, @@ -195,6 +209,7 @@ Connector Options <tr> <td><h5>lookup.cache.max-rows</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>The max number of rows of lookup cache, over this value, the oldest rows will be expired. @@ -203,6 +218,7 @@ Connector Options <tr> <td><h5>lookup.cache.ttl</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. @@ -211,6 +227,7 @@ Connector Options <tr> <td><h5>lookup.cache.caching-missing-key</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">true</td> <td>Boolean</td> <td>Flag to cache missing key, true by default</td> @@ -218,6 +235,7 @@ Connector Options <tr> <td><h5>lookup.max-retries</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">3</td> <td>Integer</td> <td>The max retry times if lookup database failed.</td> @@ -225,6 +243,7 @@ Connector Options <tr> <td><h5>sink.buffer-flush.max-rows</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">100</td> <td>Integer</td> <td>The max size of buffered records before flush. Can be set to zero to disable it.</td> @@ -232,6 +251,7 @@ Connector Options <tr> <td><h5>sink.buffer-flush.interval</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">1s</td> <td>Duration</td> <td>The flush interval mills, over this time, asynchronous threads will flush data. Can be set to <code>'0'</code> to disable it. Note, <code>'sink.buffer-flush.max-rows'</code> can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.</td> @@ -239,6 +259,7 @@ Connector Options <tr> <td><h5>sink.max-retries</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">3</td> <td>Integer</td> <td>The max retry times if writing records to database failed.</td> @@ -246,6 +267,7 @@ Connector Options <tr> <td><h5>sink.parallelism</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td> diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java index dc5051d..89af619 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java @@ -42,6 +42,8 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER; import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS; @@ -216,6 +218,27 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam return optionalOptions; } + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + URL, + TABLE_NAME, + USERNAME, + PASSWORD, + DRIVER, + SINK_BUFFER_FLUSH_MAX_ROWS, + SINK_BUFFER_FLUSH_INTERVAL, + SINK_MAX_RETRIES, + MAX_RETRY_TIMEOUT, + SCAN_FETCH_SIZE, + SCAN_AUTO_COMMIT, + LOOKUP_CACHE_MAX_ROWS, + LOOKUP_CACHE_TTL, + LOOKUP_MAX_RETRIES, + LOOKUP_CACHE_MISSING_KEY) + .collect(Collectors.toSet()); + } + private void validateConfigOptions(ReadableConfig config) { String jdbcUrl = config.get(URL); JdbcDialectLoader.load(jdbcUrl);