This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f5fe53ecd2515b03c12852c003693aa3450338ea
Author: Martijn Visser <[email protected]>
AuthorDate: Mon Jan 19 17:02:32 2026 +0100

    [hotfix][docs] Fix broken refs
    
      - Remove .zh suffix from refs in standalone/overview.md
      - Manually added externalized docs for DynamoDB, since this repo doesn't 
contain external sync anymore
---
 .../docs/connectors/datastream/dynamodb.md         | 171 ++++++++++++
 docs/content.zh/docs/connectors/table/dynamodb.md  | 306 +++++++++++++++++++++
 .../resource-providers/standalone/overview.md      |   8 +-
 .../content/docs/connectors/datastream/dynamodb.md | 171 ++++++++++++
 docs/content/docs/connectors/table/dynamodb.md     | 306 +++++++++++++++++++++
 5 files changed, 958 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/dynamodb.md 
b/docs/content.zh/docs/connectors/datastream/dynamodb.md
new file mode 100644
index 00000000000..a8e44f70f21
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/dynamodb.md
@@ -0,0 +1,171 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB Sink
+
+The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) 
using the [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
 Follow the instructions from the [Amazon DynamoDB Developer 
Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a table.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new 
CustomElementConverter();
+
+DynamoDbSink<InputType> dynamoDbSink =
+    DynamoDbSink.<InputType>builder()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional  
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build();
+
+flinkStream.sinkTo(dynamoDbSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val elementConverter = new CustomElementConverter()
+
+val dynamoDbSink =
+    DynamoDbSink.builder[InputType]()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional    
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build()
+
+flinkStream.sinkTo(dynamoDbSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's DynamoDB sink is created using the static builder `DynamoDbSink.<InputType>builder()`.
+
+1. __setDynamoDbProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the DynamoDB client.
+2. __setTableName(String tableName)__
+    * Required.
+    * Name of the table to sink to.
+3. __setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> 
elementConverter)__
+    * Required.
+    * Converts generic records of type `InputType` to `DynamoDbWriteRequest`.
+4. _setOverwriteByPartitionKeys(List<String> partitionKeys)_
+    * Optional. Default: [].
+    * Used to deduplicate write requests within each batch pushed to DynamoDB.
+5. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records are treated as fatal exceptions 
in the sink.
+6. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `25`.
+    * Maximum size of a batch to write.
+7. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+8. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before 
backpressure is applied.
+9. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see 
[FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+10. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+11. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see 
[FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+12. _build()_
+    * Constructs and returns the DynamoDB sink.
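
The deduplication performed by `setOverwriteByPartitionKeys` can be sketched in plain Java (a simplified model of the semantics described above, not the connector's internal implementation): within one batch, only the latest request per composite key value is kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DedupSketch {

    /**
     * Keeps only the latest record for each composite key built from the given
     * key fields. Later duplicates replace earlier ones; first-seen key order
     * is preserved by the LinkedHashMap.
     */
    public static List<Map<String, String>> dedupByKeys(
            List<Map<String, String>> batch, List<String> partitionKeys) {
        Map<String, Map<String, String>> latest = new LinkedHashMap<>();
        for (Map<String, String> record : batch) {
            String compositeKey =
                    partitionKeys.stream().map(record::get).collect(Collectors.joining("|"));
            latest.put(compositeKey, record); // the latest record for a key wins
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = List.of(
                Map.of("key", "a", "value", "1"),
                Map.of("key", "b", "value", "2"),
                Map.of("key", "a", "value", "3")); // duplicates the first key
        // Two requests remain; for key "a" only the latest ("3") survives.
        System.out.println(dedupByKeys(batch, List.of("key")));
    }
}
```

In this model a batch with keys `a`, `b`, `a` collapses to two requests; the sink applies the same keep-latest rule to `DynamoDbWriteRequest`s within each batch.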
+
+## Element Converter
+
+An element converter is used to convert from a record in the DataStream to a 
DynamoDbWriteRequest which the sink will write to the destination DynamoDB 
table. The DynamoDB sink allows the user to supply a custom element converter, 
or use the provided
+`DynamoDbBeanElementConverter` when you are working with `@DynamoDbBean` 
objects. For more information on supported
+annotations see 
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema).
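
To make the conversion step concrete, the following sketch uses plain-Java stand-ins (`WriteRequest` and `UserBehavior` are hypothetical types, not the real Flink `ElementConverter` interface or AWS SDK classes) to show the shape of the record-to-write-request mapping that a custom converter performs:

```java
import java.util.Map;

public class ConverterSketch {

    // Stand-in for DynamoDbWriteRequest: a table item as attribute-name -> value.
    public record WriteRequest(Map<String, String> item) {}

    // Stand-in for the stream's input record type.
    public record UserBehavior(long userId, String behavior) {}

    // A converter maps one stream element to one write request.
    public static WriteRequest convert(UserBehavior element) {
        return new WriteRequest(Map.of(
                "user_id", Long.toString(element.userId()),
                "behavior", element.behavior()));
    }

    public static void main(String[] args) {
        WriteRequest request = convert(new UserBehavior(42L, "click"));
        System.out.println(request.item().get("user_id")); // prints "42"
    }
}
```

A real `ElementConverter<InputType, DynamoDbWriteRequest>` performs the equivalent mapping using the connector's write-request builder and the AWS attribute-value types.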
+
+A sample application using a custom `ElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
 A sample application using the `DynamoDbBeanElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoD
 [...]
+
+## Using Custom DynamoDB Endpoints
+
+It is sometimes desirable to have Flink operate as a producer against a DynamoDB VPC endpoint or a non-AWS
+DynamoDB endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and 
`AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the 
endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md 
b/docs/content.zh/docs/connectors/table/dynamodb.md
new file mode 100644
index 00000000000..28d23ed12ae
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/dynamodb.md
@@ -0,0 +1,306 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/table/connectors/dynamodb.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The DynamoDB connector allows for writing data into [Amazon 
DynamoDB](https://aws.amazon.com/dynamodb).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+
+How to create a DynamoDB table
+-----------------------------------------
+
+Follow the instructions from the [Amazon DynamoDB Developer 
Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a DynamoDB table. The following example shows how to create a Flink table backed by a DynamoDB table, using the minimum required options:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For DynamoDB use 
<code>'dynamodb'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the DynamoDB table to use.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the DynamoDB table is defined.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Authentication 
Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the DynamoDB endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS access key ID to use when setting credentials provider type 
to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.secretkey</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS secret key to use when setting credentials provider type to 
BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.path</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for profile path if credential provider type 
is set to be PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.name</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for profile name if credential provider type 
is set to be PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.arn</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role ARN to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.sessionName</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role session name to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.externalId</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The external ID to use when credential provider type is set to 
ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The credentials provider that provides credentials for assuming the role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The absolute path to the web identity token file that should be used 
if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">25</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be written to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>Maximum number of parallel batch requests to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Size of the input buffer before backpressure is applied to the upstream job graph.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer before 
flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Determines whether failed write requests are retried. If set to true, a request failure will not be retried and will instead fail the job.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">HTTP Client 
Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests by the HTTP 
client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>Timeout for each read to the underlying socket.</td>
+    </tr>
+    </tbody>
+</table>
+
+## Authorization
+
+Make sure to [create an appropriate IAM 
policy](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html)
 to allow writing to the DynamoDB table.
+
+## Authentication
+
+Depending on your deployment, choose an appropriate credentials provider to allow access to DynamoDB.
+By default, the `AUTO` credentials provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific 
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
 can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+- `BASIC` - Use access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
+- `SYS_PROP` - Use Java system properties `aws.accessKeyId` and 
`aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
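
For example, combining the authentication options above, a table that uses static `BASIC` credentials could be declared as follows (the key values are placeholders):

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = 'aws_access_key_id',
  'aws.credentials.basic.secretkey' = 'aws_secret_access_key'
);
```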
+
+## Sink Partitioning
+
+The DynamoDB sink supports client-side deduplication of data via the `PARTITIONED BY` clause. You can specify a list of
+partition keys; the sink will then only send the latest record for each composite key within a batch. For example:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+) PARTITIONED BY ( user_id )
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+## Notice
+
+The current implementation of the DynamoDB SQL connector is write-only and 
doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM DynamoDbTable;
+```
+should result in an error similar to
+```
+Connector dynamodb can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}
diff --git 
a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md 
b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
index 6498b2cb3e3..8312ab55c84 100644
--- a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
+++ b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
@@ -116,12 +116,12 @@ cd flink-*
 
 Flink 目录必须放在所有 worker 节点的相同目录下。你可以使用共享的 NFS 目录,或将 Flink 目录复制到每个 worker 节点上。
 
-请参考 [配置参数页面]({{< ref "docs/deployment/config.zh" >}}) 获取更多细节以及额外的配置项。
+请参考 [配置参数页面]({{< ref "docs/deployment/config" >}}) 获取更多细节以及额外的配置项。
 
 特别地,
 
 * 每个 JobManager 的可用内存值(`jobmanager.memory.process.size`),
-* 每个 TaskManager 的可用内存值 (`taskmanager.memory.process.size`,并检查 [内存调优指南]({{< 
ref "docs/deployment/memory/mem_tuning.zh" 
>}}#configure-memory-for-standalone-deployment)),
+* 每个 TaskManager 的可用内存值 (`taskmanager.memory.process.size`,并检查 [内存调优指南]({{< 
ref "docs/deployment/memory/mem_tuning" 
>}}#configure-memory-for-standalone-deployment)),
 * 每台机器的可用 CPU 数(`taskmanager.numberOfTaskSlots`),
 * 集群中所有 CPU 数(`parallelism.default`)和
 * 临时目录(`io.tmp.dirs`)
@@ -172,7 +172,7 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 ## High-Availability with Standalone
 
-In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({{< ref "docs/deployment/ha/zookeeper_ha.zh" >}}).
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper 
HA services]({{< ref "docs/deployment/ha/zookeeper_ha" >}}).
 
 Additionally, you have to configure your cluster to start multiple JobManagers.
 
@@ -188,7 +188,7 @@ jobManagerAddress1:webUIPort1
 jobManagerAddressX:webUIPortX
   </pre>
 
-By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
[high-availability.jobmanager.port]({{< ref "docs/deployment/config.zh" 
>}}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. 
`50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
+By default, the job manager will pick a *random port* for inter process 
communication. You can change this via the 
[high-availability.jobmanager.port]({{< ref "docs/deployment/config" 
>}}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. 
`50010`), ranges (`50000-50025`), or a combination of both 
(`50010,50011,50020-50025,50050-50075`).
 
 ### Example: Standalone Cluster with 2 JobManagers
 
diff --git a/docs/content/docs/connectors/datastream/dynamodb.md 
b/docs/content/docs/connectors/datastream/dynamodb.md
new file mode 100644
index 00000000000..4564d79f1a2
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/dynamodb.md
@@ -0,0 +1,171 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /dev/connectors/dynamodb.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB Sink
+
+The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) 
using the [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
 Follow the instructions from the [Amazon DynamoDB Developer 
Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a table.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new 
CustomElementConverter();
+
+DynamoDbSink<InputType> dynamoDbSink =
+    DynamoDbSink.<InputType>builder()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional  
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build();
+
+flinkStream.sinkTo(dynamoDbSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val elementConverter = new CustomElementConverter()
+
+val dynamoDbSink =
+    DynamoDbSink.builder[InputType]()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional    
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build()
+
+flinkStream.sinkTo(dynamoDbSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's DynamoDB sink is created using the static builder `DynamoDbSink.<InputType>builder()`.
+
+1. __setDynamoDbProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the DynamoDB client.
+2. __setTableName(String tableName)__
+    * Required.
+    * Name of the table to sink to.
+3. __setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> 
elementConverter)__
+    * Required.
+    * Converts generic records of type `InputType` to `DynamoDbWriteRequest`.
+4. _setOverwriteByPartitionKeys(List<String> partitionKeys)_
+    * Optional. Default: [].
+    * Used to deduplicate write requests within each batch pushed to DynamoDB.
+5. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records are treated as fatal exceptions 
in the sink.
+6. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `25`.
+    * Maximum size of a batch to write.
+7. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+8. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before 
backpressure is applied.
+9. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see 
[FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+10. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+11. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see 
[FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+12. _build()_
+    * Constructs and returns the DynamoDB sink.
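
The deduplication performed by `setOverwriteByPartitionKeys` can be sketched in plain Java (a simplified model of the semantics described above, not the connector's internal implementation): within one batch, only the latest request per composite key value is kept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DedupSketch {

    /**
     * Keeps only the latest record for each composite key built from the given
     * key fields. Later duplicates replace earlier ones; first-seen key order
     * is preserved by the LinkedHashMap.
     */
    public static List<Map<String, String>> dedupByKeys(
            List<Map<String, String>> batch, List<String> partitionKeys) {
        Map<String, Map<String, String>> latest = new LinkedHashMap<>();
        for (Map<String, String> record : batch) {
            String compositeKey =
                    partitionKeys.stream().map(record::get).collect(Collectors.joining("|"));
            latest.put(compositeKey, record); // the latest record for a key wins
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = List.of(
                Map.of("key", "a", "value", "1"),
                Map.of("key", "b", "value", "2"),
                Map.of("key", "a", "value", "3")); // duplicates the first key
        // Two requests remain; for key "a" only the latest ("3") survives.
        System.out.println(dedupByKeys(batch, List.of("key")));
    }
}
```

In this model a batch with keys `a`, `b`, `a` collapses to two requests; the sink applies the same keep-latest rule to `DynamoDbWriteRequest`s within each batch.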
+
+## Element Converter
+
+An element converter is used to convert from a record in the DataStream to a 
DynamoDbWriteRequest which the sink will write to the destination DynamoDB 
table. The DynamoDB sink allows the user to supply a custom element converter, 
or use the provided
+`DynamoDbBeanElementConverter` when you are working with `@DynamoDbBean` 
objects. For more information on supported
+annotations see 
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema).
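
To make the conversion step concrete, the following sketch uses plain-Java stand-ins (`WriteRequest` and `UserBehavior` are hypothetical types, not the real Flink `ElementConverter` interface or AWS SDK classes) to show the shape of the record-to-write-request mapping that a custom converter performs:

```java
import java.util.Map;

public class ConverterSketch {

    // Stand-in for DynamoDbWriteRequest: a table item as attribute-name -> value.
    public record WriteRequest(Map<String, String> item) {}

    // Stand-in for the stream's input record type.
    public record UserBehavior(long userId, String behavior) {}

    // A converter maps one stream element to one write request.
    public static WriteRequest convert(UserBehavior element) {
        return new WriteRequest(Map.of(
                "user_id", Long.toString(element.userId()),
                "behavior", element.behavior()));
    }

    public static void main(String[] args) {
        WriteRequest request = convert(new UserBehavior(42L, "click"));
        System.out.println(request.item().get("user_id")); // prints "42"
    }
}
```

A real `ElementConverter<InputType, DynamoDbWriteRequest>` performs the equivalent mapping using the connector's write-request builder and the AWS attribute-value types.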
+
+A sample application using a custom `ElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
 A sample application using the `DynamoDbBeanElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoD
 [...]
+
+## Using Custom DynamoDB Endpoints
+
+It is sometimes desirable to have Flink operate as a producer against a DynamoDB VPC endpoint or a non-AWS
+DynamoDB endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and 
`AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the 
endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/dynamodb.md 
b/docs/content/docs/connectors/table/dynamodb.md
new file mode 100644
index 00000000000..a51c5e718d2
--- /dev/null
+++ b/docs/content/docs/connectors/table/dynamodb.md
@@ -0,0 +1,306 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /dev/table/connectors/dynamodb.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The DynamoDB connector allows for writing data into [Amazon 
DynamoDB](https://aws.amazon.com/dynamodb).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+
+How to create a DynamoDB table
+-----------------------------------------
+
+Follow the instructions from the [Amazon DynamoDB Developer 
Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a DynamoDB table. The following example shows how to create a Flink table 
backed by a DynamoDB table with the minimum required options:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For DynamoDB use 
<code>'dynamodb'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the DynamoDB table to use.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the DynamoDB table is defined.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Authentication 
Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the 
DynamoDB endpoint. See <a href="#authentication">Authentication</a> for 
details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS access key ID to use when setting credentials provider type 
to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.secretkey</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS secret key to use when setting credentials provider type to 
BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.path</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for profile path if credential provider type 
is set to be PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.name</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for profile name if credential provider type 
is set to be PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.arn</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role ARN to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.sessionName</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role session name to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.externalId</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The external ID to use when credential provider type is set to 
ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The credentials provider that provides credentials for assuming the 
role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, 
so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The absolute path to the web identity token file that should be used 
if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">25</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be written to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>Maximum number of parallel batch requests to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Size of the input buffer before applying backpressure to the upstream job 
graph.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer before 
flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag controlling whether failed requests are retried. If set to true, any 
request failure will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">HTTP Client 
Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests by the HTTP 
client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>Timeout for each read to the underlying socket.</td>
+    </tr>
+    </tbody>
+</table>
+
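+The sink options above can be combined in the `WITH` clause; the following sketch shows 
how (the values are purely illustrative, not tuning recommendations):

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'sink.batch.max-size' = '25',
  'sink.flush-buffer.timeout' = '1000',
  'sink.fail-on-error' = 'false'
);
```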
+## Authorization
+
+Make sure to [create an appropriate IAM 
policy](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html)
 to allow writing to the DynamoDB table.
+
+## Authentication
+
+Depending on your deployment, choose an appropriate Credentials 
Provider to allow access to DynamoDB.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.
+
+A specific 
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
 can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+- `BASIC` - Use access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
+- `SYS_PROP` - Use Java system properties `aws.accessKeyId` and 
`aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
+
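+For example, assuming a role via `ASSUME_ROLE` combines the provider setting with the 
role options listed in the table above; the ARN and session name below are placeholders:

```sql
CREATE TABLE DynamoDbTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'aws.credentials.provider' = 'ASSUME_ROLE',
  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/example-role',
  'aws.credentials.role.sessionName' = 'flink-dynamodb-session'
);
```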
+## Sink Partitioning
+
+The DynamoDB sink supports client-side deduplication of data via the 
`PARTITIONED BY` clause. You can specify a list of
+partition keys; the sink will then only send the latest record for each composite 
key within a batch. For example:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+) PARTITIONED BY ( user_id )
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
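+To make the deduplication semantics concrete, the following self-contained sketch (not 
connector code) mimics what happens to a batch partitioned by `user_id`: a later record 
for the same key overwrites the earlier one, and only the survivors are sent:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDedupSketch {

    /** Keeps only the latest value per key, preserving first-seen key order. */
    static <K, V> List<V> latestPerKey(List<Map.Entry<K, V>> batch) {
        Map<K, V> latest = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : batch) {
            latest.put(entry.getKey(), entry.getValue()); // later entries win
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        // Three records, two of them for user_id 1001.
        List<Map.Entry<Long, String>> batch = List.of(
                Map.entry(1001L, "click"),
                Map.entry(1002L, "view"),
                Map.entry(1001L, "purchase"));
        // Only the latest record per user_id survives the batch.
        System.out.println(latestPerKey(batch)); // [purchase, view]
    }
}
```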
+## Notice
+
+The current implementation of the DynamoDB SQL connector is write-only and 
doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM DynamoDbTable;
+```
+should result in an error similar to
+```
+Connector dynamodb can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}

