This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c61162b30f4b5567ecc2ee29481fcc87e5016428
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Jan 6 15:01:39 2022 +0100

    [FLINK-25391][connector-jdbc] Forward catalog table options
---
 docs/content/docs/connectors/table/jdbc.md         | 24 +++++++++++++++++++++-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 23 +++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index b289831..81a179a 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -93,15 +93,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 8%">Forwarded</th>
         <th class="text-left" style="width: 7%">Default</th>
         <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 50%">Description</th>
+        <th class="text-left" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, here should be <code>'jdbc'</code>.</td>
@@ -109,6 +111,7 @@ Connector Options
     <tr>
       <td><h5>url</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC database url.</td>
@@ -116,6 +119,7 @@ Connector Options
     <tr>
       <td><h5>table-name</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of JDBC table to connect.</td>
@@ -123,6 +127,7 @@ Connector Options
     <tr>
       <td><h5>driver</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL.</td>
@@ -130,6 +135,7 @@ Connector Options
     <tr>
       <td><h5>username</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC user name. <code>'username'</code> and <code>'password'</code> must both be specified if any of them is specified.</td>
@@ -137,6 +143,7 @@ Connector Options
     <tr>
       <td><h5>password</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC password.</td>
@@ -144,6 +151,7 @@ Connector Options
     <tr>
       <td><h5>connection.max-retry-timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">60s</td>
       <td>Duration</td>
       <td>Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.</td>
@@ -151,6 +159,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.column</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The column name used for partitioning the input. See the following <a href="#partitioned-scan">Partitioned Scan</a> section for more details.</td>
@@ -158,6 +167,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.num</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The number of partitions.</td>
@@ -165,6 +175,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.lower-bound</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The smallest value of the first partition.</td>
@@ -172,6 +183,7 @@ Connector Options
     <tr>
       <td><h5>scan.partition.upper-bound</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The largest value of the last partition.</td>
@@ -179,6 +191,7 @@ Connector Options
     <tr>
       <td><h5>scan.fetch-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">0</td>
       <td>Integer</td>
       <td>The number of rows that should be fetched from the database when reading per round trip. If the value specified is zero, then the hint is ignored.</td>
@@ -186,6 +199,7 @@ Connector Options
     <tr>
       <td><h5>scan.auto-commit</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>Sets the <a href="https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html#commit_transactions">auto-commit</a> flag on the JDBC driver,
@@ -195,6 +209,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The max number of rows of lookup cache, over this value, the oldest rows will be expired.
@@ -203,6 +218,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.ttl</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired.
@@ -211,6 +227,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.caching-missing-key</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">true</td>
       <td>Boolean</td>
       <td>Flag to cache missing key, true by default</td>
@@ -218,6 +235,7 @@ Connector Options
     <tr>
       <td><h5>lookup.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if lookup database failed.</td>
@@ -225,6 +243,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
       <td>The max size of buffered records before flush. Can be set to zero to disable it.</td>
@@ -232,6 +251,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>The flush interval mills, over this time, asynchronous threads will flush data. Can be set to <code>'0'</code> to disable it. Note, <code>'sink.buffer-flush.max-rows'</code> can be set to <code>'0'</code> with the flush interval set allowing for complete async processing of buffered actions.</td>
@@ -239,6 +259,7 @@ Connector Options
     <tr>
       <td><h5>sink.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if writing records to database failed.</td>
@@ -246,6 +267,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the JDBC sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index dc5051d..89af619 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -42,6 +42,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.DRIVER;
 import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -216,6 +218,27 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         return optionalOptions;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        URL,
+                        TABLE_NAME,
+                        USERNAME,
+                        PASSWORD,
+                        DRIVER,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        SINK_MAX_RETRIES,
+                        MAX_RETRY_TIMEOUT,
+                        SCAN_FETCH_SIZE,
+                        SCAN_AUTO_COMMIT,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES,
+                        LOOKUP_CACHE_MISSING_KEY)
+                .collect(Collectors.toSet());
+    }
+
     private void validateConfigOptions(ReadableConfig config) {
         String jdbcUrl = config.get(URL);
         JdbcDialectLoader.load(jdbcUrl);

Reply via email to