This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 6407aab  [FLINK-25756][docs] Add documentation
6407aab is described below

commit 6407aabfe82a48002f301aee2bd3fc38fe52e035
Author: Andriy Redko <andriy.re...@aiven.io>
AuthorDate: Mon Dec 12 08:06:17 2022 -0500

    [FLINK-25756][docs] Add documentation
---
 .../docs/connectors/datastream/opensearch.md       | 261 +++++++++++++++++
 .../content.zh/docs/connectors/table/opensearch.md | 308 +++++++++++++++++++++
 .../docs/connectors/datastream/opensearch.md       | 261 +++++++++++++++++
 docs/content/docs/connectors/table/opensearch.md   | 308 +++++++++++++++++++++
 .../table/OpensearchConnectorOptions.java          |   2 +-
 5 files changed, 1139 insertions(+), 1 deletion(-)

diff --git a/docs/content.zh/docs/connectors/datastream/opensearch.md 
b/docs/content.zh/docs/connectors/datastream/opensearch.md
new file mode 100644
index 0000000..cca9e4f
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/opensearch.md
@@ -0,0 +1,261 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for 
information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, 
RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (ex.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The 
`BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, 
before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance 
docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to 
be enabled at the execution environment:
+
+{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default 
delivery guarantee is `AT_LEAST_ONCE`.
+This causes the sink to buffer requests until it either finishes or the 
`BulkProcessor` flushes automatically. 
+By default, the `BulkProcessor` will flush after `1000` added actions. To 
configure the processor to flush more frequently, please refer to the <a 
href="#configuring-the-internal-bulk-processor">BulkProcessor configuration 
section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using `UpdateRequests` with deterministic IDs and the upsert method it is 
possible to achieve exactly-once semantics in Opensearch when `AT_LEAST_ONCE` 
delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a 
backoff-policy.
+
+Below is an example:
+
+{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum 
of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 
5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+The above example will let the sink re-add requests that failed due to 
resource constrains (e.g.
+queue capacity saturation). For all other failures, such as malformed 
documents, the sink will fail. 
+If no `BulkFlushBackoffStrategy` (or `FlushBackoffType.NONE`) is configured, 
the sink will fail for any kind of error.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
+will need to wait until Opensearch node queues have enough capacity for
+all the pending requests, or until the maximum number of retries has been 
reached.
+</p>
+
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by using the following methods of 
the OpensearchSinkBuilder:
+
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum amount of actions to 
buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in 
megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush 
regardless of the amount or size of buffered actions.
+ 
+Configuring how temporary request errors are retried is also supported:
+ * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int 
maxRetries, long delayMillis)**: The type of backoff delay, either `CONSTANT` 
or `EXPONENTIAL`, the amount of backoff retries to attempt, the amount of delay 
for backoff. For constant backoff, this
+   is simply the delay between each retry. For exponential backoff, this is 
the initial base delay.
+
+More information about Opensearch can be found [here](https://opensearch.org/).
+
+## Packaging the Opensearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{< ref "docs/dev/configuration" >}}) for further information).
+
+Alternatively, you can put the connector's jar file into Flink's `lib/` folder 
to make it available
+system-wide, i.e. for all job being run.
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/opensearch.md 
b/docs/content.zh/docs/connectors/table/opensearch.md
new file mode 100644
index 0000000..475c180
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/opensearch.md
@@ -0,0 +1,308 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch 
engine. This document describes how to setup the Opensearch Connector to run 
SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages 
with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in 
append mode for exchanging INSERT only messages with external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, the valid value is: `opensearch`
+</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. 
<code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. 
<code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section 
for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to `HTTPS` endpoints (disable 
certificates validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would 
result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to Opensearch instance. Please notice that 
Opensearch comes with pre-bundled security feature, you can disable it by 
following the <a 
href="https://opensearch.org/docs/latest/security-plugin/configuration/index/";>guidelines</a>
 on how to configure the security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to Opensearch instance. If 
<code>username</code> is configured, this option must be configured with 
non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.delivery-guarantee</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
+      <td>String</td>
+      <td>Optional delivery guarantee when committing. Valid values are:
+      <ul>
+        <li><code>EXACTLY_ONCE</code>: records are only delivered exactly-once 
also under failover scenarios.</li>
+        <li><code>AT_LEAST_ONCE</code>: records are ensured to be delivered 
but it may happen that the same record is delivered multiple times.</li>
+        <li><code>NONE</code>:  records are delivered on a best effort 
basis.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td></td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Flush on checkpoint or not. When disabled, a sink will not wait for 
all pending action requests
+       to be acknowledged by Opensearch on checkpoints. Thus, a sink does NOT 
provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be 
in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note, both 
<code>'sink.bulk-flush.max-size'</code> and 
<code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> with the flush interval set allowing 
for complete async processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specify how to perform retries if any flush actions failed due to a 
temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the 
first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and 
increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> 
backoff, this is simply the delay between each retry. For 
<code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., 
<code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for requesting a connection from the connection 
manager.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for establishing a connection.</td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put 
differently, a maximum period inactivity between two consecutive data 
packets.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>Opensearch connector supports to specify a format. The format must 
produce a valid json document.
+       By default uses built-in <code>'json'</code> format. Please refer to <a 
href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> 
page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Key Handling
+
+The Opensearch sink can work in either upsert mode or append mode, depending 
on whether a primary key is defined.
+If a primary key is defined, the Opensearch sink works in upsert mode which 
can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Opensearch sink works in append mode 
which can only consume queries containing INSERT only messages.
+
+In the Opensearch connector, the primary key is used to calculate the 
Opensearch document id, which is a string of up to 512 bytes. It cannot have 
whitespaces.
+The Opensearch connector generates a document ID string for every row by 
concatenating all primary key fields in the order defined in the DDL using a 
key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as a primary key field as they do not have a 
good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Opensearch will generate a document id 
automatically.
+
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) 
for more details about the PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Opensearch sink supports both static index and dynamic index.
+
+If you want to have a static index, the `index` option value should be a plain 
string, e.g. `'myusers'`, all the records will be consistently written into 
"myusers" index.
+
+If you want to have a dynamic index, you can use `{field_name}` to reference a 
field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value 
of `TIMESTAMP/DATE/TIME` type into the format specified by the 
`date_format_string`.
+The `date_format_string` is compatible with Java's 
[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a 
record with `log_ts` field value `2020-03-27 12:25:55` will be written into 
"myusers-2020-03-27" index.
+
+You can also use `'{now()|date_format_string}'` to convert the current system 
time to the format specified by `date_format_string`. The corresponding time 
type of `now()` is `TIMESTAMP_WITH_LTZ`.
+When formatting the system time as a string, the time zone configured in the 
session through `table.local-time-zone` will be used. You can use `NOW()`, 
`now()`, `CURRENT_TIMESTAMP`, `current_timestamp`.
+
+**NOTE:**  When using the dynamic index generated by the current system time, 
for changelog stream, there is no guarantee that the records with the same 
primary key can generate the same index name.
+Therefore, the dynamic index based on the system time can only support append 
only stream.
+
+Data Type Mapping
+----------------
+
+Opensearch stores document in a JSON string. So the data type mapping is 
between Flink data type and JSON data type.
+Flink uses built-in `'json'` format for Opensearch connector. Please refer to 
[JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) page for more 
type mapping details.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/opensearch.md 
b/docs/content/docs/connectors/datastream/opensearch.md
new file mode 100644
index 0000000..cca9e4f
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/opensearch.md
@@ -0,0 +1,261 @@
+---
+title: Opensearch
+weight: 5
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch Connector
+
+This connector provides sinks that can request document actions to an
+[Opensearch](https://opensearch.org/) Index. To use this connector, add 
+the following dependency to your project:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Opensearch version</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>1.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>2.x</td>
+        <td>{{< connector_artifact flink-connector-opensearch 1.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for 
information
+about how to package the program with the libraries for cluster execution.
+
+## Installing Opensearch
+
+Instructions for setting up an Opensearch cluster can be found
+[here](https://opensearch.org/docs/latest/opensearch/install/index/).
+
+## Opensearch Sink
+
+The example below shows how to configure and create a sink:
+
+{{< tabs "a1732edd-4218-470e-adad-b1ebb4021a12" >}}
+{{< tab "Java" >}}
+
+```java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.connector.opensearch.sink.OpensearchSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.http.HttpHost;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.client.Requests;
+
+import java.util.HashMap;
+import java.util.Map;
+
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        .build());
+
+private static IndexRequest createIndexRequest(String element) {
+    Map<String, Object> json = new HashMap<>();
+    json.put("data", element);
+
+    return Requests.indexRequest()
+        .index("my-index")
+        .id(element)
+        .source(json);
+}
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+import org.apache.flink.api.connector.sink.SinkWriter
+import org.apache.flink.connector.opensearch.sink.{OpensearchSinkBuilder, 
RequestIndexer}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.http.HttpHost
+import org.opensearch.action.index.IndexRequest
+import org.opensearch.client.Requests
+
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setBulkFlushMaxActions(1) // Instructs the sink to emit after every 
element, otherwise they would be buffered
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    .build())
+
+def createIndexRequest(element: (String)): IndexRequest = {
+
+  val json = Map(
+    "data" -> element.asInstanceOf[AnyRef]
+  )
+
+  Requests.indexRequest.index("my-index").source(mapAsJavaMap(json))
+}
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Note that the example only demonstrates performing a single index
+request for each incoming element. Generally, the `OpensearchEmitter`
+can be used to perform requests of different types (ex.,
+`DeleteRequest`, `UpdateRequest`, etc.). 
+
+Internally, each parallel instance of the Flink Opensearch Sink uses
+a `BulkProcessor` to send action requests to the cluster.
+This will buffer elements before sending them in bulk to the cluster. The 
`BulkProcessor`
+executes bulk requests one at a time, i.e. there will be no two concurrent
+flushes of the buffered actions in progress.
+
+### Opensearch Sinks and Fault Tolerance
+
+With Flink’s checkpointing enabled, the Flink Opensearch Sink guarantees
+at-least-once delivery of action requests to Opensearch clusters. It does
+so by waiting for all pending action requests in the `BulkProcessor` at the
+time of checkpoints. This effectively assures that all requests before the
+checkpoint was triggered have been successfully acknowledged by Opensearch, 
before
+proceeding to process more records sent to the sink.
+
+More details on checkpoints and fault tolerance are in the [fault tolerance 
docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).
+
+To use fault tolerant Opensearch Sinks, checkpointing of the topology needs to 
be enabled at the execution environment:
+
+{{< tabs "aa0d1e93-4844-40d7-b0ec-9ec37e731a5f" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default 
delivery guarantee is `AT_LEAST_ONCE`.
+This causes the sink to buffer requests until it either finishes or the 
`BulkProcessor` flushes automatically. 
+By default, the `BulkProcessor` will flush after `1000` added actions. To 
configure the processor to flush more frequently, please refer to the <a 
href="#configuring-the-internal-bulk-processor">BulkProcessor configuration 
section</a>.
+</p>
+
+<p style="border-radius: 5px; padding: 5px" class="bg-info">
+Using `UpdateRequests` with deterministic IDs and the upsert method it is 
possible to achieve exactly-once semantics in Opensearch when `AT_LEAST_ONCE` 
delivery is configured for the connector.
+</p>
+
+### Handling Failing Opensearch Requests
+
+Opensearch action requests may fail due to a variety of reasons, including
+temporarily saturated node queue capacity or malformed documents to be indexed.
+The Flink Opensearch Sink allows the user to retry requests by specifying a 
backoff-policy.
+
+Below is an example:
+
+{{< tabs "adb958b3-5dd5-476e-b946-ace3335628ea" >}}
+{{< tab "Java" >}}
+```java
+DataStream<String> input = ...;
+
+input.sinkTo(
+    new OpensearchSinkBuilder<String>()
+        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+        .setEmitter(
+        (element, context, indexer) ->
+        indexer.add(createIndexRequest(element)))
+        // This enables an exponential backoff retry mechanism, with a maximum 
of 5 retries and an initial delay of 1000 milliseconds
+        .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+        .build());
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val input: DataStream[String] = ...
+
+input.sinkTo(
+  new OpensearchSinkBuilder[String]
+    .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
+    .setEmitter((element: String, context: SinkWriter.Context, indexer: 
RequestIndexer) => 
+    indexer.add(createIndexRequest(element)))
+    // This enables an exponential backoff retry mechanism, with a maximum of 
5 retries and an initial delay of 1000 milliseconds
+    .setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
+    .build())
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+The above example will let the sink re-add requests that failed due to 
resource constrains (e.g.
+queue capacity saturation). For all other failures, such as malformed 
documents, the sink will fail. 
+If no `BulkFlushBackoffStrategy` (or `FlushBackoffType.NONE`) is configured, 
the sink will fail for any kind of error.
+
+<p style="border-radius: 5px; padding: 5px" class="bg-danger">
+<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
+on failures will lead to longer checkpoints, as the sink will also
+need to wait for the re-added requests to be flushed when checkpointing.
+For example, when using <b>FlushBackoffType.EXPONENTIAL</b>, checkpoints
+will need to wait until Opensearch node queues have enough capacity for
+all the pending requests, or until the maximum number of retries has been 
reached.
+</p>
+
+### Configuring the Internal Bulk Processor
+
+The internal `BulkProcessor` can be further configured for its behaviour
+on how buffered action requests are flushed, by using the following methods of 
the OpensearchSinkBuilder:
+
+* **setBulkFlushMaxActions(int numMaxActions)**: Maximum amount of actions to 
buffer before flushing.
+* **setBulkFlushMaxSizeMb(int maxSizeMb)**: Maximum size of data (in 
megabytes) to buffer before flushing.
+* **setBulkFlushInterval(long intervalMillis)**: Interval at which to flush 
regardless of the amount or size of buffered actions.
+ 
+Configuring how temporary request errors are retried is also supported:
+ * **setBulkFlushBackoffStrategy(FlushBackoffType flushBackoffType, int 
maxRetries, long delayMillis)**: The type of backoff delay, either `CONSTANT` 
or `EXPONENTIAL`, the amount of backoff retries to attempt, the amount of delay 
for backoff. For constant backoff, this
+   is simply the delay between each retry. For exponential backoff, this is 
the initial base delay.
+
+More information about Opensearch can be found [here](https://opensearch.org/).
+
+## Packaging the Opensearch Connector into an Uber-Jar
+
+For the execution of your Flink program, it is recommended to build a
+so-called uber-jar (executable jar) containing all your dependencies
+(see [here]({{< ref "docs/dev/configuration" >}}) for further information).
+
+Alternatively, you can put the connector's jar file into Flink's `lib/` folder 
to make it available
+system-wide, i.e. for all job being run.
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/opensearch.md 
b/docs/content/docs/connectors/table/opensearch.md
new file mode 100644
index 0000000..475c180
--- /dev/null
+++ b/docs/content/docs/connectors/table/opensearch.md
@@ -0,0 +1,308 @@
+---
+title: Opensearch
+weight: 7
+type: docs
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Opensearch SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The Opensearch connector allows for writing into an index of the Opensearch 
engine. This document describes how to setup the Opensearch Connector to run 
SQL queries against Opensearch.
+
+The connector can operate in upsert mode for exchanging UPDATE/DELETE messages 
with the external system using the primary key defined on the DDL.
+
+If no primary key is defined on the DDL, the connector can only operate in 
append mode for exchanging INSERT only messages with external system.
+
+Dependencies
+------------
+
+{{< sql_download_table "opensearch" >}}
+
+The Opensearch connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+How to create an Opensearch table
+----------------
+
+The example below shows how to create an Opensearch sink table:
+
+```sql
+CREATE TABLE myUserTable (
+  user_id STRING,
+  user_name STRING,
+  uv BIGINT,
+  pv BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'opensearch',
+  'hosts' = 'http://localhost:9200',
+  'index' = 'users'
+);
+```
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, the valid value is: `opensearch`
+</td>
+    </tr>
+    <tr>
+      <td><h5>hosts</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>One or more Opensearch hosts to connect to, e.g. 
<code>'http://host_name:9092;http://host_name:9093'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>index</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Opensearch index for every record. Can be a static index (e.g. 
<code>'myIndex'</code>) or
+       a dynamic index (e.g. <code>'index-{log_ts|yyyy-MM-dd}'</code>).
+       See the following <a href="#dynamic-index">Dynamic Index</a> section 
for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>allow-insecure</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>Allow insecure connections to `HTTPS` endpoints (disable 
certificates validation).</td>
+    </tr>
+    <tr>
+      <td><h5>document-id.key-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">_</td>
+      <td>String</td>
+      <td>Delimiter for composite keys ("_" by default), e.g., "$" would 
result in IDs "KEY1$KEY2$KEY3".</td>
+    </tr>
+    <tr>
+      <td><h5>username</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Username used to connect to Opensearch instance. Please notice that 
Opensearch comes with pre-bundled security feature, you can disable it by 
following the <a 
href="https://opensearch.org/docs/latest/security-plugin/configuration/index/";>guidelines</a>
 on how to configure the security for your Opensearch cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>password</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Password used to connect to Opensearch instance. If 
<code>username</code> is configured, this option must be configured with 
non-empty string as well.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.delivery-guarantee</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
+      <td>String</td>
+      <td>Optional delivery guarantee when committing. Valid values are:
+      <ul>
+        <li><code>EXACTLY_ONCE</code>: records are only delivered exactly-once 
also under failover scenarios.</li>
+        <li><code>AT_LEAST_ONCE</code>: records are ensured to be delivered 
but it may happen that the same record is delivered multiple times.</li>
+        <li><code>NONE</code>:  records are delivered on a best effort 
basis.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-on-checkpoint</h5></td>
+      <td>optional</td>
+      <td></td>
+      <td style="word-wrap: break-word;">true</td>
+      <td>Boolean</td>
+      <td>Flush on checkpoint or not. When disabled, a sink will not wait for 
all pending action requests
+       to be acknowledged by Opensearch on checkpoints. Thus, a sink does NOT 
provide any strong
+       guarantees for at-least-once delivery of action requests.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-actions</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Integer</td>
+      <td>Maximum number of buffered actions per bulk request.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Maximum size in memory of buffered actions per bulk request. Must be 
in MB granularity.
+      Can be set to <code>'0'</code> to disable it.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.interval</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">1s</td>
+      <td>Duration</td>
+      <td>The interval to flush buffered actions.
+        Can be set to <code>'0'</code> to disable it. Note, both 
<code>'sink.bulk-flush.max-size'</code> and 
<code>'sink.bulk-flush.max-actions'</code>
+        can be set to <code>'0'</code> with the flush interval set allowing 
for complete async processing of buffered actions.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.strategy</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">DISABLED</td>
+      <td>String</td>
+      <td>Specify how to perform retries if any flush actions failed due to a 
temporary request error. Valid strategies are:
+      <ul>
+        <li><code>DISABLED</code>: no retry performed, i.e. fail after the 
first request error.</li>
+        <li><code>CONSTANT</code>: wait for backoff delay between retries.</li>
+        <li><code>EXPONENTIAL</code>: initially wait for backoff delay and 
increase exponentially between retries.</li>
+      </ul>
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.max-retries</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Maximum number of backoff retries.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.bulk-flush.backoff.delay</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Delay between each backoff attempt. For <code>CONSTANT</code> 
backoff, this is simply the delay between each retry. For 
<code>EXPONENTIAL</code> backoff, this is the initial base delay.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.path-prefix</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Prefix string to be added to every REST communication, e.g., 
<code>'/v1'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for requesting a connection from the connection 
manager.</td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout for establishing a connection.</td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put 
differently, a maximum period inactivity between two consecutive data 
packets.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">json</td>
+      <td>String</td>
+      <td>Opensearch connector supports to specify a format. The format must 
produce a valid json document.
+       By default uses built-in <code>'json'</code> format. Please refer to <a 
href="{{< ref "docs/connectors/table/formats/overview" >}}">JSON Format</a> 
page for more details.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Key Handling
+
+The Opensearch sink can work in either upsert mode or append mode, depending 
on whether a primary key is defined.
+If a primary key is defined, the Opensearch sink works in upsert mode which 
can consume queries containing UPDATE/DELETE messages.
+If a primary key is not defined, the Opensearch sink works in append mode 
which can only consume queries containing INSERT only messages.
+
+In the Opensearch connector, the primary key is used to calculate the 
Opensearch document id, which is a string of up to 512 bytes. It cannot have 
whitespaces.
+The Opensearch connector generates a document ID string for every row by 
concatenating all primary key fields in the order defined in the DDL using a 
key delimiter specified by `document-id.key-delimiter`.
+Certain types are not allowed as a primary key field as they do not have a 
good string representation, e.g. `BYTES`, `ROW`, `ARRAY`, `MAP`, etc.
+If no primary key is specified, Opensearch will generate a document id 
automatically.
+
+See [CREATE TABLE DDL]({{< ref "docs/dev/table/sql/create" >}}#create-table) 
for more details about the PRIMARY KEY syntax.
+
+### Dynamic Index
+
+The Opensearch sink supports both static index and dynamic index.
+
+If you want to have a static index, the `index` option value should be a plain 
string, e.g. `'myusers'`, all the records will be consistently written into 
"myusers" index.
+
+If you want to have a dynamic index, you can use `{field_name}` to reference a 
field value in the record to dynamically generate a target index.
+You can also use `'{field_name|date_format_string}'` to convert a field value 
of `TIMESTAMP/DATE/TIME` type into the format specified by the 
`date_format_string`.
+The `date_format_string` is compatible with Java's 
[DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/index.html).
+For example, if the option value is `'myusers-{log_ts|yyyy-MM-dd}'`, then a 
record with `log_ts` field value `2020-03-27 12:25:55` will be written into 
"myusers-2020-03-27" index.
+
+You can also use `'{now()|date_format_string}'` to convert the current system 
time to the format specified by `date_format_string`. The corresponding time 
type of `now()` is `TIMESTAMP_WITH_LTZ`.
+When formatting the system time as a string, the time zone configured in the 
session through `table.local-time-zone` will be used. You can use `NOW()`, 
`now()`, `CURRENT_TIMESTAMP`, `current_timestamp`.
+
+**NOTE:**  When using the dynamic index generated by the current system time, 
for changelog stream, there is no guarantee that the records with the same 
primary key can generate the same index name.
+Therefore, the dynamic index based on the system time can only support append 
only stream.
+
+Data Type Mapping
+----------------
+
+Opensearch stores document in a JSON string. So the data type mapping is 
between Flink data type and JSON data type.
+Flink uses built-in `'json'` format for Opensearch connector. Please refer to 
[JSON Format]({{< ref "docs/connectors/table/formats/json" >}}) page for more 
type mapping details.
+
+{{< top >}}
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
index dcd87cb..1ff6f12 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/OpensearchConnectorOptions.java
@@ -143,7 +143,7 @@ public class OpensearchConnectorOptions {
     public static final ConfigOption<DeliveryGuarantee> 
DELIVERY_GUARANTEE_OPTION =
             ConfigOptions.key("sink.delivery-guarantee")
                     .enumType(DeliveryGuarantee.class)
-                    .defaultValue(DeliveryGuarantee.NONE)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
                     .withDescription("Optional delivery guarantee when 
committing.");
 
     public static final ConfigOption<Boolean> ALLOW_INSECURE =

Reply via email to