This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ccd2d531d1cb577113d5021efd6277031eeef9d1
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri May 15 20:06:46 2020 +0200

    [FLINK-17027] Introduce a new Elasticsearch 6 connector with new property keys
    
    This closes #12184
---
 .../flink-connector-elasticsearch6/pom.xml         |  22 ++
 .../table/Elasticsearch6Configuration.java         |  80 +++++++
 .../table/Elasticsearch6DynamicSink.java           | 251 ++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkFactory.java    | 155 ++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../Elasticsearch6DynamicSinkFactoryTest.java      | 207 ++++++++++++++++
 .../table/Elasticsearch6DynamicSinkITCase.java     | 262 +++++++++++++++++++++
 .../table/Elasticsearch6DynamicSinkTest.java       | 195 +++++++++++++++
 8 files changed, 1188 insertions(+)
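
For reference, the connector introduced here is registered under the identifier
'elasticsearch-6' and is configured through flat property keys ('hosts', 'index',
'document-type', ...) instead of the legacy 'connector.*' keys. A minimal usage
sketch, mirroring the IT case further down; the 'format' = 'json' entry is an
assumption (the FORMAT_OPTION key string is not shown in this diff), and table
and column names are illustrative only:

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

    // DDL using the new property keys introduced by this commit
    tEnv.executeSql(
            "CREATE TABLE esSink (\n" +
            "  a BIGINT NOT NULL,\n" +
            "  c STRING NOT NULL,\n" +
            "  PRIMARY KEY (a) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'elasticsearch-6',\n" +
            "  'hosts' = 'http://127.0.0.1:9200',\n" +
            "  'index' = 'my-index',\n" +
            "  'document-type' = 'my-type',\n" +
            "  'format' = 'json'\n" +          // assumption: a JSON format factory is on the classpath
            ")");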

diff --git a/flink-connectors/flink-connector-elasticsearch6/pom.xml b/flink-connectors/flink-connector-elasticsearch6/pom.xml
index 25a9f5a..272f9af 100644
--- a/flink-connectors/flink-connector-elasticsearch6/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch6/pom.xml
@@ -145,6 +145,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Table API integration tests -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- Elasticsearch table sink factory testing -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -154,4 +162,18 @@ under the License.
                </dependency>
 
        </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution because of spawning
+                                        Elasticsearch cluster multiple times -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
new file mode 100644
index 0000000..c06898e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6Configuration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 6 specific configuration.
+ */
+@Internal
+final class Elasticsearch6Configuration extends ElasticsearchConfiguration {
+       Elasticsearch6Configuration(ReadableConfig config, ClassLoader classLoader) {
+               super(config, classLoader);
+       }
+
+       public List<HttpHost> getHosts() {
+               return config.get(HOSTS_OPTION).stream()
+                       .map(Elasticsearch6Configuration::validateAndParseHostsString)
+                       .collect(Collectors.toList());
+       }
+
+       /**
+        * Parses a single host entry and validates that scheme and port are present.
+        *
+        * <p>The hosts option is expected in the following format:
+        *
+        * <pre>
+        *     hosts = http://host_name:9200;http://host_name:9201
+        * </pre>
+        */
+       private static HttpHost validateAndParseHostsString(String host) {
+               try {
+                       HttpHost httpHost = HttpHost.create(host);
+                       if (httpHost.getPort() < 0) {
+                               throw new ValidationException(String.format(
+                                       "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                       host,
+                                       HOSTS_OPTION.key()));
+                       }
+
+                       if (httpHost.getSchemeName() == null) {
+                               throw new ValidationException(String.format(
+                                       "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                       host,
+                                       HOSTS_OPTION.key()));
+                       }
+                       return httpHost;
+               } catch (Exception e) {
+                       throw new ValidationException(String.format(
+                               "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                               host,
+                               HOSTS_OPTION.key()), e);
+               }
+       }
+}
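
As a quick illustration of the validation above (a sketch, not part of this
commit; the behavior of HttpHost.create is the only assumption):

    // A well-formed host parses into scheme, host and port and is accepted.
    HttpHost ok = HttpHost.create("http://localhost:9200");   // getPort() == 9200

    // Without an explicit port, HttpHost.create() reports -1 and the
    // "Missing port." ValidationException above is thrown.
    HttpHost noPort = HttpHost.create("http://localhost");    // getPort() == -1
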
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
new file mode 100644
index 0000000..eadf659
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical
+ * description.
+ */
+@PublicEvolving
+final class Elasticsearch6DynamicSink implements DynamicTableSink {
+       @VisibleForTesting
+       static final Elasticsearch6RequestFactory REQUEST_FACTORY = new Elasticsearch6RequestFactory();
+
+       private final SinkFormat<SerializationSchema<RowData>> format;
+       private final TableSchema schema;
+       private final Elasticsearch6Configuration config;
+
+       public Elasticsearch6DynamicSink(
+                       SinkFormat<SerializationSchema<RowData>> format,
+                       Elasticsearch6Configuration config,
+                       TableSchema schema) {
+               this(format, config, schema, (ElasticsearchSink.Builder::new));
+       }
+
+       //--------------------------------------------------------------
+       // Hack to make configuration testing possible.
+       //
+       // The code in this block should never be used outside of tests.
+       // By injecting a builder we can make assertions on it in
+       // tests. We cannot assert everything though, e.g. it is not
+       // possible to assert flushing on checkpoint, as it is configured
+       // on the sink itself.
+       //--------------------------------------------------------------
+
+       private final ElasticSearchBuilderProvider builderProvider;
+
+       @FunctionalInterface
+       interface ElasticSearchBuilderProvider {
+               ElasticsearchSink.Builder<RowData> createBuilder(
+                       List<HttpHost> httpHosts,
+                       RowElasticsearchSinkFunction upsertSinkFunction);
+       }
+
+       Elasticsearch6DynamicSink(
+                       SinkFormat<SerializationSchema<RowData>> format,
+                       Elasticsearch6Configuration config,
+                       TableSchema schema,
+                       ElasticSearchBuilderProvider builderProvider) {
+               this.format = format;
+               this.schema = schema;
+               this.config = config;
+               this.builderProvider = builderProvider;
+       }
+
+       //--------------------------------------------------------------
+       // End of hack to make configuration testing possible
+       //--------------------------------------------------------------
+
+       @Override
+       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+               ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+               for (RowKind kind : requestedMode.getContainedKinds()) {
+                       if (kind != RowKind.UPDATE_BEFORE) {
+                               builder.addContainedKind(kind);
+                       }
+               }
+               return builder.build();
+       }
+
+       @Override
+       public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+               return () -> {
+                       SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+
+                       final RowElasticsearchSinkFunction upsertFunction =
+                               new RowElasticsearchSinkFunction(
+                                       IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+                                       config.getDocumentType(),
+                                       format,
+                                       XContentType.JSON,
+                                       REQUEST_FACTORY,
+                                       KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())
+                               );
+
+                       final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(
+                               config.getHosts(),
+                               upsertFunction);
+
+                       builder.setFailureHandler(config.getFailureHandler());
+                       config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
+                       config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
+                       config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+                       config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+                       config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+                       config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+                       config.getPathPrefix()
+                               .ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+
+                       final ElasticsearchSink<RowData> sink = builder.build();
+
+                       if (config.isDisableFlushOnCheckpoint()) {
+                               sink.disableFlushOnCheckpoint();
+                       }
+
+                       return sink;
+               };
+       }
+
+       @Override
+       public DynamicTableSink copy() {
+               return this;
+       }
+
+       @Override
+       public String asSummaryString() {
+               return "Elasticsearch6";
+       }
+
+       /**
+        * Serializable {@link RestClientFactory} used by the sink.
+        */
+       @VisibleForTesting
+       static class DefaultRestClientFactory implements RestClientFactory {
+
+               private final String pathPrefix;
+
+               public DefaultRestClientFactory(@Nullable String pathPrefix) {
+                       this.pathPrefix = pathPrefix;
+               }
+
+               @Override
+               public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+                       if (pathPrefix != null) {
+                               restClientBuilder.setPathPrefix(pathPrefix);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+                       return Objects.equals(pathPrefix, that.pathPrefix);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(pathPrefix);
+               }
+       }
+
+       /**
+        * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
+        */
+       private static class Elasticsearch6RequestFactory implements RequestFactory {
+               @Override
+               public UpdateRequest createUpdateRequest(
+                       String index,
+                       String docType,
+                       String key,
+                       XContentType contentType,
+                       byte[] document) {
+                       return new UpdateRequest(index, docType, key)
+                               .doc(document, contentType)
+                               .upsert(document, contentType);
+               }
+
+               @Override
+               public IndexRequest createIndexRequest(
+                               String index,
+                               String docType,
+                               String key,
+                               XContentType contentType,
+                               byte[] document) {
+                       return new IndexRequest(index, docType, key)
+                               .source(document, contentType);
+               }
+
+               @Override
+               public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+                       return new DeleteRequest(index, docType, key);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               Elasticsearch6DynamicSink that = (Elasticsearch6DynamicSink) o;
+               return Objects.equals(format, that.format) &&
+                       Objects.equals(schema, that.schema) &&
+                       Objects.equals(config, that.config) &&
+                       Objects.equals(builderProvider, that.builderProvider);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(format, schema, config, builderProvider);
+       }
+}
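
A note on getChangelogMode above: the sink keeps every row kind the planner
requests except UPDATE_BEFORE, since changes are applied as keyed upserts and
deletes, so retraction rows carry no extra information. A hedged sketch of the
effect (assuming ChangelogMode.all() and ChangelogMode#contains from the table
connector API; 'sink' is an Elasticsearch6DynamicSink instance):

    ChangelogMode accepted = sink.getChangelogMode(ChangelogMode.all());
    // accepted.contains(RowKind.INSERT)        == true
    // accepted.contains(RowKind.UPDATE_AFTER)  == true
    // accepted.contains(RowKind.DELETE)        == true
    // accepted.contains(RowKind.UPDATE_BEFORE) == false
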
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
new file mode 100644
index 0000000..65c90b5
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.DOCUMENT_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch6DynamicSink}.
+ */
+@Internal
+public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory {
+       private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+               HOSTS_OPTION,
+               INDEX_OPTION,
+               DOCUMENT_TYPE_OPTION
+       ).collect(Collectors.toSet());
+       private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+               KEY_DELIMITER_OPTION,
+               FAILURE_HANDLER_OPTION,
+               FLUSH_ON_CHECKPOINT_OPTION,
+               BULK_FLASH_MAX_SIZE_OPTION,
+               BULK_FLUSH_MAX_ACTIONS_OPTION,
+               BULK_FLUSH_INTERVAL_OPTION,
+               BULK_FLUSH_BACKOFF_TYPE_OPTION,
+               BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+               BULK_FLUSH_BACKOFF_DELAY_OPTION,
+               CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+               CONNECTION_PATH_PREFIX,
+               FORMAT_OPTION
+       ).collect(Collectors.toSet());
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               TableSchema tableSchema = context.getCatalogTable().getSchema();
+               ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+               final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+               final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+                       SerializationFormatFactory.class,
+                       FORMAT_OPTION);
+
+               helper.validate();
+               Configuration configuration = new Configuration();
+               context.getCatalogTable()
+                       .getOptions()
+                       .forEach(configuration::setString);
+               Elasticsearch6Configuration config = new Elasticsearch6Configuration(configuration, context.getClassLoader());
+
+               validate(config, configuration);
+
+               return new Elasticsearch6DynamicSink(
+                       format,
+                       config,
+                       TableSchemaUtils.getPhysicalSchema(tableSchema));
+       }
+
+       private void validate(Elasticsearch6Configuration config, Configuration originalConfiguration) {
+               config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+               config.getHosts(); // validate hosts
+               validate(
+                       config.getIndex().length() >= 1,
+                       () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+               validate(
+                       config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1. Got: %s",
+                               BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+                               config.getBulkFlushMaxActions().get())
+               );
+               validate(
+                       config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1mb. Got: %s",
+                               BULK_FLASH_MAX_SIZE_OPTION.key(),
+                               originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
+               );
+               validate(
+                       config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1. Got: %s",
+                               BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                               config.getBulkFlushBackoffRetries().get())
+               );
+       }
+
+       private static void validate(boolean condition, Supplier<String> message) {
+               if (!condition) {
+                       throw new ValidationException(message.get());
+               }
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return "elasticsearch-6";
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return requiredOptions;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               return optionalOptions;
+       }
+}
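
The factory above funnels the table's WITH options into a Flink Configuration
before validating them. A minimal sketch of that flow, under the same
assumptions as the tests below (values are illustrative; the constructor is
package-private, so this only compiles in the same package):

    Configuration configuration = new Configuration();
    configuration.setString("hosts", "wrong-host");      // malformed on purpose
    configuration.setString("index", "my-index");
    configuration.setString("document-type", "my-type");

    Elasticsearch6Configuration config = new Elasticsearch6Configuration(
            configuration, Thread.currentThread().getContextClassLoader());
    config.getHosts(); // throws ValidationException: "Could not parse host 'wrong-host' ..."
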
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..29a8593
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory
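
The single service entry above is what makes the factory discoverable: Flink's
FactoryUtil matches the 'connector' option against factoryIdentifier() of every
Factory registered via Java SPI. A sketch of the underlying lookup (standard
java.util.ServiceLoader semantics, not code from this commit):

    ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class);
    for (Factory factory : loader) {
        if ("elasticsearch-6".equals(factory.factoryIdentifier())) {
            // this is the Elasticsearch6DynamicSinkFactory registered above
        }
    }
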
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..f1be1b2
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactoryTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/**
+ * Tests for validation in {@link Elasticsearch6DynamicSinkFactory}.
+ */
+public class Elasticsearch6DynamicSinkFactoryTest {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void validateEmptyConfiguration() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "One or more required options are missing.\n" +
+                               "\n" +
+                               "Missing required options are:\n" +
+                               "\n" +
+                               "document-type\n" +
+                               "hosts\n" +
+                               "index");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongIndex() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'index' must not be empty");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption("index", "")
+                               .withOption("document-type", "MyType")
+                               .withOption("hosts", "http://localhost:12345")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongHosts() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption("index", "MyIndex")
+                               .withOption("document-type", "MyType")
+                               .withOption("hosts", "wrong-host")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongFlushSize() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.max-size' must be at least 1mb. Got: 1024 bytes");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongRetries() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.back-off.max-retries' must be at least 1. Got: 0");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongMaxActions() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.max-actions' must be at least 1. Got: 0");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongBackoffDelay() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Invalid value for option 'sink.bulk-flush.back-off.delay'.");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), "MyType")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validatePrimaryKeyOnIllegalColumn() {
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "The table has a primary key on columns of illegal types: " +
+                               "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" +
+                               " Elasticsearch sink does not support primary keys on columns of types: " +
+                               "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.BIGINT().notNull())
+                                       .field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull())
+                                       .field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull())
+                                       .field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull())
+                                       .field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull())
+                                       .field("f", DataTypes.RAW(Types.BIG_INT).notNull())
+                                       .field("g", DataTypes.BYTES().notNull())
+                                       .primaryKey("a", "b", "c", "d", "e", "f", "g")
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s")
+                               .build()
+               );
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
new file mode 100644
index 0000000..3c09653
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkITCase.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHits;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for {@link Elasticsearch6DynamicSink}.
+ */
+public class Elasticsearch6DynamicSinkITCase {
+
+       @ClassRule
+       public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-6-dynamic-sink-tests");
+
+       @Test
+       public void testWritingDocuments() throws Exception {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT().notNull())
+                       .field("b", DataTypes.TIME())
+                       .field("c", DataTypes.STRING().notNull())
+                       .field("d", DataTypes.FLOAT())
+                       .field("e", DataTypes.TINYINT().notNull())
+                       .field("f", DataTypes.DATE())
+                       .field("g", DataTypes.TIMESTAMP().notNull())
+                       .primaryKey("a", "g")
+                       .build();
+               GenericRowData rowData = GenericRowData.of(
+                       1L,
+                       12345,
+                       StringData.fromString("ABCDE"),
+                       12.12f,
+                       (byte) 2,
+                       12345,
+                       TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")));
+
+               String index = "writing-documents";
+               String myType = "MyType";
+               Elasticsearch6DynamicSinkFactory sinkFactory = new Elasticsearch6DynamicSinkFactory();
+
+               SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(schema)
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), index)
+                               .withOption(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType)
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200")
+                               .withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                               .build()
+               ).getSinkRuntimeProvider(new MockContext());
+
+               SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+               StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+               rowData.setRowKind(RowKind.UPDATE_AFTER);
+               environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+               environment.execute();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(response, equalTo(expectedMap));
+       }
+
+       @Test
+       public void testWritingDocumentsFromTableApi() throws Exception {
+               TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "table-api";
+               String myType = "MyType";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIME,\n" +
+                       "c STRING NOT NULL,\n" +
+                       "d FLOAT,\n" +
+                       "e TINYINT NOT NULL,\n" +
+                       "f DATE,\n" +
+                       "g TIMESTAMP NOT NULL,\n" +
+                       "h as a + 2,\n" +
+                       "PRIMARY KEY (a, g) NOT ENFORCED\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+                       String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(
+                       row(
+                               1L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "ABCDE",
+                               12.12f,
+                               (byte) 2,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2012-12-12T12:12:12"))
+               ).executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12"))
+                       .actionGet()
+                       .getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(response, equalTo(expectedMap));
+       }
+
+       @Test
+       public void testWritingDocumentsNoPrimaryKey() throws Exception {
+               TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "no-primary-key";
+               String myType = "MyType";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIME,\n" +
+                       "c STRING NOT NULL,\n" +
+                       "d FLOAT,\n" +
+                       "e TINYINT NOT NULL,\n" +
+                       "f DATE,\n" +
+                       "g TIMESTAMP NOT NULL\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", "elasticsearch-6") +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), myType) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+                       String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(
+                       row(
+                               1L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "ABCDE",
+                               12.12f,
+                               (byte) 2,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2012-12-12T12:12:12"))
+               ).executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+
+               // the search API does not return documents that have not yet been indexed, so we might need
+               // to query the index a few times
+               Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+               SearchHits hits;
+               do {
+                       hits = client.prepareSearch(index)
+                               .execute()
+                               .actionGet()
+                               .getHits();
+                       if (hits.getTotalHits() == 0) {
+                               Thread.sleep(100);
+                       }
+               } while (hits.getTotalHits() == 0 && deadline.hasTimeLeft());
+
+               Map<String, Object> result = hits.getAt(0).getSourceAsMap();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(result, equalTo(expectedMap));
+       }
+
+       private static class MockContext implements DynamicTableSink.Context {
+               @Override
+               public boolean isBounded() {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+                       return null;
+               }
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
new file mode 100644
index 0000000..df54147
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link Elasticsearch6DynamicSink} parameters.
+ */
+public class Elasticsearch6DynamicSinkTest {
+
+       private static final String FIELD_KEY = "key";
+       private static final String FIELD_FRUIT_NAME = "fruit_name";
+       private static final String FIELD_COUNT = "count";
+       private static final String FIELD_TS = "ts";
+
+       private static final String HOSTNAME = "host1";
+       private static final int PORT = 1234;
+       private static final String SCHEMA = "https";
+       private static final String INDEX = "MyIndex";
+       private static final String DOC_TYPE = "MyType";
+
+       @Test
+       public void testBuilder() {
+               final TableSchema schema = createTestSchema();
+
+               BuilderProvider provider = new BuilderProvider();
+               final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
+                       new DummySinkFormat(),
+                       new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader()),
+                       schema,
+                       provider
+               );
+
+               testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+               verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler());
+               verify(provider.builderSpy).setBulkFlushBackoff(true);
+               verify(provider.builderSpy).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+               verify(provider.builderSpy).setBulkFlushBackoffDelay(123);
+               verify(provider.builderSpy).setBulkFlushBackoffRetries(3);
+               verify(provider.builderSpy).setBulkFlushInterval(100);
+               verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+               verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
+               verify(provider.builderSpy).setRestClientFactory(new Elasticsearch6DynamicSink.DefaultRestClientFactory("/myapp"));
+               verify(provider.sinkSpy).disableFlushOnCheckpoint();
+       }
+
+       private Configuration getConfig() {
+               Configuration configuration = new Configuration();
+               configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+               configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+               configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+               configuration.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+               configuration.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+               configuration.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName());
+               configuration.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+               return configuration;
+       }
+
+       private static class BuilderProvider implements Elasticsearch6DynamicSink.ElasticSearchBuilderProvider {
+               public ElasticsearchSink.Builder<RowData> builderSpy;
+               public ElasticsearchSink<RowData> sinkSpy;
+
+               @Override
+               public ElasticsearchSink.Builder<RowData> createBuilder(
+                               List<HttpHost> httpHosts,
+                               RowElasticsearchSinkFunction upsertSinkFunction) {
+                       builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+                       doAnswer(
+                               invocation -> {
+                                       sinkSpy = Mockito.spy((ElasticsearchSink<RowData>) invocation.callRealMethod());
+                                       return sinkSpy;
+                               }
+                       ).when(builderSpy).build();
+
+                       return builderSpy;
+               }
+       }
+
+       private TableSchema createTestSchema() {
+               return TableSchema.builder()
+                       .field(FIELD_KEY, DataTypes.BIGINT())
+                       .field(FIELD_FRUIT_NAME, DataTypes.STRING())
+                       .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+                       .field(FIELD_TS, DataTypes.TIMESTAMP(3))
+                       .build();
+       }
+
+       private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+               private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+               @Override
+               public byte[] serialize(RowData element) {
+                       return new byte[0];
+               }
+       }
+
+       private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+               @Override
+               public SerializationSchema<RowData> createSinkFormat(
+                               DynamicTableSink.Context context,
+                               DataType consumedDataType) {
+                       return DummySerializationSchema.INSTANCE;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode() {
+                       return null;
+               }
+       }
+
+       private static class MockSinkContext implements DynamicTableSink.Context {
+               @Override
+               public boolean isBounded() {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+                       return null;
+               }
+       }
+
+       /**
+        * Custom failure handler for testing.
+        */
+       public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+               @Override
+               public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+                       // do nothing
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o instanceof DummyFailureHandler;
+               }
+
+               @Override
+               public int hashCode() {
+                       return DummyFailureHandler.class.hashCode();
+               }
+       }
+}
