This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e28f66f6ef2ed03f9ee69148fe5079ae5e358c4 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Thu Jan 6 14:52:38 2022 +0100 [FLINK-25391][connector-elasticsearch] Forward catalog table options --- .../content/docs/connectors/table/elasticsearch.md | 23 +++++++++- .../table/ElasticsearchDynamicSinkFactoryBase.java | 50 ++++++++++++++-------- .../table/Elasticsearch6DynamicSinkFactory.java | 21 +++++---- 3 files changed, 66 insertions(+), 28 deletions(-) diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md index 22f0b60..b5ae31d 100644 --- a/docs/content/docs/connectors/table/elasticsearch.md +++ b/docs/content/docs/connectors/table/elasticsearch.md @@ -67,15 +67,17 @@ Connector Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> <th class="text-center" style="width: 7%">Default</th> <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-center" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>connector</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what connector to use, valid values are: @@ -87,6 +89,7 @@ Connector Options <tr> <td><h5>hosts</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>One or more Elasticsearch hosts to connect to, e.g. <code>'http://host_name:9092;http://host_name:9093'</code>.</td> @@ -94,6 +97,7 @@ Connector Options <tr> <td><h5>index</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Elasticsearch index for every record. Can be a static index (e.g. <code>'myIndex'</code>) or @@ -103,6 +107,7 @@ Connector Options <tr> <td><h5>document-type</h5></td> <td>required in 6.x</td> + <td>yes in 6.x</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Elasticsearch document type. Not necessary anymore in <code>elasticsearch-7</code>.</td> @@ -110,6 +115,7 @@ Connector Options <tr> <td><h5>document-id.key-delimiter</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">_</td> <td>String</td> <td>Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3".</td> @@ -117,6 +123,7 @@ Connector Options <tr> <td><h5>username</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Username used to connect to Elasticsearch instance. Please notice that Elasticsearch doesn't pre-bundled security feature, but you can enable it by following the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/master/configuring-security.html">guideline</a> to secure an Elasticsearch cluster.</td> @@ -124,6 +131,7 @@ Connector Options <tr> <td><h5>password</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Password used to connect to Elasticsearch instance. If <code>username</code> is configured, this option must be configured with non-empty string as well.</td> @@ -131,6 +139,7 @@ Connector Options <tr> <td><h5>sink.delivery-guarantee</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">NONE</td> <td>String</td> <td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td> @@ -138,6 +147,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.max-actions</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">1000</td> <td>Integer</td> <td>Maximum number of buffered actions per bulk request. @@ -147,6 +157,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.max-size</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">2mb</td> <td>MemorySize</td> <td>Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. @@ -156,6 +167,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.interval</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">1s</td> <td>Duration</td> <td>The interval to flush buffered actions. @@ -166,6 +178,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.backoff.strategy</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">NONE</td> <td>String</td> <td>Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: @@ -179,6 +192,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.backoff.max-retries</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>Maximum number of backoff retries.</td> @@ -186,6 +200,7 @@ Connector Options <tr> <td><h5>sink.bulk-flush.backoff.delay</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>Delay between each backoff attempt. For <code>CONSTANT</code> backoff, this is simply the delay between each retry. For <code>EXPONENTIAL</code> backoff, this is the initial base delay.</td> @@ -193,6 +208,7 @@ Connector Options <tr> <td><h5>sink.parallelism</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>Defines the parallelism of the Elasticsearch sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td> @@ -200,6 +216,7 @@ Connector Options <tr> <td><h5>connection.path-prefix</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td> @@ -207,6 +224,7 @@ Connector Options <tr> <td><h5>connection.request-timeout</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>The timeout in milliseconds for requesting a connection from the connection manager. @@ -217,6 +235,7 @@ Connector Options <tr> <td><h5>connection.timeout</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>The timeout in milliseconds for establishing a connection. @@ -227,6 +246,7 @@ Connector Options <tr> <td><h5>socket.timeout</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, @@ -238,6 +258,7 @@ Connector Options <tr> <td><h5>format</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">json</td> <td>String</td> <td>Elasticsearch connector supports to specify a format. The format must produce a valid json document. diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java index 677a8e3..af35888 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java @@ -21,7 +21,6 @@ package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; @@ -29,7 +28,6 @@ import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.SerializationFormatFactory; @@ -83,7 +81,7 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa } @Nullable - String getDocumentType(Context context) { + String getDocumentType(ElasticsearchConfiguration configuration) { return null; // document type is only set in Elasticsearch versions < 7 } @@ -91,10 +89,14 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa public DynamicTableSink createDynamicTableSink(Context context) { List<LogicalTypeWithIndex> primaryKeyLogicalTypesWithIndex = getPrimaryKeyLogicalTypesWithIndex(context); + + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); EncodingFormat<SerializationSchema<RowData>> format = - getValidatedEncodingFormat(this, context); + helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION); - ElasticsearchConfiguration config = getConfiguration(context); + ElasticsearchConfiguration config = getConfiguration(helper); + helper.validate(); validateConfiguration(config); return new ElasticsearchDynamicSink( @@ -104,12 +106,11 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa context.getPhysicalRowDataType(), capitalize(factoryIdentifier), sinkBuilderSupplier, - getDocumentType(context)); + getDocumentType(config)); } - ElasticsearchConfiguration getConfiguration(Context context) { - return new ElasticsearchConfiguration( - Configuration.fromMap(context.getCatalogTable().getOptions())); + ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) { + return new ElasticsearchConfiguration(helper.getOptions()); } void validateConfiguration(ElasticsearchConfiguration config) { @@ -161,16 +162,6 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa } } - EncodingFormat<SerializationSchema<RowData>> getValidatedEncodingFormat( - DynamicTableFactory factory, DynamicTableFactory.Context context) { - final FactoryUtil.TableFactoryHelper helper = - FactoryUtil.createTableFactoryHelper(factory, context); - final EncodingFormat<SerializationSchema<RowData>> format = - helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION); - helper.validate(); - return format; - } - List<LogicalTypeWithIndex> getPrimaryKeyLogicalTypesWithIndex(Context context) { DataType physicalRowDataType = context.getPhysicalRowDataType(); int[] primaryKeyIndexes = context.getPrimaryKeyIndexes(); @@ -225,6 +216,27 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa } @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + HOSTS_OPTION, + INDEX_OPTION, + PASSWORD_OPTION, + USERNAME_OPTION, + KEY_DELIMITER_OPTION, + BULK_FLUSH_MAX_ACTIONS_OPTION, + BULK_FLUSH_MAX_SIZE_OPTION, + BULK_FLUSH_INTERVAL_OPTION, + BULK_FLUSH_BACKOFF_TYPE_OPTION, + BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, + BULK_FLUSH_BACKOFF_DELAY_OPTION, + CONNECTION_PATH_PREFIX_OPTION, + CONNECTION_REQUEST_TIMEOUT, + CONNECTION_TIMEOUT, + SOCKET_TIMEOUT) + .collect(Collectors.toSet()); + } + + @Override public String factoryIdentifier() { return factoryIdentifier; } diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java index 2bb2c8a..6957697 100644 --- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java +++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java @@ -20,14 +20,16 @@ package org.apache.flink.connector.elasticsearch.table; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.util.StringUtils; import javax.annotation.Nullable; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.connector.elasticsearch.table.Elasticsearch6ConnectorOptions.DOCUMENT_TYPE_OPTION; @@ -41,17 +43,14 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa } @Override - ElasticsearchConfiguration getConfiguration(Context context) { - return new Elasticsearch6Configuration( - Configuration.fromMap(context.getCatalogTable().getOptions())); + ElasticsearchConfiguration getConfiguration(FactoryUtil.TableFactoryHelper helper) { + return new Elasticsearch6Configuration(helper.getOptions()); } @Nullable @Override - String getDocumentType(Context context) { - Elasticsearch6Configuration config = - (Elasticsearch6Configuration) getConfiguration(context); - return config.getDocumentType(); + String getDocumentType(ElasticsearchConfiguration configuration) { + return ((Elasticsearch6Configuration) configuration).getDocumentType(); } @Override @@ -69,4 +68,10 @@ public class Elasticsearch6DynamicSinkFactory extends ElasticsearchDynamicSinkFa requiredOptions.add(DOCUMENT_TYPE_OPTION); return requiredOptions; } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.concat(super.forwardOptions().stream(), Stream.of(DOCUMENT_TYPE_OPTION)) + .collect(Collectors.toSet()); + } }