Ahmed Hamdy created FLINK-35500:
-----------------------------------
Summary: DynamoDb SinkWriter fails to delete elements due to key not found
Key: FLINK-35500
URL: https://issues.apache.org/jira/browse/FLINK-35500
Project: Flink
Issue Type: Bug
Components: Connectors / DynamoDB
Affects Versions: aws-connector-4.2.0, aws-connector-4.1.0, aws-connector-4.0.0
Reporter: Ahmed Hamdy
Fix For: aws-connector-4.4.0
h2. Description
When {{DynamoDbSink}} is used with CDC sources, it fails to process {{DELETE}} records and throws
{quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema{quote}
This happens because {{DynamoDbSinkWriter}} passes the whole DynamoDB item as the key, instead of the primary key constructed from the table's key schema [1].
Note: the issue was reported on the user mailing list [2].
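The required fix at [1] can be sketched as follows: before building the delete request, the writer must project the full item down to the attributes named in the table's key schema. This is a minimal illustration, not the connector's actual code; the item is simplified to a String-to-String map, and {{extractPrimaryKey}} is a hypothetical helper:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeyProjection {

    /** Keep only the key-schema attributes (e.g. "userId") from a full item. */
    static Map<String, String> extractPrimaryKey(
            Map<String, String> item, List<String> keyAttributeNames) {
        return item.entrySet().stream()
                .filter(e -> keyAttributeNames.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        // Full item reconstructed from the CDC record's "before" image:
        Map<String, String> item =
                Map.of("orderId", "3", "userId", "c", "price", "9");
        // Passing `item` itself as the key is what triggers
        // "The provided key element does not match the schema";
        // the delete request must carry only the hash key:
        Map<String, String> key = extractPrimaryKey(item, List.of("userId"));
        System.out.println(key); // {userId=c}
    }
}
```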
h2. Steps to Reproduce
(1) Create a new DynamoDB table in AWS from the command line:
{code:bash}
aws dynamodb create-table \
  --table-name orders \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
{code}
(2) Create an input file in Debezium-JSON format with the following rows to start:
{code:json}
{"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
{"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
{"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
{"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
{code}
(3) Start the Flink SQL Client and run the following, substituting the proper local paths for the DynamoDB connector JAR and for the sample input file:
{code:sql}
ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

CREATE TABLE Orders_CDC (
    orderId BIGINT,
    price FLOAT,
    userId STRING
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/input_file.jsonl',
    'format' = 'debezium-json'
);

CREATE TABLE Orders_Dynamo (
    orderId BIGINT,
    price FLOAT,
    userId STRING,
    PRIMARY KEY (userId) NOT ENFORCED
) PARTITIONED BY (userId)
WITH (
    'connector' = 'dynamodb',
    'table-name' = 'orders',
    'aws.region' = 'us-east-1'
);

INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}
(4) At this point everything works as expected: all 4 rows are inserted into DynamoDB, because they are insert ({{"op": "c"}}) operations. So far, so good!
(5) Now, add the following row to the input file. It represents a deletion in Debezium format and should therefore cause a delete on the corresponding DynamoDB table:
{code:json}
{"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
{code}
(6) Re-run the SQL statement:
{code:sql}
INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}
Instead of deleting the row, the job fails with the {{DynamoDbException}} quoted above.
h3. References
[1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
[2] https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf
--
This message was sent by Atlassian Jira
(v8.20.10#820010)