This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e28f66f6ef2ed03f9ee69148fe5079ae5e358c4
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Jan 6 14:52:38 2022 +0100

    [FLINK-25391][connector-elasticsearch] Forward catalog table options
---
 .../content/docs/connectors/table/elasticsearch.md | 23 +++++++++-
 .../table/ElasticsearchDynamicSinkFactoryBase.java | 50 ++++++++++++++--------
 .../table/Elasticsearch6DynamicSinkFactory.java    | 21 +++++----
 3 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md
index 22f0b60..b5ae31d 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -67,15 +67,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, valid values are:
@@ -87,6 +89,7 @@ Connector Options
     <tr>
       <td><h5>hosts</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td>
@@ -94,6 +97,7 @@ Connector Options
     <tr>
       <td><h5>index</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or
@@ -103,6 +107,7 @@ Connector Options
     <tr>
       <td><h5>document-type</h5></td>
       <td>required in 6.x</td>
+      <td>yes in 6.x</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>Elasticsearch document type. Not necessary anymore in <code>elasticsearch-7</code>.</td>
@@ -110,6 +115,7 @@ Connector Options
     <tr>
       <td><h5>document-id.key-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">_</td>
       <td>String</td>
      <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td>
@@ -117,6 +123,7 @@ Connector Options
     <tr>
       <td><h5>username</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>Username used to connect to Elasticsearch instance. Please notice that Elasticsearch doesn't pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td>
@@ -124,6 +131,7 @@ Connector Options
     <tr>
       <td><h5>password</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td>
@@ -131,6 +139,7 @@ Connector Options
     <tr>
       <td><h5>sink.delivery-guarantee</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
      <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
@@ -138,6 +147,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.max-actions</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Integer</td>
       <td>Maximum number of buffered actions per bulk request.
@@ -147,6 +157,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.max-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">2mb</td>
       <td>MemorySize</td>
      <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity.
@@ -156,6 +167,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>The interval to flush buffered actions.
@@ -166,6 +178,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">NONE</td>
       <td>String</td>
      <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are:
@@ -179,6 +192,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Maximum number of backoff retries.</td>
@@ -186,6 +200,7 @@ Connector Options
     <tr>
       <td><h5>sink.bulk-flush.backoff.delay</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
      <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
@@ -193,6 +208,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
      <td>Defines the parallelism of the Elasticsearch sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
@@ -200,6 +216,7 @@ Connector Options
     <tr>
       <td><h5>connection.path-prefix</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
      <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td>
@@ -207,6 +224,7 @@ Connector Options
     <tr>
       <td><h5>connection.request-timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
      <td>The timeout in milliseconds for requesting a connection from the connection manager.
@@ -217,6 +235,7 @@ Connector Options
     <tr>
       <td><h5>connection.timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
       <td>The timeout in milliseconds for establishing a connection.
@@ -227,6 +246,7 @@ Connector Options
     <tr>
       <td><h5>socket.timeout</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Duration</td>
      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently,
@@ -238,6 +258,7 @@ Connector Options
     <tr>
       <td><h5>format</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">json</td>
       <td>String</td>
      <td>Elasticsearch connector supports to specify a format. The format must produce a valid json document.
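
The new "Forwarded" column above mirrors what the factory reports via
forwardOptions(): broadly, an option marked "yes" does not influence the
execution topology, so its value may safely differ when a job is restored
from a persisted plan. Below is a minimal sketch of that round trip, assuming
Flink 1.15+ and its experimental compiled-plan API; the table definitions and
the /tmp/plan.json path are hypothetical, not part of this commit:

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.TableEnvironment;

public class CompiledPlanSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        env.executeSql(
                "CREATE TABLE src (user_id STRING, msg STRING) WITH ("
                        + " 'connector' = 'datagen')");
        env.executeSql(
                "CREATE TABLE es_sink (user_id STRING, msg STRING) WITH ("
                        + " 'connector' = 'elasticsearch-7',"
                        + " 'hosts' = 'http://localhost:9200',"
                        + " 'index' = 'users')");

        // Persist the plan; forwarded options such as 'hosts' are stored in
        // the plan JSON and may be adjusted there (e.g. to point at another
        // cluster) without invalidating the plan.
        CompiledPlan plan =
                env.compilePlanSql("INSERT INTO es_sink SELECT * FROM src");
        plan.writeToFile("/tmp/plan.json");

        // Later: restore and execute the (possibly edited) plan.
        env.loadPlan(PlanReference.fromFile("/tmp/plan.json")).execute();
    }
}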
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
index 677a8e3..af35888 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.elasticsearch.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -29,7 +28,6 @@ import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
@@ -83,7 +81,7 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     }
 
     @Nullable
-    String getDocumentType(Context context) {
+    String getDocumentType(ElasticsearchConfiguration configuration) {
         return null; // document type is only set in Elasticsearch versions < 7
     }
 
@@ -91,10 +89,14 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     public DynamicTableSink createDynamicTableSink(Context context) {
         List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex =
                 getPrimaryKeyLogicalTypesWithIndex(context);
+
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
         EncodingFormat<SerializationSchema<RowData>> format =
-                getValidatedEncodingFormat(this, context);
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
 
-        ElasticsearchConfiguration config = getConfiguration(context);
+        ElasticsearchConfiguration config = getConfiguration(helper);
+        helper.validate();
         validateConfiguration(config);
 
         return new ElasticsearchDynamicSink(
@@ -104,12 +106,11 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
                 context.getPhysicalRowDataType(),
                 capitalize(factoryIdentifier),
                 sinkBuilderSupplier,
-                getDocumentType(context));
+                getDocumentType(config));
     }
 
-    ElasticsearchConfiguration getConfiguration(Context context) {
-        return new ElasticsearchConfiguration(
-                Configuration.fromMap(context.getCatalogTable().getOptions()));
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
+        return new ElasticsearchConfiguration(helper.getOptions());
     }
 
     void validateConfiguration(ElasticsearchConfiguration config) {
@@ -161,16 +162,6 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
         }
     }
 
-    EncodingFormat<SerializationSchema<RowData>> getValidatedEncodingFormat(
-            DynamicTableFactory factory, DynamicTableFactory.Context context) {
-        final FactoryUtil.TableFactoryHelper helper =
-                FactoryUtil.createTableFactoryHelper(factory, context);
-        final EncodingFormat<SerializationSchema<RowData>> format =
-                helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
-        helper.validate();
-        return format;
-    }
-
     List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context context) {
         DataType physicalRowDataType = context.getPhysicalRowDataType();
         int[] primaryKeyIndexes = context.getPrimaryKeyIndexes();
@@ -225,6 +216,27 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa
     }
 
     @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        HOSTS_OPTION,
+                        INDEX_OPTION,
+                        PASSWORD_OPTION,
+                        USERNAME_OPTION,
+                        KEY_DELIMITER_OPTION,
+                        BULK_FLUSH_MAX_ACTIONS_OPTION,
+                        BULK_FLUSH_MAX_SIZE_OPTION,
+                        BULK_FLUSH_INTERVAL_OPTION,
+                        BULK_FLUSH_BACKOFF_TYPE_OPTION,
+                        BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+                        BULK_FLUSH_BACKOFF_DELAY_OPTION,
+                        CONNECTION_PATH_PREFIX_OPTION,
+                        CONNECTION_REQUEST_TIMEOUT,
+                        CONNECTION_TIMEOUT,
+                        SOCKET_TIMEOUT)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
     public String factoryIdentifier() {
         return factoryIdentifier;
     }
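
The refactoring above funnels all option handling through a single
FactoryUtil.TableFactoryHelper: the helper discovers the encoding format and
validates every declared option, replacing the removed
getValidatedEncodingFormat() and the raw Configuration.fromMap() call, while
forwardOptions() declares which options are forwardable. A condensed sketch
of the resulting factory shape follows; the 'example-sink' identifier and the
single 'hosts' option are hypothetical, not the connector's actual code:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.Collections;
import java.util.Set;

public class ExampleSinkFactory implements DynamicTableSinkFactory {

    private static final ConfigOption<String> HOSTS =
            ConfigOptions.key("hosts").stringType().noDefaultValue();

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // One helper instance validates all declared options, mirroring the
        // change in this commit.
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        final ReadableConfig options = helper.getOptions();
        final String hosts = options.get(HOSTS);

        return new DynamicTableSink() {
            @Override
            public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
                return ChangelogMode.insertOnly();
            }

            @Override
            public SinkRuntimeProvider getSinkRuntimeProvider(
                    DynamicTableSink.Context runtimeContext) {
                // Sketch only: a real factory would build its writer here.
                throw new UnsupportedOperationException("writing to " + hosts);
            }

            @Override
            public DynamicTableSink copy() {
                return this;
            }

            @Override
            public String asSummaryString() {
                return "example-sink";
            }
        };
    }

    @Override
    public String factoryIdentifier() {
        return "example-sink";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.<ConfigOption<?>>singleton(HOSTS);
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> forwardOptions() {
        // 'hosts' does not shape the execution topology, so it is safe to
        // forward.
        return Collections.<ConfigOption<?>>singleton(HOSTS);
    }
}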
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index 2bb2c8a..6957697 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -20,14 +20,16 @@ package org.apache.flink.connector.elasticsearch.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.connector.elasticsearch.table.Elasticsearch6ConnectorOptions.DOCUMENT_TYPE_OPTION;
 
@@ -41,17 +43,14 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa
     }
 
     @Override
-    ElasticsearchConfiguration getConfiguration(Context context) {
-        return new Elasticsearch6Configuration(
-                Configuration.fromMap(context.getCatalogTable().getOptions()));
+    ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) {
+        return new Elasticsearch6Configuration(helper.getOptions());
     }
 
     @Nullable
     @Override
-    String getDocumentType(Context context) {
-        Elasticsearch6Configuration config =
-                (Elasticsearch6Configuration) getConfiguration(context);
-        return config.getDocumentType();
+    String getDocumentType(ElasticsearchConfiguration configuration) {
+        return ((Elasticsearch6Configuration) configuration).getDocumentType();
     }
 
     @Override
@@ -69,4 +68,10 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa
         requiredOptions.add(DOCUMENT_TYPE_OPTION);
         return requiredOptions;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.concat(super.forwardOptions().stream(), Stream.of(DOCUMENT_TYPE_OPTION))
+                .collect(Collectors.toSet());
+    }
 }
