This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0ce2af04963b51397d9b0e210475e69e36d60f36
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri May 15 20:06:40 2020 +0200

    [FLINK-17027] Introduce a new Elasticsearch 7 connector with new property keys
---
 .../table/AbstractTimeIndexGenerator.java          |  43 ++++
 .../table/ElasticsearchConfiguration.java          | 152 ++++++++++++
 .../elasticsearch/table/ElasticsearchOptions.java  | 134 ++++++++++
 .../table/ElasticsearchValidationUtils.java        |  93 +++++++
 .../elasticsearch/table/IndexGenerator.java        |  42 ++++
 .../elasticsearch/table/IndexGeneratorBase.java    |  54 ++++
 .../elasticsearch/table/IndexGeneratorFactory.java | 276 +++++++++++++++++++++
 .../elasticsearch/table/KeyExtractor.java          | 131 ++++++++++
 .../elasticsearch/table/RequestFactory.java        |  67 +++++
 .../table/RowElasticsearchSinkFunction.java        | 142 +++++++++++
 .../elasticsearch/table/StaticIndexGenerator.java  |  37 +++
 .../elasticsearch/ElasticsearchSinkTestBase.java   |  42 +---
 .../table/IndexGeneratorFactoryTest.java           | 213 ++++++++++++++++
 .../elasticsearch/table/KeyExtractorTest.java      | 130 ++++++++++
 .../elasticsearch/table/TestContext.java           |  79 ++++++
 .../testutils/ElasticsearchResource.java           |  78 ++++++
 .../flink-connector-elasticsearch7/pom.xml         |  22 ++
 .../table/Elasticsearch7Configuration.java         |  71 ++++++
 .../table/Elasticsearch7DynamicSink.java           | 252 +++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkFactory.java    | 154 ++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../Elasticsearch7DynamicSinkFactoryTest.java      | 200 +++++++++++++++
 .../table/Elasticsearch7DynamicSinkITCase.java     | 254 +++++++++++++++++++
 .../table/Elasticsearch7DynamicSinkTest.java       | 195 +++++++++++++++
 .../apache/flink/table/utils/TableSchemaUtils.java |   3 +
 25 files changed, 2841 insertions(+), 39 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java
new file mode 100644
index 0000000..864d7fa
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/AbstractTimeIndexGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.time.format.DateTimeFormatter;
+
+/**
+ * Abstract class for time-related {@link IndexGenerator}s.
+ */
+@Internal
+abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
+
+       private final String dateTimeFormat;
+       protected transient DateTimeFormatter dateTimeFormatter;
+
+       public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
+               super(index);
+               this.dateTimeFormat = dateTimeFormat;
+       }
+
+       @Override
+       public void open() {
+               this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
+       }
+}
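
For illustration only (not part of this commit): a minimal sketch of how a subclass
could use the formatter that open() initializes. It assumes this commit's classes are
on the classpath and lives in the same package, since AbstractTimeIndexGenerator is
package-private; the index pattern and field layout are made up.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.TimestampData;

    import java.time.LocalDateTime;

    class TimeIndexGeneratorSketch {
        public static void main(String[] args) {
            // Anonymous subclass formatting a TIMESTAMP field at position 0.
            AbstractTimeIndexGenerator generator =
                new AbstractTimeIndexGenerator("myusers_{log_ts|yyyy-MM-dd}", "yyyy-MM-dd") {
                    @Override
                    public String generate(RowData row) {
                        TimestampData ts = row.getTimestamp(0, 3);
                        return "myusers_" + ts.toLocalDateTime().format(dateTimeFormatter);
                    }
                };
            generator.open(); // builds the DateTimeFormatter from the pattern, once
            RowData row = GenericRowData.of(
                TimestampData.fromLocalDateTime(LocalDateTime.of(2020, 3, 27, 12, 25, 55)));
            System.out.println(generator.generate(row)); // myusers_2020-03-27
        }
    }
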
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
new file mode 100644
index 0000000..48b848c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConfiguration.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+
+/**
+ * Accessor methods for Elasticsearch options.
+ */
+@Internal
+class ElasticsearchConfiguration {
+       protected final ReadableConfig config;
+       private final ClassLoader classLoader;
+
+       ElasticsearchConfiguration(ReadableConfig config, ClassLoader classLoader) {
+               this.config = config;
+               this.classLoader = classLoader;
+       }
+
+       public ActionRequestFailureHandler getFailureHandler() {
+               final ActionRequestFailureHandler failureHandler;
+               String value = config.get(FAILURE_HANDLER_OPTION);
+               switch (value.toUpperCase()) {
+                       case "FAIL":
+                               failureHandler = new NoOpFailureHandler();
+                               break;
+                       case "IGNORE":
+                               failureHandler = new IgnoringFailureHandler();
+                               break;
+                       case "RETRY-REJECTED":
+                               failureHandler = new 
RetryRejectedExecutionFailureHandler();
+                               break;
+                       default:
+                               try {
+                                       Class<?> failureHandlerClass = 
Class.forName(value, false, classLoader);
+                                       failureHandler = 
(ActionRequestFailureHandler) 
InstantiationUtil.instantiate(failureHandlerClass);
+                               } catch (ClassNotFoundException e) {
+                                       throw new ValidationException("Could 
not instantiate the failure handler class: " + value, e);
+                               }
+                               break;
+               }
+               return failureHandler;
+       }
+
+       public String getDocumentType() {
+               return config.get(ElasticsearchOptions.DOCUMENT_TYPE_OPTION);
+       }
+
+       public Optional<Integer> getBulkFlushMaxActions() {
+               return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION);
+       }
+
+       public Optional<Integer> getBulkFlushMaxSize() {
+               return config.getOptional(ElasticsearchOptions.BULK_FLUSH_MAX_SIZE_OPTION).map(MemorySize::getMebiBytes);
+       }
+
+       public Optional<Long> getBulkFlushInterval() {
+               return config.getOptional(BULK_FLUSH_INTERVAL_OPTION).map(Duration::toMillis);
+       }
+
+       public boolean isBulkFlushBackoffEnabled() {
+               return config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION) != ElasticsearchOptions.BackOffType.DISABLED;
+       }
+
+       public Optional<ElasticsearchSinkBase.FlushBackoffType> getBulkFlushBackoffType() {
+               switch (config.get(BULK_FLUSH_BACKOFF_TYPE_OPTION)) {
+                       case CONSTANT:
+                               return Optional.of(ElasticsearchSinkBase.FlushBackoffType.CONSTANT);
+                       case EXPONENTIAL:
+                               return Optional.of(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+                       default:
+                               return Optional.empty();
+               }
+       }
+
+       public Optional<Integer> getBulkFlushBackoffRetries() {
+               return config.getOptional(BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION);
+       }
+
+       public Optional<Long> getBulkFlushBackoffDelay() {
+               return config.getOptional(BULK_FLUSH_BACKOFF_DELAY_OPTION).map(Duration::toMillis);
+       }
+
+       public boolean isDisableFlushOnCheckpoint() {
+               return !config.get(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION);
+       }
+
+       public String getIndex() {
+               return config.get(ElasticsearchOptions.INDEX_OPTION);
+       }
+
+       public String getKeyDelimiter() {
+               return config.get(ElasticsearchOptions.KEY_DELIMITER_OPTION);
+       }
+
+       public Optional<String> getPathPrefix() {
+               return config.getOptional(ElasticsearchOptions.CONNECTION_PATH_PREFIX);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               ElasticsearchConfiguration that = (ElasticsearchConfiguration) o;
+               return Objects.equals(config, that.config) &&
+                       Objects.equals(classLoader, that.classLoader);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(config, classLoader);
+       }
+}
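
For illustration only (not part of this commit): a sketch of how the accessors map the
new property keys onto sink settings. It assumes Flink's Configuration (a ReadableConfig)
and this commit's classes in the same package; the key values are made up.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.apache.flink.configuration.Configuration;

    class ElasticsearchConfigurationSketch {
        public static void main(String[] args) {
            Configuration options = new Configuration();
            options.setString("index", "myusers");
            options.setString("failure-handler", "retry-rejected");
            options.setString("sink.bulk-flush.back-off.strategy", "CONSTANT");
            options.setString("sink.bulk-flush.back-off.delay", "10s");

            ElasticsearchConfiguration config = new ElasticsearchConfiguration(
                options, ElasticsearchConfigurationSketch.class.getClassLoader());

            System.out.println(config.getIndex());                   // myusers
            // "retry-rejected" resolves to RetryRejectedExecutionFailureHandler
            System.out.println(config.getFailureHandler().getClass().getSimpleName());
            System.out.println(config.isBulkFlushBackoffEnabled());  // true
            System.out.println(config.getBulkFlushBackoffDelay());   // Optional[10000]
        }
    }
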
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
new file mode 100644
index 0000000..c68ca68
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchOptions.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Options for {@link org.apache.flink.table.factories.DynamicTableSinkFactory} for Elasticsearch.
+ */
+public class ElasticsearchOptions {
+       /**
+        * Backoff strategy. Extends {@link ElasticsearchSinkBase.FlushBackoffType} with
+        * {@code DISABLED} option.
+        */
+       public enum BackOffType {
+               DISABLED,
+               CONSTANT,
+               EXPONENTIAL
+       }
+
+       public static final ConfigOption<List<String>> HOSTS_OPTION =
+               ConfigOptions.key("hosts")
+                       .stringType()
+                       .asList()
+                       .noDefaultValue()
+                       .withDescription("Elasticseatch hosts to connect to.");
+       public static final ConfigOption<String> INDEX_OPTION =
+               ConfigOptions.key("index")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Elasticsearch index for every 
record.");
+       public static final ConfigOption<String> DOCUMENT_TYPE_OPTION =
+               ConfigOptions.key("document-type")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Elasticsearch document type.");
+       public static final ConfigOption<String> KEY_DELIMITER_OPTION =
+               ConfigOptions.key("document-id.key-delimiter")
+                       .stringType()
+                       .defaultValue("_")
+                       .withDescription("Delimiter for composite keys e.g., 
\"$\" would result in IDs \"KEY1$KEY2$KEY3\".");
+       public static final ConfigOption<String> FAILURE_HANDLER_OPTION =
+               ConfigOptions.key("failure-handler")
+                       .stringType()
+                       .defaultValue("fail")
+                       .withDescription(Description.builder()
+                               .text("Failure handling strategy in case a request to Elasticsearch fails")
+                               .list(
+                                       text("\"fail\" (throws an exception if a request fails and thus causes a job failure),"),
+                                       text("\"ignore\" (ignores failures and drops the request),"),
+                                       text("\"retry-rejected\" (re-adds requests that have failed due to queue capacity saturation),"),
+                                       text("\"class name\" for failure handling with an ActionRequestFailureHandler subclass"))
+                               .build());
+       public static final ConfigOption<Boolean> FLUSH_ON_CHECKPOINT_OPTION =
+               ConfigOptions.key("sink.flush-on-checkpoint")
+                       .booleanType()
+                       .defaultValue(true)
+                       .withDescription("Disables flushing on checkpoint");
+       public static final ConfigOption<Integer> BULK_FLUSH_MAX_ACTIONS_OPTION =
+               ConfigOptions.key("sink.bulk-flush.max-actions")
+                       .intType()
+                       .noDefaultValue()
+                       .withDescription("Maximum number of actions to buffer for each bulk request.");
+       public static final ConfigOption<MemorySize> BULK_FLUSH_MAX_SIZE_OPTION =
+               ConfigOptions.key("sink.bulk-flush.max-size")
+                       .memoryType()
+                       .noDefaultValue()
+                       .withDescription("Maximum size of buffered actions per bulk request.");
+       public static final ConfigOption<Duration> BULK_FLUSH_INTERVAL_OPTION =
+               ConfigOptions.key("sink.bulk-flush.interval")
+                       .durationType()
+                       .noDefaultValue()
+                       .withDescription("Bulk flush interval");
+       public static final ConfigOption<BackOffType> BULK_FLUSH_BACKOFF_TYPE_OPTION =
+               ConfigOptions.key("sink.bulk-flush.back-off.strategy")
+                       .enumType(BackOffType.class)
+                       .defaultValue(BackOffType.DISABLED)
+                       .withDescription("Backoff strategy.");
+       public static final ConfigOption<Integer> BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION =
+               ConfigOptions.key("sink.bulk-flush.back-off.max-retries")
+                       .intType()
+                       .noDefaultValue()
+                       .withDescription("Maximum number of retries.");
+       public static final ConfigOption<Duration> BULK_FLUSH_BACKOFF_DELAY_OPTION =
+               ConfigOptions.key("sink.bulk-flush.back-off.delay")
+                       .durationType()
+                       .noDefaultValue()
+                       .withDescription("Delay between each backoff attempt.");
+       public static final ConfigOption<Duration> CONNECTION_MAX_RETRY_TIMEOUT_OPTION =
+               ConfigOptions.key("connection.max-retry-timeout")
+                       .durationType()
+                       .noDefaultValue()
+                       .withDescription("Maximum timeout between retries.");
+       public static final ConfigOption<String> CONNECTION_PATH_PREFIX =
+               ConfigOptions.key("connection.path-prefix")
+                       .stringType()
+                       .noDefaultValue()
+                       .withDescription("Prefix string to be added to every 
REST communication.");
+       public static final ConfigOption<String> FORMAT_OPTION =
+               ConfigOptions.key("format")
+                       .stringType()
+                       .defaultValue("json")
+                       .withDescription("Elasticsearch connector requires to 
specify a format.\n" +
+                               "The format must produce a valid json document. 
\n" +
+                               "By default uses built-in 'json' format. Please 
refer to Table Formats section for more details.");
+
+       private ElasticsearchOptions() {
+       }
+}
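
For illustration only (not part of this commit): a sketch of how these property keys
could appear in a SQL DDL submitted from Java. It assumes a Flink 1.11-style
TableEnvironment, that the new factory's identifier is 'elasticsearch-7', and a
reachable cluster; the table and field names are made up.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    class Elasticsearch7DdlSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
            tEnv.executeSql(
                "CREATE TABLE esSink (" +
                "  user_id STRING," +
                "  log_ts TIMESTAMP(3)," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'elasticsearch-7'," +
                "  'hosts' = 'http://localhost:9200'," +
                "  'index' = 'myusers_{log_ts|yyyy-MM-dd}'," +
                "  'sink.bulk-flush.max-actions' = '1000'," +
                "  'failure-handler' = 'retry-rejected'" +
                ")");
        }
    }
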
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
new file mode 100644
index 0000000..b5caa76
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utility methods for validating Elasticsearch properties.
+ */
+@Internal
+class ElasticsearchValidationUtils {
+
+       private static final Set<LogicalTypeRoot> ILLEGAL_PRIMARY_KEY_TYPES = new LinkedHashSet<>();
+
+       static {
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ARRAY);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MAP);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.MULTISET);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.STRUCTURED_TYPE);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.ROW);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.RAW);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.BINARY);
+               ILLEGAL_PRIMARY_KEY_TYPES.add(LogicalTypeRoot.VARBINARY);
+       }
+
+       /**
+        * Checks that the table does not have a primary key defined on illegal types.
+        * In Elasticsearch the primary key is used to calculate the Elasticsearch document id,
+        * which is a string of up to 512 bytes. It cannot contain whitespace. As of now it is calculated
+        * by concatenating the fields. Certain types do not have a good string representation to be used
+        * in this scenario. The illegal types are mostly {@link LogicalTypeFamily#COLLECTION} types and
+        * the {@link LogicalTypeRoot#RAW} type.
+        */
+       public static void validatePrimaryKey(TableSchema schema) {
+               schema.getPrimaryKey().ifPresent(
+                       key -> {
+                               List<LogicalTypeRoot> illegalTypes = key.getColumns()
+                                       .stream()
+                                       .map(fieldName -> {
+                                               LogicalType logicalType = schema.getFieldDataType(fieldName).get().getLogicalType();
+                                               if (hasRoot(logicalType, LogicalTypeRoot.DISTINCT_TYPE)) {
+                                                       return ((DistinctType) logicalType).getSourceType().getTypeRoot();
+                                               } else {
+                                                       return logicalType.getTypeRoot();
+                                               }
+                                       })
+                                       .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
+                                       .collect(Collectors.toList());
+
+                               if (!illegalTypes.isEmpty()) {
+                                       throw new ValidationException(
+                                               String.format(
+                                                       "The table has a primary key on columns of illegal types: %s.\n" +
+                                                               "Elasticsearch sink does not support primary keys on columns of types: %s.",
+                                                       illegalTypes,
+                                                       ILLEGAL_PRIMARY_KEY_TYPES));
+                               }
+                       }
+               );
+       }
+
+       private ElasticsearchValidationUtils() {
+       }
+}
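
For illustration only (not part of this commit): a minimal sketch of the validation
rejecting a VARBINARY primary key column. Same package (the utility is package-private);
the schema contents are made up.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.ValidationException;

    class ValidatePrimaryKeySketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.BYTES().notNull()) // VARBINARY, an illegal key type
                .field("name", DataTypes.STRING())
                .primaryKey("id")
                .build();
            try {
                ElasticsearchValidationUtils.validatePrimaryKey(schema);
            } catch (ValidationException e) {
                System.out.println(e.getMessage()); // names VARBINARY among the illegal types
            }
        }
    }
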
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java
new file mode 100644
index 0000000..f5faf84
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+
+/**
+ * This interface is responsible for generating an index name from a given {@link RowData} record.
+ */
+@Internal
+interface IndexGenerator extends Serializable {
+
+       /**
+        * Initializes the index generator. This will be called only once, before
+        * {@link #generate(RowData)} is called.
+        */
+       default void open() {}
+
+       /**
+        * Generates an index name according to the given row.
+        */
+       String generate(RowData row);
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java
new file mode 100644
index 0000000..5df3efd
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/**
+ * Base class for {@link IndexGenerator}.
+ */
+@Internal
+public abstract class IndexGeneratorBase implements IndexGenerator {
+
+       private static final long serialVersionUID = 1L;
+       protected final String index;
+
+       public IndexGeneratorBase(String index) {
+               this.index = index;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof IndexGeneratorBase)) {
+                       return false;
+               }
+               IndexGeneratorBase that = (IndexGeneratorBase) o;
+               return index.equals(that.index);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(index);
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
new file mode 100644
index 0000000..e60be72
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactory.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Factory of {@link IndexGenerator}.
+ *
+ * <p>Flink supports both static index and dynamic index.
+ *
+ * <p>If you want to have a static index, this option value should be a plain string, e.g. 'myusers';
+ * all the records will be consistently written into the "myusers" index.
+ *
+ * <p>If you want to have a dynamic index, you can use '{field_name}' to reference a field value in the
+ * record to dynamically generate a target index. You can also use '{field_name|date_format_string}' to
+ * convert a field value of TIMESTAMP/DATE/TIME type into the format specified by date_format_string. The
+ * date_format_string is compatible with {@link java.time.format.DateTimeFormatter}. For example, if the option
+ * value is 'myusers_{log_ts|yyyy-MM-dd}', then a record with log_ts field value 2020-03-27 12:25:55 will
+ * be written into the "myusers_2020-03-27" index.
+ */
+@Internal
+final class IndexGeneratorFactory {
+
+       private IndexGeneratorFactory() {}
+
+       public static IndexGenerator createIndexGenerator(String index, TableSchema schema) {
+               final IndexHelper indexHelper = new IndexHelper();
+               if (indexHelper.checkIsDynamicIndex(index)) {
+                       return createRuntimeIndexGenerator(index, schema.getFieldNames(), schema.getFieldDataTypes(), indexHelper);
+               } else {
+                       return new StaticIndexGenerator(index);
+               }
+       }
+
+       interface DynamicFormatter extends Serializable {
+               String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
+       }
+
+       private static IndexGenerator createRuntimeIndexGenerator(
+                       String index,
+                       String[] fieldNames,
+                       DataType[] fieldTypes,
+                       IndexHelper indexHelper) {
+               final String dynamicIndexPatternStr = indexHelper.extractDynamicIndexPatternStr(index);
+               final String indexPrefix = index.substring(0, index.indexOf(dynamicIndexPatternStr));
+               final String indexSuffix = index.substring(indexPrefix.length() + dynamicIndexPatternStr.length());
+
+               final boolean isDynamicIndexWithFormat = indexHelper.checkIsDynamicIndexWithFormat(index);
+               final int indexFieldPos = indexHelper.extractIndexFieldPos(index, fieldNames, isDynamicIndexWithFormat);
+               final LogicalType indexFieldType = fieldTypes[indexFieldPos].getLogicalType();
+               final LogicalTypeRoot indexFieldLogicalTypeRoot = indexFieldType.getTypeRoot();
+
+               // validate index field type
+               indexHelper.validateIndexFieldType(indexFieldLogicalTypeRoot);
+
+               // field getter for the dynamic index field
+               final RowData.FieldGetter fieldGetter = RowData.createFieldGetter(indexFieldType, indexFieldPos);
+
+               if (isDynamicIndexWithFormat) {
+                       final String dateTimeFormat = indexHelper.extractDateFormat(index, indexFieldLogicalTypeRoot);
+                       DynamicFormatter formatFunction = createFormatFunction(
+                               indexFieldType,
+                               indexFieldLogicalTypeRoot);
+
+                       return new AbstractTimeIndexGenerator(index, dateTimeFormat) {
+                               @Override
+                               public String generate(RowData row) {
+                                       Object fieldOrNull = fieldGetter.getFieldOrNull(row);
+                                       final String formattedField;
+                                       // TODO we can possibly optimize it to use the nullability of the field
+                                       if (fieldOrNull != null) {
+                                               formattedField = formatFunction.format(fieldOrNull, dateTimeFormatter);
+                                       } else {
+                                               formattedField = "null";
+                                       }
+                                       return indexPrefix.concat(formattedField).concat(indexSuffix);
+                               }
+                       };
+               }
+               // general dynamic index pattern
+               return new IndexGeneratorBase(index) {
+                       @Override
+                       public String generate(RowData row) {
+                               Object indexField = fieldGetter.getFieldOrNull(row);
+                               return indexPrefix.concat(indexField == null ? "null" : indexField.toString()).concat(indexSuffix);
+                       }
+               };
+       }
+
+       private static DynamicFormatter createFormatFunction(
+                       LogicalType indexFieldType,
+                       LogicalTypeRoot indexFieldLogicalTypeRoot) {
+               switch (indexFieldLogicalTypeRoot) {
+                       case DATE:
+                               return (value, dateTimeFormatter) -> {
+                                       Integer indexField = (Integer) value;
+                                       return LocalDate.ofEpochDay(indexField).format(dateTimeFormatter);
+                               };
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return (value, dateTimeFormatter) -> {
+                                       Integer indexField = (Integer) value;
+                                       return LocalTime.ofNanoOfDay(indexField * 1_000_000L)
+                                               .format(dateTimeFormatter);
+                               };
+                       case TIMESTAMP_WITHOUT_TIME_ZONE:
+                               return (value, dateTimeFormatter) -> {
+                                       TimestampData indexField = (TimestampData) value;
+                                       return indexField.toLocalDateTime().format(dateTimeFormatter);
+                               };
+                       case TIMESTAMP_WITH_TIME_ZONE:
+                               throw new UnsupportedOperationException("TIMESTAMP_WITH_TIME_ZONE is not supported yet");
+                       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                               return (value, dateTimeFormatter) -> {
+                                       TimestampData indexField = (TimestampData) value;
+                                       return indexField.toInstant().atZone(ZoneOffset.UTC).format(dateTimeFormatter);
+                               };
+                       default:
+                               throw new TableException(String.format(
+                                       "Unsupported type '%s' found in Elasticsearch dynamic index field, " +
+                                               "the time-related pattern only supports types DATE, TIME, and TIMESTAMP.",
+                                       indexFieldType));
+               }
+       }
+
+       /**
+        * Helper class for {@link IndexGeneratorFactory}; this helper can be used to validate the
+        * index field type and parse the index format from the pattern.
+        */
+       private static class IndexHelper {
+               private static final Pattern dynamicIndexPattern = Pattern.compile("\\{[^\\{\\}]+\\}?");
+               private static final Pattern dynamicIndexTimeExtractPattern = Pattern.compile(".*\\{.+\\|.*\\}.*");
+               private static final List<LogicalTypeRoot> supportedTypes = new ArrayList<>();
+               private static final Map<LogicalTypeRoot, String> defaultFormats = new HashMap<>();
+
+               static {
+                       // time-related types
+                       supportedTypes.add(LogicalTypeRoot.DATE);
+                       supportedTypes.add(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE);
+                       supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE);
+                       supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE);
+                       supportedTypes.add(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+                       // general types
+                       supportedTypes.add(LogicalTypeRoot.VARCHAR);
+                       supportedTypes.add(LogicalTypeRoot.CHAR);
+                       supportedTypes.add(LogicalTypeRoot.TINYINT);
+                       supportedTypes.add(LogicalTypeRoot.INTEGER);
+                       supportedTypes.add(LogicalTypeRoot.BIGINT);
+               }
+
+               static {
+                       defaultFormats.put(LogicalTypeRoot.DATE, "yyyy_MM_dd");
+                       defaultFormats.put(LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE, "HH_mm_ss");
+                       defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+                       defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, "yyyy_MM_dd_HH_mm_ss");
+                       defaultFormats.put(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, "yyyy_MM_dd_HH_mm_ssX");
+               }
+
+               /**
+                * Validates the index field type.
+                */
+               void validateIndexFieldType(LogicalTypeRoot logicalType) {
+                       if (!supportedTypes.contains(logicalType)) {
+                               throw new IllegalArgumentException(String.format("Unsupported type %s of index field. " +
+                                       "Supported types are: %s", logicalType, supportedTypes));
+                       }
+               }
+
+               /**
+                * Get the default date format.
+                */
+               String getDefaultFormat(LogicalTypeRoot logicalType) {
+                       return defaultFormats.get(logicalType);
+               }
+
+               /**
+                * Checks whether the general dynamic index is enabled, based on the index pattern.
+                */
+               boolean checkIsDynamicIndex(String index) {
+                       final Matcher matcher = dynamicIndexPattern.matcher(index);
+                       int count = 0;
+                       while (matcher.find()) {
+                               count++;
+                       }
+                       if (count > 1) {
+                               throw new TableException(String.format("Chaining dynamic index patterns is not supported in '%s';" +
+                                       " only a single dynamic index pattern is supported.", index));
+                       }
+                       return count == 1;
+               }
+
+               /**
+                * Checks whether the time-extract dynamic index is enabled, based on the index pattern.
+                */
+               boolean checkIsDynamicIndexWithFormat(String index) {
+                       return dynamicIndexTimeExtractPattern.matcher(index).matches();
+               }
+
+               /**
+                * Extracts the dynamic index pattern string from the index pattern string.
+                */
+               String extractDynamicIndexPatternStr(String index) {
+                       int start = index.indexOf("{");
+                       int end = index.lastIndexOf("}");
+                       return index.substring(start, end + 1);
+               }
+
+               /**
+                * Extracts the index field position from the given field names.
+                */
+               int extractIndexFieldPos(String index, String[] fieldNames, boolean isDynamicIndexWithFormat) {
+                       List<String> fieldList = Arrays.asList(fieldNames);
+                       String indexFieldName;
+                       if (isDynamicIndexWithFormat) {
+                               indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("|"));
+                       } else {
+                               indexFieldName = index.substring(index.indexOf("{") + 1, index.indexOf("}"));
+                       }
+                       if (!fieldList.contains(indexFieldName)) {
+                               throw new TableException(String.format("Unknown field '%s' in index pattern '%s', please check the field name.",
+                                       indexFieldName, index));
+                       }
+                       return fieldList.indexOf(indexFieldName);
+               }
+
+               /**
+                * Extracts the date-time format from the index pattern string, falling back to
+                * the default format for the given type when none is specified.
+                */
+               private String extractDateFormat(String index, LogicalTypeRoot logicalType) {
+                       String format = index.substring(index.indexOf("|") + 1, index.indexOf("}"));
+                       if ("".equals(format)) {
+                               format = getDefaultFormat(logicalType);
+                       }
+                       return format;
+               }
+       }
+}
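
For illustration only (not part of this commit): a sketch of resolving a dynamic index
with a date format, matching the pattern from the class Javadoc. Same package (the
factory is package-private); the schema and values are made up.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.TimestampData;

    import java.time.LocalDateTime;

    class IndexGeneratorFactorySketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("user_id", DataTypes.STRING())
                .field("log_ts", DataTypes.TIMESTAMP(3))
                .build();

            // Dynamic pattern: prefix "myusers_", field "log_ts", format "yyyy-MM-dd".
            IndexGenerator generator = IndexGeneratorFactory.createIndexGenerator(
                "myusers_{log_ts|yyyy-MM-dd}", schema);
            generator.open();

            GenericRowData row = GenericRowData.of(
                StringData.fromString("alice"),
                TimestampData.fromLocalDateTime(LocalDateTime.of(2020, 3, 27, 12, 25, 55)));
            System.out.println(generator.generate(row)); // myusers_2020-03-27
        }
    }
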
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
new file mode 100644
index 0000000..db28ff1
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.Period;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * An extractor for an Elasticsearch key from a {@link RowData}.
+ */
+@Internal
+class KeyExtractor implements Function<RowData, String>, Serializable {
+       private final FieldFormatter[] fieldFormatters;
+       private final String keyDelimiter;
+
+       private interface FieldFormatter extends Serializable {
+               String format(RowData rowData);
+       }
+
+       private KeyExtractor(
+                       FieldFormatter[] fieldFormatters,
+                       String keyDelimiter) {
+               this.fieldFormatters = fieldFormatters;
+               this.keyDelimiter = keyDelimiter;
+       }
+
+       @Override
+       public String apply(RowData rowData) {
+               final StringBuilder builder = new StringBuilder();
+               for (int i = 0; i < fieldFormatters.length; i++) {
+                       if (i > 0) {
+                               builder.append(keyDelimiter);
+                       }
+                       final String value = fieldFormatters[i].format(rowData);
+                       builder.append(value);
+               }
+               return builder.toString();
+       }
+
+       private static class ColumnWithIndex {
+               public TableColumn column;
+               public int index;
+
+               public ColumnWithIndex(TableColumn column, int index) {
+                       this.column = column;
+                       this.index = index;
+               }
+
+               public LogicalType getType() {
+                       return column.getType().getLogicalType();
+               }
+
+               public int getIndex() {
+                       return index;
+               }
+       }
+
+       public static Function<RowData, String> createKeyExtractor(
+                       TableSchema schema,
+                       String keyDelimiter) {
+               return schema.getPrimaryKey().map(key -> {
+                       Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
+                       List<TableColumn> tableColumns = schema.getTableColumns();
+                       for (int i = 0; i < schema.getFieldCount(); i++) {
+                               TableColumn column = tableColumns.get(i);
+                               namesToColumns.put(column.getName(), new ColumnWithIndex(column, i));
+                       }
+
+                       FieldFormatter[] fieldFormatters = key.getColumns()
+                               .stream()
+                               .map(namesToColumns::get)
+                               .map(column -> toFormatter(column.index, column.getType()))
+                               .toArray(FieldFormatter[]::new);
+
+                       return (Function<RowData, String>) new KeyExtractor(
+                               fieldFormatters,
+                               keyDelimiter
+                       );
+               }).orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
+       }
+
+       private static FieldFormatter toFormatter(int index, LogicalType type) {
+               switch (type.getTypeRoot()) {
+                       case DATE:
+                               return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
+                       case TIME_WITHOUT_TIME_ZONE:
+                               return (row) -> LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
+                       case INTERVAL_YEAR_MONTH:
+                               return (row) -> Period.ofDays(row.getInt(index)).toString();
+                       case INTERVAL_DAY_TIME:
+                               return (row) -> Duration.ofMillis(row.getLong(index)).toString();
+                       case DISTINCT_TYPE:
+                               return toFormatter(index, ((DistinctType) type).getSourceType());
+                       default:
+                               RowData.FieldGetter fieldGetter = RowData.createFieldGetter(
+                                       type,
+                                       index);
+                               return (row) -> fieldGetter.getFieldOrNull(row).toString();
+               }
+       }
+}
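
For illustration only (not part of this commit): a sketch of document-id extraction
from a composite primary key with the default "_" delimiter. Same package (KeyExtractor
is package-private); the schema and values are made up.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;

    import java.util.function.Function;

    class KeyExtractorSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("tenant", DataTypes.STRING().notNull())
                .field("user_id", DataTypes.BIGINT().notNull())
                .field("comment", DataTypes.STRING())
                .primaryKey("tenant", "user_id")
                .build();

            Function<RowData, String> keyExtractor =
                KeyExtractor.createKeyExtractor(schema, "_");

            RowData row = GenericRowData.of(
                StringData.fromString("acme"), 42L, StringData.fromString("hi"));
            System.out.println(keyExtractor.apply(row)); // acme_42
        }
    }
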
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java
new file mode 100644
index 0000000..35d69eb
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import java.io.Serializable;
+
+/**
+ * For version-agnostic creating of {@link ActionRequest}s.
+ */
+@Internal
+interface RequestFactory extends Serializable {
+       /**
+        * Creates an update request to be added to a {@link RequestIndexer}.
+        * Note: the type field has been deprecated since Elasticsearch 7.x and will not take any effect.
+        */
+       UpdateRequest createUpdateRequest(
+               String index,
+               String docType,
+               String key,
+               XContentType contentType,
+               byte[] document);
+
+       /**
+        * Creates an index request to be added to a {@link RequestIndexer}.
+        * Note: the type field has been deprecated since Elasticsearch 7.x and will not take any effect.
+        */
+       IndexRequest createIndexRequest(
+               String index,
+               String docType,
+               String key,
+               XContentType contentType,
+               byte[] document);
+
+       /**
+        * Creates a delete request to be added to a {@link RequestIndexer}.
+        * Note: the type field has been deprecated since Elasticsearch 7.x and will not take any effect.
+        */
+       DeleteRequest createDeleteRequest(
+               String index,
+               String docType,
+               String key);
+}
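
For illustration only (not part of this commit): one plausible RequestFactory
implementation against the Elasticsearch 7 client, in which the deprecated docType
parameter is simply ignored. Same package; assumes the 7.x client request APIs.

    package org.apache.flink.streaming.connectors.elasticsearch.table;

    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.common.xcontent.XContentType;

    class Es7RequestFactorySketch implements RequestFactory {

        @Override
        public UpdateRequest createUpdateRequest(
                String index, String docType, String key,
                XContentType contentType, byte[] document) {
            // Upsert semantics: update the document if the id exists, insert otherwise.
            return new UpdateRequest(index, key)
                .doc(document, contentType)
                .upsert(document, contentType);
        }

        @Override
        public IndexRequest createIndexRequest(
                String index, String docType, String key,
                XContentType contentType, byte[] document) {
            return new IndexRequest(index)
                .id(key)
                .source(document, contentType);
        }

        @Override
        public DeleteRequest createDeleteRequest(
                String index, String docType, String key) {
            return new DeleteRequest(index, key);
        }
    }
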
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
new file mode 100644
index 0000000..4eaba48
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Sink function for converting upserts into Elasticsearch {@link ActionRequest}s.
+ */
+@Internal
+class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final IndexGenerator indexGenerator;
+       private final String docType;
+       private final SerializationSchema<RowData> serializationSchema;
+       private final XContentType contentType;
+       private final RequestFactory requestFactory;
+       private final Function<RowData, String> createKey;
+
+       public RowElasticsearchSinkFunction(
+                       IndexGenerator indexGenerator,
+                       @Nullable String docType, // this is deprecated in es 7+
+                       SerializationSchema<RowData> serializationSchema,
+                       XContentType contentType,
+                       RequestFactory requestFactory,
+                       Function<RowData, String> createKey) {
+               this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
+               this.docType = docType;
+               this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
+               this.contentType = Preconditions.checkNotNull(contentType);
+               this.requestFactory = Preconditions.checkNotNull(requestFactory);
+               this.createKey = Preconditions.checkNotNull(createKey);
+       }
+
+       @Override
+       public void process(
+                       RowData element,
+                       RuntimeContext ctx,
+                       RequestIndexer indexer) {
+               switch (element.getRowKind()) {
+                       case INSERT:
+                       case UPDATE_AFTER:
+                               processUpsert(element, indexer);
+                               break;
+                       case UPDATE_BEFORE:
+                       case DELETE:
+                               processDelete(element, indexer);
+                               break;
+                       default:
+                               throw new TableException("Unsupported message kind: " + element.getRowKind());
+               }
+       }
+
+       private void processUpsert(RowData row, RequestIndexer indexer) {
+               final byte[] document = serializationSchema.serialize(row);
+               final String key = createKey.apply(row);
+               if (key != null) {
+                       final UpdateRequest updateRequest = requestFactory.createUpdateRequest(
+                               indexGenerator.generate(row),
+                               docType,
+                               key,
+                               contentType,
+                               document);
+                       indexer.add(updateRequest);
+               } else {
+                       final IndexRequest indexRequest = requestFactory.createIndexRequest(
+                               indexGenerator.generate(row),
+                               docType,
+                               key,
+                               contentType,
+                               document);
+                       indexer.add(indexRequest);
+               }
+       }
+
+       private void processDelete(RowData row, RequestIndexer indexer) {
+               final String key = createKey.apply(row);
+               final DeleteRequest deleteRequest = requestFactory.createDeleteRequest(
+                       indexGenerator.generate(row),
+                       docType,
+                       key);
+               indexer.add(deleteRequest);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               RowElasticsearchSinkFunction that = (RowElasticsearchSinkFunction) o;
+               return Objects.equals(indexGenerator, that.indexGenerator) &&
+                       Objects.equals(docType, that.docType) &&
+                       Objects.equals(serializationSchema, that.serializationSchema) &&
+                       contentType == that.contentType &&
+                       Objects.equals(requestFactory, that.requestFactory) &&
+                       Objects.equals(createKey, that.createKey);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(indexGenerator, docType, serializationSchema, contentType, requestFactory, createKey);
+       }
+}
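
For context, a minimal sketch (not part of this patch) of how the function above routes changelog rows; it only uses classes that already appear in this commit:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class RowKindRoutingSketch {
            public static void main(String[] args) {
                    // The RowKind of each incoming row selects the request type in process().
                    GenericRowData row = GenericRowData.of(1L, StringData.fromString("apple"));
                    row.setRowKind(RowKind.INSERT);        // processUpsert: UpdateRequest when a key
                                                           // can be derived, IndexRequest otherwise
                    row.setRowKind(RowKind.UPDATE_AFTER);  // processUpsert as well
                    row.setRowKind(RowKind.UPDATE_BEFORE); // processDelete, same as DELETE
                    row.setRowKind(RowKind.DELETE);        // processDelete: DeleteRequest
            }
    }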
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java
new file mode 100644
index 0000000..196b64c
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/StaticIndexGenerator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * A static {@link IndexGenerator} that generates a fixed index name.
+ */
+@Internal
+final class StaticIndexGenerator extends IndexGeneratorBase {
+
+       public StaticIndexGenerator(String index) {
+               super(index);
+       }
+
+       public String generate(RowData row) {
+               return index;
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
index 8b56896..35bdca7 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.java
@@ -21,17 +21,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
 import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.util.InstantiationUtil;
 
 import org.elasticsearch.client.Client;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,41 +44,10 @@ import static org.junit.Assert.fail;
  */
 public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
 
-       private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkTestBase.class);
-
        protected static final String CLUSTER_NAME = "test-cluster";
 
-       protected static EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
-
        @ClassRule
-       public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @BeforeClass
-       public static void prepare() throws Exception {
-
-               LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Starting embedded Elasticsearch node ");
-               LOG.info("-------------------------------------------------------------------------");
-
-               // dynamically load version-specific implementation of the Elasticsearch embedded node environment
-               Class<?> clazz = Class.forName(
-                       "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl");
-               embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz);
-
-               embeddedNodeEnv.start(tempFolder.newFolder(), CLUSTER_NAME);
-
-       }
-
-       @AfterClass
-       public static void shutdown() throws Exception {
-
-               LOG.info("-------------------------------------------------------------------------");
-               LOG.info("    Shutting down embedded Elasticsearch node ");
-               LOG.info("-------------------------------------------------------------------------");
-
-               embeddedNodeEnv.close();
-
-       }
+       public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource(CLUSTER_NAME);
 
        /**
         * Tests that the Elasticsearch sink works properly with json.
@@ -128,7 +92,7 @@ public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> exte
                env.execute("Elasticsearch Sink Test");
 
                // verify the results
-               Client client = embeddedNodeEnv.getClient();
+               Client client = elasticsearchResource.getClient();
                SourceSinkDataTestKit.verifyProducedSinkData(client, index);
 
                client.close();
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
new file mode 100644
index 0000000..2f0eedd
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/IndexGeneratorFactoryTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.UnsupportedTemporalTypeException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link IndexGeneratorFactory}.
+ */
+public class IndexGeneratorFactoryTest extends TestLogger {
+
+       private TableSchema schema;
+       private List<RowData> rows;
+
+       @Before
+       public void prepareData() {
+               schema = new TableSchema.Builder()
+                       .field("id", DataTypes.INT())
+                       .field("item", DataTypes.STRING())
+                       .field("log_ts", DataTypes.BIGINT())
+                       .field("log_date", DataTypes.DATE())
+                       .field("log_time", DataTypes.TIME())
+                       .field("order_timestamp", DataTypes.TIMESTAMP())
+                       .field("local_timestamp", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                       .field("status", DataTypes.BOOLEAN())
+                       .build();
+
+               rows = new ArrayList<>();
+               rows.add(GenericRowData.of(
+                       1,
+                       StringData.fromString("apple"),
+                       Timestamp.valueOf("2020-03-18 12:12:14").getTime(),
+                       (int) LocalDate.parse("2020-03-18").toEpochDay(),
+                       (int) (LocalTime.parse("12:12:14").toNanoOfDay() / 1_000_000L),
+                       TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-18T12:12:14")),
+                       TimestampData.fromInstant(Instant.parse("2020-03-18T12:12:14Z")),
+                       true));
+               rows.add(GenericRowData.of(
+                       2,
+                       StringData.fromString("peanut"),
+                       Timestamp.valueOf("2020-03-19 12:12:14").getTime(),
+                       (int) LocalDate.parse("2020-03-19").toEpochDay(),
+                       (int) (LocalTime.parse("12:22:21").toNanoOfDay() / 1_000_000L),
+                       TimestampData.fromLocalDateTime(LocalDateTime.parse("2020-03-19T12:22:14")),
+                       TimestampData.fromInstant(Instant.parse("2020-03-19T12:12:14Z")),
+                       false));
+       }
+
+       @Test
+       public void testDynamicIndexFromTimestamp() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "{order_timestamp|yyyy_MM_dd_HH-ss}_index", schema);
+               indexGenerator.open();
+               Assert.assertEquals("2020_03_18_12-14_index", indexGenerator.generate(rows.get(0)));
+               IndexGenerator indexGenerator1 = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "{order_timestamp|yyyy_MM_dd_HH_mm}_index", schema);
+               indexGenerator1.open();
+               Assert.assertEquals("2020_03_19_12_22_index", indexGenerator1.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testDynamicIndexFromDate() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "my-index-{log_date|yyyy/MM/dd}", schema);
+               indexGenerator.open();
+               Assert.assertEquals("my-index-2020/03/18", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("my-index-2020/03/19", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testDynamicIndexFromTime() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "my-index-{log_time|HH-mm}", schema);
+               indexGenerator.open();
+               Assert.assertEquals("my-index-12-12", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("my-index-12-22", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testDynamicIndexDefaultFormat() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "my-index-{log_time|}", schema);
+               indexGenerator.open();
+               Assert.assertEquals("my-index-12_12_14", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("my-index-12_22_21", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testDynamicIndexDefaultFormatTimestampWithLocalTimeZone() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "my-index-{local_timestamp|}", schema);
+               indexGenerator.open();
+               Assert.assertEquals("my-index-2020_03_18_12_12_14Z", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("my-index-2020_03_19_12_12_14Z", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testGeneralDynamicIndex() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory
+                       .createIndexGenerator(
+                               "index_{item}", schema);
+               indexGenerator.open();
+               Assert.assertEquals("index_apple", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("index_peanut", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testStaticIndex() {
+               IndexGenerator indexGenerator = IndexGeneratorFactory.createIndexGenerator(
+                       "my-index", schema);
+               indexGenerator.open();
+               Assert.assertEquals("my-index", indexGenerator.generate(rows.get(0)));
+               Assert.assertEquals("my-index", indexGenerator.generate(rows.get(1)));
+       }
+
+       @Test
+       public void testUnknownField() {
+               String expectedExceptionMsg = "Unknown field 'unknown_ts' in index pattern 'my-index-{unknown_ts|yyyy-MM-dd}'," +
+                       " please check the field name.";
+               try {
+                       IndexGeneratorFactory.createIndexGenerator("my-index-{unknown_ts|yyyy-MM-dd}", schema);
+               } catch (TableException e) {
+                       Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+               }
+       }
+
+       @Test
+       public void testUnsupportedTimeType() {
+               String expectedExceptionMsg = "Unsupported type 'INT' found in Elasticsearch dynamic index field, " +
+                       "time-related pattern only support types are: DATE,TIME,TIMESTAMP.";
+               try {
+                       IndexGeneratorFactory.createIndexGenerator("my-index-{id|yyyy-MM-dd}", schema);
+               } catch (TableException e) {
+                       Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+               }
+       }
+
+       @Test
+       public void testUnsupportedMultiParametersType() {
+               String expectedExceptionMsg = "Chaining dynamic index pattern my-index-{local_date}-{local_time} is not supported," +
+                       " only support single dynamic index pattern.";
+               try {
+                       IndexGeneratorFactory.createIndexGenerator("my-index-{local_date}-{local_time}", schema);
+               } catch (TableException e) {
+                       Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+               }
+       }
+
+       @Test
+       public void testDynamicIndexUnsupportedFormat() {
+               String expectedExceptionMsg = "Unsupported field: HourOfDay";
+               try {
+                       IndexGeneratorFactory.createIndexGenerator(
+                               "my-index-{log_date|yyyy/MM/dd HH:mm}", schema);
+               } catch (UnsupportedTemporalTypeException e) {
+                       Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+               }
+       }
+
+       @Test
+       public void testUnsupportedIndexFieldType() {
+               String expectedExceptionMsg = "Unsupported type BOOLEAN of index field, Supported types are:" +
+                       " [DATE, TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE," +
+                       " TIMESTAMP_WITH_LOCAL_TIME_ZONE, VARCHAR, CHAR, TINYINT, INTEGER, BIGINT]";
+               try {
+                       IndexGeneratorFactory.createIndexGenerator("index_{status}", schema);
+               } catch (IllegalArgumentException e) {
+                       Assert.assertEquals(expectedExceptionMsg, e.getMessage());
+               }
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java
new file mode 100644
index 0000000..96b6e3e
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractorTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KeyExtractor}.
+ */
+public class KeyExtractorTest {
+       @Test
+       public void testSimpleKey() {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT().notNull())
+                       .field("b", DataTypes.STRING())
+                       .primaryKey("a")
+                       .build();
+
+               Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+               String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
+               assertThat(key, equalTo("12"));
+       }
+
+       @Test
+       public void testNoPrimaryKey() {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT().notNull())
+                       .field("b", DataTypes.STRING())
+                       .build();
+
+               Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+               String key = keyExtractor.apply(GenericRowData.of(12L, StringData.fromString("ABCD")));
+               assertThat(key, nullValue());
+       }
+
+       @Test
+       public void testTwoFieldsKey() {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT().notNull())
+                       .field("b", DataTypes.STRING())
+                       .field("c", DataTypes.TIMESTAMP().notNull())
+                       .primaryKey("a", "c")
+                       .build();
+
+               Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+               String key = keyExtractor.apply(
+                       GenericRowData.of(
+                               12L,
+                               StringData.fromString("ABCD"),
+                               TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12"))
+                       ));
+               assertThat(key, equalTo("12_2012-12-12T12:12:12"));
+       }
+
+       @Test
+       public void testAllTypesKey() {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.TINYINT().notNull())
+                       .field("b", DataTypes.SMALLINT().notNull())
+                       .field("c", DataTypes.INT().notNull())
+                       .field("d", DataTypes.BIGINT().notNull())
+                       .field("e", DataTypes.BOOLEAN().notNull())
+                       .field("f", DataTypes.FLOAT().notNull())
+                       .field("g", DataTypes.DOUBLE().notNull())
+                       .field("h", DataTypes.STRING().notNull())
+                       .field("i", DataTypes.TIMESTAMP().notNull())
+                       .field("j", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull())
+                       .field("k", DataTypes.TIME().notNull())
+                       .field("l", DataTypes.DATE().notNull())
+                       .primaryKey("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l")
+                       .build();
+
+               Function<RowData, String> keyExtractor = KeyExtractor.createKeyExtractor(schema, "_");
+
+               String key = keyExtractor.apply(
+                       GenericRowData.of(
+                               (byte) 1,
+                               (short) 2,
+                               3,
+                               (long) 4,
+                               true,
+                               1.0f,
+                               2.0d,
+                               StringData.fromString("ABCD"),
+                               TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")),
+                               TimestampData.fromInstant(Instant.parse("2013-01-13T13:13:13Z")),
+                               (int) (LocalTime.parse("14:14:14").toNanoOfDay() / 1_000_000),
+                               (int) LocalDate.parse("2015-05-15").toEpochDay()
+                       ));
+               assertThat(
+                       key,
+                       equalTo("1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15"));
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java
new file mode 100644
index 0000000..e4978fc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/TestContext.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for mocking {@link DynamicTableFactory.Context}.
+ */
+class TestContext {
+       private TableSchema schema;
+       private Map<String, String> properties = new HashMap<>();
+
+       public static TestContext context() {
+               return new TestContext();
+       }
+
+       public TestContext withSchema(TableSchema schema) {
+               this.schema = schema;
+               return this;
+       }
+
+       DynamicTableFactory.Context build() {
+               return new DynamicTableFactory.Context() {
+                       @Override
+                       public ObjectIdentifier getObjectIdentifier() {
+                               return null;
+                       }
+
+                       @Override
+                       public CatalogTable getCatalogTable() {
+                               return new CatalogTableImpl(
+                                       schema,
+                                       properties,
+                                       ""
+                               );
+                       }
+
+                       @Override
+                       public ReadableConfig getConfiguration() {
+                               return null;
+                       }
+
+                       @Override
+                       public ClassLoader getClassLoader() {
+                               return TestContext.class.getClassLoader();
+                       }
+               };
+       }
+
+       public TestContext withOption(String key, String value) {
+               properties.put(key, value);
+               return this;
+       }
+}
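
For orientation, a hedged usage sketch of this mock (assuming a test class in the same package); the option keys and values below are placeholders, the factory tests later in this patch supply the real ones:

    // TableSchema, DataTypes and DynamicTableFactory are the imports already used above.
    DynamicTableFactory.Context context = TestContext.context()
            .withSchema(TableSchema.builder()
                    .field("a", DataTypes.BIGINT())
                    .build())
            .withOption("connector", "elasticsearch-7")   // placeholder option
            .withOption("hosts", "http://localhost:9200") // placeholder option
            .withOption("index", "my-index")              // placeholder option
            .build();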
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
new file mode 100644
index 0000000..6f185d3
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/testutils/ElasticsearchResource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.testutils;
+
+import org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.elasticsearch.client.Client;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resource that starts an embedded Elasticsearch cluster.
+ */
+public class ElasticsearchResource extends ExternalResource {
+       private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchResource.class);
+       private EmbeddedElasticsearchNodeEnvironment embeddedNodeEnv;
+       private final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       private final String clusterName;
+
+       public ElasticsearchResource(String clusterName) {
+               this.clusterName = clusterName;
+       }
+
+       @Override
+       protected void before() throws Throwable {
+
+               LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Starting embedded Elasticsearch node ");
+               LOG.info("-------------------------------------------------------------------------");
+
+               // dynamically load version-specific implementation of the Elasticsearch embedded node environment
+               Class<?> clazz = Class.forName(
+                       "org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl");
+               embeddedNodeEnv = (EmbeddedElasticsearchNodeEnvironment) InstantiationUtil.instantiate(clazz);
+
+               tempFolder.create();
+               embeddedNodeEnv.start(tempFolder.newFolder(), clusterName);
+       }
+
+       @Override
+       protected void after() {
+
+               LOG.info("-------------------------------------------------------------------------");
+               LOG.info("    Shutting down embedded Elasticsearch node ");
+               LOG.info("-------------------------------------------------------------------------");
+
+               try {
+                       embeddedNodeEnv.close();
+                       tempFolder.delete();
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       public Client getClient() {
+               return embeddedNodeEnv.getClient();
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch7/pom.xml b/flink-connectors/flink-connector-elasticsearch7/pom.xml
index 5f25f46..ef340b9 100644
--- a/flink-connectors/flink-connector-elasticsearch7/pom.xml
+++ b/flink-connectors/flink-connector-elasticsearch7/pom.xml
@@ -139,6 +139,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Table API integration tests -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- Elasticsearch table sink factory testing -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -148,4 +156,18 @@ under the License.
                </dependency>
 
        </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single fork execution because of spawning
+                                            Elasticsearch cluster multiple times -->
+                                       <forkCount>1</forkCount>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
 </project>
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
new file mode 100644
index 0000000..0aff575
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7Configuration.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import org.apache.http.HttpHost;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+
+/**
+ * Elasticsearch 7 specific configuration.
+ */
+@Internal
+final class Elasticsearch7Configuration extends ElasticsearchConfiguration {
+       Elasticsearch7Configuration(ReadableConfig config, ClassLoader classLoader) {
+               super(config, classLoader);
+       }
+
+       public List<HttpHost> getHosts() {
+               return config.get(HOSTS_OPTION).stream()
+                       .map(Elasticsearch7Configuration::validateAndParseHostsString)
+                       .collect(Collectors.toList());
+       }
+
+       private static HttpHost validateAndParseHostsString(String host) {
+               try {
+                       HttpHost httpHost = HttpHost.create(host);
+                       if (httpHost.getPort() < 0) {
+                               throw new ValidationException(String.format(
+                                       "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing port.",
+                                       host,
+                                       HOSTS_OPTION.key()));
+                       }
+
+                       if (httpHost.getSchemeName() == null) {
+                               throw new ValidationException(String.format(
+                                       "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'. Missing scheme.",
+                                       host,
+                                       HOSTS_OPTION.key()));
+                       }
+                       return httpHost;
+               } catch (Exception e) {
+                       throw new ValidationException(String.format(
+                               "Could not parse host '%s' in option '%s'. It should follow the format 'http://host_name:port'.",
+                               host,
+                               HOSTS_OPTION.key()), e);
+               }
+       }
+}
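
To make the validation above concrete, a hedged sketch of which host strings pass; it relies only on org.apache.http.HttpHost behavior exercised by this class:

    import org.apache.http.HttpHost;

    public class HostFormatSketch {
            public static void main(String[] args) {
                    // Accepted: scheme and port are both present.
                    HttpHost ok = HttpHost.create("http://localhost:9200");
                    System.out.println(ok.getPort());     // 9200
                    // Rejected by the check above: no port given, so getPort() == -1
                    // and validateAndParseHostsString raises "Missing port.".
                    HttpHost noPort = HttpHost.create("http://localhost");
                    System.out.println(noPort.getPort()); // -1
            }
    }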
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
new file mode 100644
index 0000000..4076b63
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.common.xcontent.XContentType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link ElasticsearchSink} from a logical
+ * description.
+ */
+@Internal
+final class Elasticsearch7DynamicSink implements DynamicTableSink {
+       @VisibleForTesting
+       static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
+
+       private final SinkFormat<SerializationSchema<RowData>> format;
+       private final TableSchema schema;
+       private final Elasticsearch7Configuration config;
+
+       public Elasticsearch7DynamicSink(
+                       SinkFormat<SerializationSchema<RowData>> format,
+                       Elasticsearch7Configuration config,
+                       TableSchema schema) {
+               this(format, config, schema, (ElasticsearchSink.Builder::new));
+       }
+
+       //--------------------------------------------------------------
+       // Hack to make configuration testing possible.
+       //
+       // The code in this block should never be used outside of tests.
+       // Having a way to inject a builder we can assert the builder in
+       // the test. We can not assert everything though, e.g. it is not
+       // possible to assert flushing on checkpoint, as it is configured
+       // on the sink itself.
+       //--------------------------------------------------------------
+
+       private final ElasticSearchBuilderProvider builderProvider;
+
+       @FunctionalInterface
+       interface ElasticSearchBuilderProvider {
+               ElasticsearchSink.Builder<RowData> createBuilder(
+                       List<HttpHost> httpHosts,
+                       RowElasticsearchSinkFunction upsertSinkFunction);
+       }
+
+       Elasticsearch7DynamicSink(
+                       SinkFormat<SerializationSchema<RowData>> format,
+                       Elasticsearch7Configuration config,
+                       TableSchema schema,
+                       ElasticSearchBuilderProvider builderProvider) {
+               this.format = format;
+               this.schema = schema;
+               this.config = config;
+               this.builderProvider = builderProvider;
+       }
+
+       //--------------------------------------------------------------
+       // End of hack to make configuration testing possible
+       //--------------------------------------------------------------
+
+       @Override
+       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+               ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+               for (RowKind kind : requestedMode.getContainedKinds()) {
+                       if (kind != RowKind.UPDATE_BEFORE) {
+                               builder.addContainedKind(kind);
+                       }
+               }
+               return builder.build();
+       }
+
+       @Override
+       public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
+               return () -> {
+                       SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+
+                       final RowElasticsearchSinkFunction upsertFunction =
+                               new RowElasticsearchSinkFunction(
+                                       IndexGeneratorFactory.createIndexGenerator(config.getIndex(), schema),
+                                       null, // this is deprecated in es 7+
+                                       format,
+                                       XContentType.JSON,
+                                       REQUEST_FACTORY,
+                                       KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter())
+                               );
+
+                       final ElasticsearchSink.Builder<RowData> builder = builderProvider.createBuilder(
+                               config.getHosts(),
+                               upsertFunction);
+
+                       builder.setFailureHandler(config.getFailureHandler());
+                       config.getBulkFlushMaxActions().ifPresent(builder::setBulkFlushMaxActions);
+                       config.getBulkFlushMaxSize().ifPresent(builder::setBulkFlushMaxSizeMb);
+                       config.getBulkFlushInterval().ifPresent(builder::setBulkFlushInterval);
+                       builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
+                       config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
+                       config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
+                       config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
+
+                       config.getPathPrefix()
+                               .ifPresent(pathPrefix -> builder.setRestClientFactory(new DefaultRestClientFactory(pathPrefix)));
+
+                       final ElasticsearchSink<RowData> sink = builder.build();
+
+                       if (config.isDisableFlushOnCheckpoint()) {
+                               sink.disableFlushOnCheckpoint();
+                       }
+
+                       return sink;
+               };
+       }
+
+       @Override
+       public DynamicTableSink copy() {
+               return this;
+       }
+
+       @Override
+       public String asSummaryString() {
+               return "Elasticsearch7";
+       }
+
+       /**
+        * Serializable {@link RestClientFactory} used by the sink.
+        */
+       @VisibleForTesting
+       static class DefaultRestClientFactory implements RestClientFactory {
+
+               private final String pathPrefix;
+
+               public DefaultRestClientFactory(@Nullable String pathPrefix) {
+                       this.pathPrefix = pathPrefix;
+               }
+
+               @Override
+               public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
+                       if (pathPrefix != null) {
+                               restClientBuilder.setPathPrefix(pathPrefix);
+                       }
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       DefaultRestClientFactory that = (DefaultRestClientFactory) o;
+                       return Objects.equals(pathPrefix, that.pathPrefix);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(pathPrefix);
+               }
+       }
+
+       /**
+        * Version-specific creation of {@link org.elasticsearch.action.ActionRequest}s used by the sink.
+        */
+       private static class Elasticsearch7RequestFactory implements RequestFactory {
+               @Override
+               public UpdateRequest createUpdateRequest(
+                               String index,
+                               String docType,
+                               String key,
+                               XContentType contentType,
+                               byte[] document) {
+                       return new UpdateRequest(index, key)
+                               .doc(document, contentType)
+                               .upsert(document, contentType);
+               }
+
+               @Override
+               public IndexRequest createIndexRequest(
+                               String index,
+                               String docType,
+                               String key,
+                               XContentType contentType,
+                               byte[] document) {
+                       return new IndexRequest(index)
+                               .id(key)
+                               .source(document, contentType);
+               }
+
+               @Override
+               public DeleteRequest createDeleteRequest(String index, String docType, String key) {
+                       return new DeleteRequest(index, key);
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               Elasticsearch7DynamicSink that = (Elasticsearch7DynamicSink) o;
+               return Objects.equals(format, that.format) &&
+                       Objects.equals(schema, that.schema) &&
+                       Objects.equals(config, that.config) &&
+                       Objects.equals(builderProvider, that.builderProvider);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(format, schema, config, builderProvider);
+       }
+}
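
For end-to-end context, a hedged sketch of a DDL that should resolve to this sink through the factory that follows; the option key spellings ('hosts', 'index', 'format') are assumptions based on ElasticsearchOptions earlier in this patch, 'elasticsearch-7' matches factoryIdentifier() below, and the planner setup may differ on master:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class Elasticsearch7SinkExample {
            public static void main(String[] args) {
                    TableEnvironment tEnv = TableEnvironment.create(
                            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
                    // Registers a table backed by Elasticsearch7DynamicSinkFactory.
                    tEnv.sqlUpdate(
                            "CREATE TABLE es_sink (" +
                            "  user_id BIGINT," +
                            "  user_name STRING" +
                            ") WITH (" +
                            "  'connector' = 'elasticsearch-7'," +
                            "  'hosts' = 'http://localhost:9200'," +
                            "  'index' = 'users'," +
                            "  'format' = 'json'" +
                            ")");
            }
    }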
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
new file mode 100644
index 0000000..055989b
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_MAX_RETRY_TIMEOUT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.CONNECTION_PATH_PREFIX;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FAILURE_HANDLER_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.FORMAT_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.HOSTS_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.INDEX_OPTION;
+import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions.KEY_DELIMITER_OPTION;
+
+/**
+ * A {@link DynamicTableSinkFactory} for discovering {@link Elasticsearch7DynamicSink}.
+ */
+@Internal
+public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory {
+       private static final Set<ConfigOption<?>> requiredOptions = Stream.of(
+               HOSTS_OPTION,
+               INDEX_OPTION
+       ).collect(Collectors.toSet());
+       private static final Set<ConfigOption<?>> optionalOptions = Stream.of(
+               KEY_DELIMITER_OPTION,
+               FAILURE_HANDLER_OPTION,
+               FLUSH_ON_CHECKPOINT_OPTION,
+               BULK_FLASH_MAX_SIZE_OPTION,
+               BULK_FLUSH_MAX_ACTIONS_OPTION,
+               BULK_FLUSH_INTERVAL_OPTION,
+               BULK_FLUSH_BACKOFF_TYPE_OPTION,
+               BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
+               BULK_FLUSH_BACKOFF_DELAY_OPTION,
+               CONNECTION_MAX_RETRY_TIMEOUT_OPTION,
+               CONNECTION_PATH_PREFIX,
+               FORMAT_OPTION
+       ).collect(Collectors.toSet());
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               TableSchema tableSchema = context.getCatalogTable().getSchema();
+               ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
+
+               final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+               final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+                       SerializationFormatFactory.class,
+                       FORMAT_OPTION);
+
+               helper.validate();
+               Configuration configuration = new Configuration();
+               context.getCatalogTable()
+                       .getOptions()
+                       .forEach(configuration::setString);
+               Elasticsearch7Configuration config = new Elasticsearch7Configuration(configuration, context.getClassLoader());
+
+               validateOptions(config, configuration);
+
+               return new Elasticsearch7DynamicSink(
+                       format,
+                       config,
+                       TableSchemaUtils.getPhysicalSchema(tableSchema));
+       }
+
+       private void validateOptions(Elasticsearch7Configuration config, Configuration originalConfiguration) {
+               config.getFailureHandler(); // checks if we can instantiate the custom failure handler
+               config.getHosts(); // validate hosts
+               validateOptions(
+                       config.getIndex().length() >= 1,
+                       () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));
+               validateOptions(
+                       config.getBulkFlushMaxActions().map(maxActions -> maxActions >= 1).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1. Got: %s",
+                               BULK_FLUSH_MAX_ACTIONS_OPTION.key(),
+                               config.getBulkFlushMaxActions().get())
+               );
+               validateOptions(
+                       config.getBulkFlushMaxSize().map(maxSize -> maxSize >= 1024 * 1024).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1mb. Got: %s",
+                               BULK_FLASH_MAX_SIZE_OPTION.key(),
+                               originalConfiguration.get(BULK_FLASH_MAX_SIZE_OPTION).toHumanReadableString())
+               );
+               validateOptions(
+                       config.getBulkFlushBackoffRetries().map(retries -> retries >= 1).orElse(true),
+                       () -> String.format(
+                               "'%s' must be at least 1. Got: %s",
+                               BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
+                               config.getBulkFlushBackoffRetries().get())
+               );
+       }
+
+       private static void validateOptions(boolean condition, Supplier<String> message) {
+               if (!condition) {
+                       throw new ValidationException(message.get());
+               }
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return "elasticsearch-7";
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return requiredOptions;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               return optionalOptions;
+       }
+}
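
For orientation, a minimal sketch of how this factory is exercised from the Table API once the connector jar is on the classpath. The table name, fields, and host below are placeholders; the option keys mirror the ones validated above, and 'connector' = 'elasticsearch-7' matches factoryIdentifier():

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class Elasticsearch7SinkUsageSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            // 'hosts' and 'index' are the two required options; the bulk-flush
            // key is one of the optional tuning options declared above.
            tEnv.executeSql(
                "CREATE TABLE esSink (\n" +
                "  user_id BIGINT,\n" +
                "  message STRING,\n" +
                "  PRIMARY KEY (user_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://localhost:9200',\n" +
                "  'index' = 'users',\n" +
                "  'sink.bulk-flush.max-actions' = '1000'\n" +
                ")");
        }
    }
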
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..10e4846
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory
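
This service entry is what makes the factory discoverable: the framework looks it up through standard Java SPI and matches on the identifier supplied by the DDL's 'connector' option. A rough stand-alone sketch of that lookup, illustrative only — FactoryLookupSketch and its error message are invented here, and the real resolution lives in FactoryUtil:

    import java.util.ServiceLoader;

    import org.apache.flink.table.factories.DynamicTableSinkFactory;
    import org.apache.flink.table.factories.Factory;

    public final class FactoryLookupSketch {
        // Iterate every Factory registered under META-INF/services and match on
        // the identifier from the DDL, e.g. find("elasticsearch-7").
        public static Factory find(String connectorIdentifier) {
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if (factory instanceof DynamicTableSinkFactory
                        && factory.factoryIdentifier().equals(connectorIdentifier)) {
                    return factory;
                }
            }
            throw new IllegalArgumentException("No factory found for: " + connectorIdentifier);
        }
    }
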
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
new file mode 100644
index 0000000..a830fa3
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactoryTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+
+/**
+ * Tests for validation in {@link Elasticsearch7DynamicSinkFactory}.
+ */
+public class Elasticsearch7DynamicSinkFactoryTest {
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void validateEmptyConfiguration() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "One or more required options are missing.\n" +
+                               "\n" +
+                               "Missing required options are:\n" +
+                               "\n" +
+                               "hosts\n" +
+                               "index");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongIndex() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'index' must not be empty");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption("index", "")
+                               .withOption("hosts", "http://localhost:12345")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongHosts() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Could not parse host 'wrong-host' in option 'hosts'. It should follow the format 'http://host_name:port'.");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption("index", "MyIndex")
+                               .withOption("hosts", "wrong-host")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongFlushSize() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.max-size' must be at least 1mb. Got: 1024 bytes");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1kb")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongRetries() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.back-off.max-retries' must be at least 1. Got: 0");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "0")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongMaxActions() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "'sink.bulk-flush.max-actions' must be at least 1. Got: 0");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "0")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validateWrongBackoffDelay() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "Invalid value for option 'sink.bulk-flush.back-off.delay'.");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.TIME())
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "-1s")
+                               .build()
+               );
+       }
+
+       @Test
+       public void validatePrimaryKeyOnIllegalColumn() {
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               thrown.expect(ValidationException.class);
+               thrown.expectMessage(
+                       "The table has a primary key on columns of illegal types: " +
+                               "[ARRAY, MAP, MULTISET, ROW, RAW, VARBINARY].\n" +
+                               " Elasticsearch sink does not support primary keys on columns of types: " +
+                               "[ARRAY, MAP, MULTISET, STRUCTURED_TYPE, ROW, RAW, BINARY, VARBINARY].");
+               sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(TableSchema.builder()
+                                       .field("a", DataTypes.BIGINT().notNull())
+                                       .field("b", DataTypes.ARRAY(DataTypes.BIGINT().notNull()).notNull())
+                                       .field("c", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()).notNull())
+                                       .field("d", DataTypes.MULTISET(DataTypes.BIGINT().notNull()).notNull())
+                                       .field("e", DataTypes.ROW(DataTypes.FIELD("a", DataTypes.BIGINT())).notNull())
+                                       .field("f", DataTypes.RAW(Types.BIG_INT).notNull())
+                                       .field("g", DataTypes.BYTES().notNull())
+                                       .primaryKey("a", "b", "c", "d", "e", "f", "g")
+                                       .build())
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), "MyIndex")
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://localhost:1234")
+                               .withOption(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "1s")
+                               .build()
+               );
+       }
+}
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
new file mode 100644
index 0000000..3b667dc
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkITCase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.testutils.ElasticsearchResource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.search.SearchHits;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.streaming.connectors.elasticsearch.table.TestContext.context;
+import static org.apache.flink.table.api.Expressions.row;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * IT tests for {@link Elasticsearch7DynamicSink}.
+ */
+public class Elasticsearch7DynamicSinkITCase {
+
+       @ClassRule
+       public static ElasticsearchResource elasticsearchResource = new ElasticsearchResource("es-dynamic-sink-it-test");
+
+       @Test
+       public void testWritingDocuments() throws Exception {
+               TableSchema schema = TableSchema.builder()
+                       .field("a", DataTypes.BIGINT().notNull())
+                       .field("b", DataTypes.TIME())
+                       .field("c", DataTypes.STRING().notNull())
+                       .field("d", DataTypes.FLOAT())
+                       .field("e", DataTypes.TINYINT().notNull())
+                       .field("f", DataTypes.DATE())
+                       .field("g", DataTypes.TIMESTAMP().notNull())
+                       .primaryKey("a", "g")
+                       .build();
+               GenericRowData rowData = GenericRowData.of(
+                       1L,
+                       12345,
+                       StringData.fromString("ABCDE"),
+                       12.12f,
+                       (byte) 2,
+                       12345,
+                       TimestampData.fromLocalDateTime(LocalDateTime.parse("2012-12-12T12:12:12")));
+
+               String index = "writing-documents";
+               Elasticsearch7DynamicSinkFactory sinkFactory = new Elasticsearch7DynamicSinkFactory();
+
+               SinkFunctionProvider sinkRuntimeProvider = (SinkFunctionProvider) sinkFactory.createDynamicTableSink(
+                       context()
+                               .withSchema(schema)
+                               .withOption(ElasticsearchOptions.INDEX_OPTION.key(), index)
+                               .withOption(ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200")
+                               .withOption(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false")
+                               .build()
+               ).getSinkRuntimeProvider(new MockContext());
+
+               SinkFunction<RowData> sinkFunction = sinkRuntimeProvider.createSinkFunction();
+               StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+               rowData.setRowKind(RowKind.UPDATE_AFTER);
+               environment.<RowData>fromElements(rowData).addSink(sinkFunction);
+               environment.execute();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(response, equalTo(expectedMap));
+       }
+
+       @Test
+       public void testWritingDocumentsFromTableApi() throws Exception {
+               TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "table-api";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIME,\n" +
+                       "c STRING NOT NULL,\n" +
+                       "d FLOAT,\n" +
+                       "e TINYINT NOT NULL,\n" +
+                       "f DATE,\n" +
+                       "g TIMESTAMP NOT NULL," +
+                       "h as a + 2,\n" +
+                       "PRIMARY KEY (a, g) NOT ENFORCED\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", "elasticsearch-7") +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+                       String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(
+                       row(
+                               1L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "ABCDE",
+                               12.12f,
+                               (byte) 2,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2012-12-12T12:12:12"))
+               ).executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+               Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(response, equalTo(expectedMap));
+       }
+
+       @Test
+       public void testWritingDocumentsNoPrimaryKey() throws Exception {
+               TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build());
+
+               String index = "no-primary-key";
+               tableEnvironment.executeSql("CREATE TABLE esTable (" +
+                       "a BIGINT NOT NULL,\n" +
+                       "b TIME,\n" +
+                       "c STRING NOT NULL,\n" +
+                       "d FLOAT,\n" +
+                       "e TINYINT NOT NULL,\n" +
+                       "f DATE,\n" +
+                       "g TIMESTAMP NOT NULL\n" +
+                       ")\n" +
+                       "WITH (\n" +
+                       String.format("'%s'='%s',\n", "connector", "elasticsearch-7") +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.INDEX_OPTION.key(), index) +
+                       String.format("'%s'='%s',\n", ElasticsearchOptions.HOSTS_OPTION.key(), "http://127.0.0.1:9200") +
+                       String.format("'%s'='%s'\n", ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false") +
+                       ")");
+
+               tableEnvironment.fromValues(
+                       row(
+                               1L,
+                               LocalTime.ofNanoOfDay(12345L * 1_000_000L),
+                               "ABCDE",
+                               12.12f,
+                               (byte) 2,
+                               LocalDate.ofEpochDay(12345),
+                               LocalDateTime.parse("2012-12-12T12:12:12"))
+               ).executeInsert("esTable")
+                       .getJobClient()
+                       .get()
+                       .getJobExecutionResult(this.getClass().getClassLoader())
+                       .get();
+
+               Client client = elasticsearchResource.getClient();
+
+               // the search API does not return documents that have not been indexed yet,
+               // so we might need to query the index a few times
+               Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+               SearchHits hits;
+               do {
+                       hits = client.prepareSearch(index)
+                               .execute()
+                               .actionGet()
+                               .getHits();
+                       if (hits.getTotalHits().value == 0) {
+                               Thread.sleep(100);
+                       }
+               } while (hits.getTotalHits().value == 0 && deadline.hasTimeLeft());
+
+               Map<String, Object> result = hits.getAt(0).getSourceAsMap();
+               Map<Object, Object> expectedMap = new HashMap<>();
+               expectedMap.put("a", 1);
+               expectedMap.put("b", "00:00:12Z");
+               expectedMap.put("c", "ABCDE");
+               expectedMap.put("d", 12.12d);
+               expectedMap.put("e", 2);
+               expectedMap.put("f", "2003-10-20");
+               expectedMap.put("g", "2012-12-12T12:12:12Z");
+               assertThat(result, equalTo(expectedMap));
+       }
+
+       private static class MockContext implements DynamicTableSink.Context {
+               @Override
+               public boolean isBounded() {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+                       return null;
+               }
+       }
+}
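
An aside on the document id "1_2012-12-12T12:12:12" asserted above: it is derived from the declared primary key (a, g), with the key fields formatted and joined by the key delimiter (KEY_DELIMITER_OPTION; apparently "_" by default, since these tests never set it). A hypothetical stand-alone illustration of that shape — this is not the connector's KeyExtractor API:

    import java.time.LocalDateTime;

    public final class DocIdSketch {
        public static void main(String[] args) {
            long a = 1L;
            LocalDateTime g = LocalDateTime.parse("2012-12-12T12:12:12");
            String delimiter = "_"; // assumed default; configurable via KEY_DELIMITER_OPTION
            // The connector's KeyExtractor does the real formatting; this only
            // mirrors the id shape the IT case asserts.
            String docId = a + delimiter + g;
            System.out.println(docId); // 1_2012-12-12T12:12:12
        }
    }
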
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
new file mode 100644
index 0000000..466ede3
--- /dev/null
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link Elasticsearch7DynamicSink} parameters.
+ */
+public class Elasticsearch7DynamicSinkTest {
+
+       private static final String FIELD_KEY = "key";
+       private static final String FIELD_FRUIT_NAME = "fruit_name";
+       private static final String FIELD_COUNT = "count";
+       private static final String FIELD_TS = "ts";
+
+       private static final String HOSTNAME = "host1";
+       private static final int PORT = 1234;
+       private static final String SCHEMA = "https";
+       private static final String INDEX = "MyIndex";
+       private static final String DOC_TYPE = "MyType";
+
+       @Test
+       public void testBuilder() {
+               final TableSchema schema = createTestSchema();
+
+               BuilderProvider provider = new BuilderProvider();
+               final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
+                       new DummySinkFormat(),
+                       new Elasticsearch7Configuration(getConfig(), this.getClass().getClassLoader()),
+                       schema,
+                       provider
+               );
+
+               testSink.getSinkRuntimeProvider(new MockSinkContext()).createSinkFunction();
+
+               verify(provider.builderSpy).setFailureHandler(new DummyFailureHandler());
+               verify(provider.builderSpy).setBulkFlushBackoff(true);
+               verify(provider.builderSpy).setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
+               verify(provider.builderSpy).setBulkFlushBackoffDelay(123);
+               verify(provider.builderSpy).setBulkFlushBackoffRetries(3);
+               verify(provider.builderSpy).setBulkFlushInterval(100);
+               verify(provider.builderSpy).setBulkFlushMaxActions(1000);
+               verify(provider.builderSpy).setBulkFlushMaxSizeMb(1);
+               verify(provider.builderSpy).setRestClientFactory(new Elasticsearch7DynamicSink.DefaultRestClientFactory("/myapp"));
+               verify(provider.sinkSpy).disableFlushOnCheckpoint();
+       }
+
+       private Configuration getConfig() {
+               Configuration configuration = new Configuration();
+               configuration.setString(ElasticsearchOptions.INDEX_OPTION.key(), INDEX);
+               configuration.setString(ElasticsearchOptions.DOCUMENT_TYPE_OPTION.key(), DOC_TYPE);
+               configuration.setString(ElasticsearchOptions.HOSTS_OPTION.key(), SCHEMA + "://" + HOSTNAME + ":" + PORT);
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_TYPE_OPTION.key(), "exponential");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_DELAY_OPTION.key(), "123");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(), "3");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_INTERVAL_OPTION.key(), "100");
+               configuration.setString(ElasticsearchOptions.BULK_FLUSH_MAX_ACTIONS_OPTION.key(), "1000");
+               configuration.setString(ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION.key(), "1mb");
+               configuration.setString(ElasticsearchOptions.CONNECTION_PATH_PREFIX.key(), "/myapp");
+               configuration.setString(ElasticsearchOptions.FAILURE_HANDLER_OPTION.key(), DummyFailureHandler.class.getName());
+               configuration.setString(ElasticsearchOptions.FLUSH_ON_CHECKPOINT_OPTION.key(), "false");
+               return configuration;
+       }
+
+       private static class BuilderProvider implements Elasticsearch7DynamicSink.ElasticSearchBuilderProvider {
+               public ElasticsearchSink.Builder<RowData> builderSpy;
+               public ElasticsearchSink<RowData> sinkSpy;
+
+               @Override
+               public ElasticsearchSink.Builder<RowData> createBuilder(
+                               List<HttpHost> httpHosts,
+                               RowElasticsearchSinkFunction upsertSinkFunction) {
+                       builderSpy = Mockito.spy(new ElasticsearchSink.Builder<>(httpHosts, upsertSinkFunction));
+                       doAnswer(
+                               invocation -> {
+                                       sinkSpy = Mockito.spy((ElasticsearchSink<RowData>) invocation.callRealMethod());
+                                       return sinkSpy;
+                               }
+                       ).when(builderSpy).build();
+
+                       return builderSpy;
+               }
+       }
+
+       private TableSchema createTestSchema() {
+               return TableSchema.builder()
+                       .field(FIELD_KEY, DataTypes.BIGINT())
+                       .field(FIELD_FRUIT_NAME, DataTypes.STRING())
+                       .field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+                       .field(FIELD_TS, DataTypes.TIMESTAMP(3))
+                       .build();
+       }
+
+       private static class DummySerializationSchema implements SerializationSchema<RowData> {
+
+               private static final DummySerializationSchema INSTANCE = new DummySerializationSchema();
+
+               @Override
+               public byte[] serialize(RowData element) {
+                       return new byte[0];
+               }
+       }
+
+       private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+               @Override
+               public SerializationSchema<RowData> createSinkFormat(
+                               DynamicTableSink.Context context,
+                               DataType consumedDataType) {
+                       return DummySerializationSchema.INSTANCE;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode() {
+                       return null;
+               }
+       }
+
+       private static class MockSinkContext implements DynamicTableSink.Context {
+               @Override
+               public boolean isBounded() {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<?> createTypeInformation(DataType consumedDataType) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSink.DataStructureConverter createDataStructureConverter(DataType consumedDataType) {
+                       return null;
+               }
+       }
+
+       /**
+        * Custom failure handler for testing.
+        */
+       public static class DummyFailureHandler implements ActionRequestFailureHandler {
+
+               @Override
+               public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
+                       // do nothing
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       return o instanceof DummyFailureHandler;
+               }
+
+               @Override
+               public int hashCode() {
+                       return DummyFailureHandler.class.hashCode();
+               }
+       }
+}
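
A side note on the test seam used above: the sink accepts an ElasticSearchBuilderProvider so the test can substitute a Mockito spy for the real ElasticsearchSink.Builder and verify every setter call. The same pattern in miniature, with all names invented for illustration:

    import java.util.function.Supplier;

    public final class SeamSketch {
        // Miniature version of the provider seam: production code passes the
        // default supplier, tests inject one that returns a spy.
        interface Builder {
            void setOption(String key);
            Object build();
        }

        static Object assemble(Supplier<Builder> builderProvider) {
            Builder builder = builderProvider.get();
            builder.setOption("bulk-flush.max-actions");
            return builder.build();
        }
    }
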
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index 377cfb8..4863cb2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -55,6 +55,9 @@ public class TableSchemaUtils {
                                        builder.field(tableColumn.getName(), tableColumn.getType());
                                }
                        });
+               tableSchema.getPrimaryKey().ifPresent(
+                       uniqueConstraint -> builder.primaryKey(uniqueConstraint.getColumns().toArray(new String[0]))
+               );
                return builder.build();
        }
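
The three-line change above makes getPhysicalSchema carry the PRIMARY KEY constraint over to the physical schema (computed columns are still dropped), which the Elasticsearch sink relies on for upsert document ids. A quick sketch of the resulting behavior, with the assertion written as a println for brevity:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    public final class PhysicalSchemaSketch {
        public static void main(String[] args) {
            TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.BIGINT().notNull())
                .field("g", DataTypes.TIMESTAMP(3).notNull())
                .primaryKey("a", "g")
                .build();

            // Before this patch the primary key was lost here; now it carries over.
            TableSchema physical = TableSchemaUtils.getPhysicalSchema(schema);
            System.out.println(physical.getPrimaryKey().isPresent()); // true
        }
    }
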
 
