This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f5fe53ecd2515b03c12852c003693aa3450338ea
Author: Martijn Visser <[email protected]>
AuthorDate: Mon Jan 19 17:02:32 2026 +0100

    [hotfix][docs] Fix broken refs

    - Remove .zh suffix from refs in standalone/overview.md
    - Manually add externalized docs for DynamoDB, since this repo no longer contains the external sync
---
 .../docs/connectors/datastream/dynamodb.md         | 171 ++++++++++++
 docs/content.zh/docs/connectors/table/dynamodb.md  | 306 +++++++++++++++++++++
 .../resource-providers/standalone/overview.md      |   8 +-
 .../content/docs/connectors/datastream/dynamodb.md | 171 ++++++++++++
 docs/content/docs/connectors/table/dynamodb.md     | 306 +++++++++++++++++++++
 5 files changed, 958 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/dynamodb.md b/docs/content.zh/docs/connectors/datastream/dynamodb.md
new file mode 100644
index 00000000000..a8e44f70f21
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/dynamodb.md
@@ -0,0 +1,171 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB Sink
+
+The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a table.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter();
+
+DynamoDbSink<InputType> dynamoDbSink =
+    DynamoDbSink.<InputType>builder()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build();
+
+flinkStream.sinkTo(dynamoDbSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val elementConverter = new CustomElementConverter()
+
+val dynamoDbSink =
+    DynamoDbSink.builder[InputType]()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10000)                      // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build()
+
+flinkStream.sinkTo(dynamoDbSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's DynamoDB sink is created using the static builder `DynamoDbSink.<InputType>builder()`.
+
+1. __setDynamoDbProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the DynamoDB client.
+2. __setTableName(String tableName)__
+    * Required.
+    * Name of the table to sink to.
+3. __setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> elementConverter)__
+    * Required.
+    * Converts generic records of type `InputType` to `DynamoDbWriteRequest`.
+4. _setOverwriteByPartitionKeys(List<String> partitionKeys)_
+    * Optional. Default: [].
+    * Used to deduplicate write requests within each batch pushed to DynamoDB.
+5. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records are treated as fatal exceptions in the sink.
+6. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `25`.
+    * Maximum size of a batch to write.
+7. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+8. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied.
+9. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see [FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+10. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+11. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see [FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+12. _build()_
+    * Constructs and returns the DynamoDB sink.
+
+## Element Converter
+
+An element converter is used to convert from a record in the DataStream to a `DynamoDbWriteRequest` which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or to use the provided
+`DynamoDbBeanElementConverter` when working with `@DynamoDbBean` objects. For more information on supported
+annotations see [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema).
+
+A sample application using a custom `ElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java). A sample application using the `DynamoDbBeanElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoD [...]
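+
+As an illustration, a custom element converter for a hypothetical `Order` type could look like the sketch below. The `Order` class, its field names, and the attribute names are assumptions made for this example, not part of the connector:
+
+```java
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A hypothetical input type, defined here only to keep the sketch self-contained. */
+class Order {
+    String orderId;
+    double amount;
+}
+
+/** A minimal sketch of a custom element converter for Order records. */
+public class CustomElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {
+
+    @Override
+    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
+        // Map each field of the record to a DynamoDB AttributeValue.
+        Map<String, AttributeValue> item = new HashMap<>();
+        item.put("order_id", AttributeValue.builder().s(order.orderId).build());
+        item.put("amount", AttributeValue.builder().n(String.valueOf(order.amount)).build());
+
+        // A PUT request inserts the item, or replaces it if the key already exists.
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.PUT)
+                .setItem(item)
+                .build();
+    }
+}
+```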
+
+## Using Custom DynamoDB Endpoints
+
+It is sometimes desirable to have Flink operate against a DynamoDB VPC endpoint or a non-AWS
+DynamoDB endpoint such as [LocalStack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md
new file mode 100644
index 00000000000..28d23ed12ae
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/dynamodb.md
@@ -0,0 +1,306 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/table/connectors/dynamodb.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The DynamoDB connector allows for writing data into [Amazon DynamoDB](https://aws.amazon.com/dynamodb).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+
+How to create a DynamoDB table
+-----------------------------------------
+
+Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a DynamoDB table. The following example shows how to create a Flink table backed by a DynamoDB table with the minimum required options:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For DynamoDB use <code>'dynamodb'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the DynamoDB table to use.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the DynamoDB table is defined.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Authentication Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the DynamoDB endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.secretkey</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.path</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.name</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.arn</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.sessionName</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.externalId</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">25</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be written to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>Maximum number of parallel batch requests to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Size of input buffer before applying backpressure to the upstream job graph.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag controlling the handling of failed requests. If true, any failed request will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">HTTP Client Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests by the HTTP client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>Timeout for each read to the underlying socket.</td>
+    </tr>
+    </tbody>
+</table>
+
+## Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html) to allow writing to the DynamoDB table.
+
+## Authentication
+
+Depending on your deployment, you would choose an appropriate credentials provider to allow access to DynamoDB.
+By default, the `AUTO` credentials provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and the EC2/ECS credentials provider.
+- `BASIC` - Use the access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use the `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+- `SYS_PROP` - Use the Java system properties `aws.accessKeyId` and `aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using a Web Identity Token.
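+
+For example, a table definition that authenticates by assuming a role could look as follows; the role ARN and session name shown here are placeholders, not values from this repository:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'aws.credentials.provider' = 'ASSUME_ROLE',
+  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/example-sink-role',
+  'aws.credentials.role.sessionName' = 'flink-dynamodb-sink'
+);
+```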
+
+## Sink Partitioning
+
+The DynamoDB sink supports client-side deduplication of data via the `PARTITIONED BY` clause. You can specify a list of
+partition keys; the sink will then only send the latest record for each composite key within a batch. For example:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+) PARTITIONED BY ( user_id )
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+## Notice
+
+The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM DynamoDbTable;
+```
+should result in an error similar to
+```
+Connector dynamodb can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}
diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
index 6498b2cb3e3..8312ab55c84 100644
--- a/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
+++ b/docs/content.zh/docs/deployment/resource-providers/standalone/overview.md
@@ -116,12 +116,12 @@ cd flink-*
 Flink 目录必须放在所有 worker 节点的相同目录下。你可以使用共享的 NFS 目录,或将 Flink 目录复制到每个 worker 节点上。
 
-请参考 [配置参数页面]({{< ref "docs/deployment/config.zh" >}}) 获取更多细节以及额外的配置项。
+请参考 [配置参数页面]({{< ref "docs/deployment/config" >}}) 获取更多细节以及额外的配置项。
 
 特别地,
 
 * 每个 JobManager 的可用内存值(`jobmanager.memory.process.size`),
-* 每个 TaskManager 的可用内存值 (`taskmanager.memory.process.size`,并检查 [内存调优指南]({{< ref "docs/deployment/memory/mem_tuning.zh" >}}#configure-memory-for-standalone-deployment)),
+* 每个 TaskManager 的可用内存值 (`taskmanager.memory.process.size`,并检查 [内存调优指南]({{< ref "docs/deployment/memory/mem_tuning" >}}#configure-memory-for-standalone-deployment)),
 * 每台机器的可用 CPU 数(`taskmanager.numberOfTaskSlots`),
 * 集群中所有 CPU 数(`parallelism.default`)和
 * 临时目录(`io.tmp.dirs`)
@@ -172,7 +172,7 @@ bin/taskmanager.sh start|start-foreground|stop|stop-all
 
 ## High-Availability with Standalone
 
-In order to enable HA for a standalone cluster, you have to use the [ZooKeeper HA services]({{< ref "docs/deployment/ha/zookeeper_ha.zh" >}}).
+In order to enable HA for a standalone cluster, you have to use the [ZooKeeper HA services]({{< ref "docs/deployment/ha/zookeeper_ha" >}}).
 
 Additionally, you have to configure your cluster to start multiple JobManagers.
 
@@ -188,7 +188,7 @@ jobManagerAddress1:webUIPort1
 jobManagerAddressX:webUIPortX
 </pre>
 
-By default, the job manager will pick a *random port* for inter process communication. You can change this via the [high-availability.jobmanager.port]({{< ref "docs/deployment/config.zh" >}}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
+By default, the job manager will pick a *random port* for inter-process communication. You can change this via the [high-availability.jobmanager.port]({{< ref "docs/deployment/config" >}}#high-availability-jobmanager-port) key. This key accepts single ports (e.g. `50010`), ranges (`50000-50025`), or a combination of both (`50010,50011,50020-50025,50050-50075`).
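+
+For instance, to pin the JobManagers to a fixed range, an entry in the Flink configuration along the following lines would work; the range itself is only an illustration:
+
+<pre>
+high-availability.jobmanager.port: 50000-50025
+</pre>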
 
 ### Example: Standalone Cluster with 2 JobManagers
diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md
new file mode 100644
index 00000000000..4564d79f1a2
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/dynamodb.md
@@ -0,0 +1,171 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /dev/connectors/dynamodb.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB Sink
+
+The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a table.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter();
+
+DynamoDbSink<InputType> dynamoDbSink =
+    DynamoDbSink.<InputType>builder()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10_000)                     // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build();
+
+flinkStream.sinkTo(dynamoDbSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val elementConverter = new CustomElementConverter()
+
+val dynamoDbSink =
+    DynamoDbSink.builder[InputType]()
+        .setDynamoDbProperties(sinkProperties)              // Required
+        .setTableName("my-dynamodb-table")                  // Required
+        .setElementConverter(elementConverter)              // Required
+        .setOverwriteByPartitionKeys(singletonList("key"))  // Optional
+        .setFailOnError(false)                              // Optional
+        .setMaxBatchSize(25)                                // Optional
+        .setMaxInFlightRequests(50)                         // Optional
+        .setMaxBufferedRequests(10000)                      // Optional
+        .setMaxTimeInBufferMS(5000)                         // Optional
+        .build()
+
+flinkStream.sinkTo(dynamoDbSink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's DynamoDB sink is created using the static builder `DynamoDbSink.<InputType>builder()`.
+
+1. __setDynamoDbProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the DynamoDB client.
+2. __setTableName(String tableName)__
+    * Required.
+    * Name of the table to sink to.
+3. __setElementConverter(ElementConverter<InputType, DynamoDbWriteRequest> elementConverter)__
+    * Required.
+    * Converts generic records of type `InputType` to `DynamoDbWriteRequest`.
+4. _setOverwriteByPartitionKeys(List<String> partitionKeys)_
+    * Optional. Default: [].
+    * Used to deduplicate write requests within each batch pushed to DynamoDB.
+5. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records are treated as fatal exceptions in the sink.
+6. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `25`.
+    * Maximum size of a batch to write.
+7. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+8. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied.
+9. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see [FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+10. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+11. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * N/A.
+    * This configuration is not supported, see [FLINK-29854](https://issues.apache.org/jira/browse/FLINK-29854).
+12. _build()_
+    * Constructs and returns the DynamoDB sink.
+
+## Element Converter
+
+An element converter is used to convert from a record in the DataStream to a `DynamoDbWriteRequest` which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or to use the provided
+`DynamoDbBeanElementConverter` when working with `@DynamoDbBean` objects. For more information on supported
+annotations see [here](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/examples-dynamodb-enhanced.html#dynamodb-enhanced-mapper-tableschema).
+
+A sample application using a custom `ElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
+A sample application using the `DynamoDbBeanElementConverter` can be found [here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoD [...]
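+
+For reference, a minimal custom element converter might look like the sketch below; the `Order` type, its field names, and the attribute names are illustrative assumptions, not part of the connector:
+
+```java
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** A hypothetical input type, defined here only to keep the sketch self-contained. */
+class Order {
+    String orderId;
+    double amount;
+}
+
+/** A minimal sketch of a custom element converter for Order records. */
+public class CustomElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {
+
+    @Override
+    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
+        // Map each field of the record to a DynamoDB AttributeValue.
+        Map<String, AttributeValue> item = new HashMap<>();
+        item.put("order_id", AttributeValue.builder().s(order.orderId).build());
+        item.put("amount", AttributeValue.builder().n(String.valueOf(order.amount)).build());
+
+        // A PUT request inserts the item, or replaces it if the key already exists.
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.PUT)
+                .setItem(item)
+                .build();
+    }
+}
+```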
+
+## Using Custom DynamoDB Endpoints
+
+It is sometimes desirable to have Flink operate against a DynamoDB VPC endpoint or a non-AWS
+DynamoDB endpoint such as [LocalStack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md
new file mode 100644
index 00000000000..a51c5e718d2
--- /dev/null
+++ b/docs/content/docs/connectors/table/dynamodb.md
@@ -0,0 +1,306 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /dev/table/connectors/dynamodb.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon DynamoDB SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append & Upsert Mode" >}}
+
+The DynamoDB connector allows for writing data into [Amazon DynamoDB](https://aws.amazon.com/dynamodb).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+
+How to create a DynamoDB table
+-----------------------------------------
+
+Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
+to set up a DynamoDB table. The following example shows how to create a Flink table backed by a DynamoDB table with the minimum required options:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For DynamoDB use <code>'dynamodb'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the DynamoDB table to use.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the DynamoDB table is defined.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Authentication Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the DynamoDB endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.basic.secretkey</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.path</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.profile.name</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.arn</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.sessionName</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.externalId</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.role.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">25</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be written to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>Maximum number of parallel batch requests to DynamoDB.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Size of input buffer before applying backpressure to the upstream job graph.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag controlling the handling of failed requests. If true, any failed request will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+    <thead>
+      <tr>
+        <th colspan="5" class="text-left" style="width: 100%">HTTP Client Options</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests by the HTTP client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>Timeout for each read to the underlying socket.</td>
+    </tr>
+    </tbody>
+</table>
+
+## Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/using-identity-based-policies.html) to allow writing to the DynamoDB table.
+
+## Authentication
+
+Depending on your deployment, you would choose an appropriate credentials provider to allow access to DynamoDB.
+By default, the `AUTO` credentials provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and the EC2/ECS credentials provider.
+- `BASIC` - Use the access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use the `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+- `SYS_PROP` - Use the Java system properties `aws.accessKeyId` and `aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using a Web Identity Token.
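+
+For example, a table definition that authenticates by assuming a role could look as follows; the role ARN and session name shown here are placeholders, not values from this repository:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'aws.credentials.provider' = 'ASSUME_ROLE',
+  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/example-sink-role',
+  'aws.credentials.role.sessionName' = 'flink-dynamodb-sink'
+);
+```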
+
+## Sink Partitioning
+
+The DynamoDB sink supports client-side deduplication of data via the `PARTITIONED BY` clause. You can specify a list of
+partition keys; the sink will then only send the latest record for each composite key within a batch. For example:
+
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+) PARTITIONED BY ( user_id )
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+## Notice
+
+The current implementation of the DynamoDB SQL connector is write-only and doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM DynamoDbTable;
+```
+should result in an error similar to
+```
+Connector dynamodb can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}
