Ahmed Hamdy created FLINK-35500:
-----------------------------------

             Summary: DynamoDb SinkWriter fails to delete elements due to key 
not found
                 Key: FLINK-35500
                 URL: https://issues.apache.org/jira/browse/FLINK-35500
             Project: Flink
          Issue Type: Bug
          Components: Connectors / DynamoDB
    Affects Versions: aws-connector-4.2.0, aws-connector-4.1.0, 
aws-connector-4.0.0
            Reporter: Ahmed Hamdy
             Fix For: aws-connector-4.4.0


h2. Description
When the DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} records and throws:
{quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema{quote}

This happens because {{DynamoDbSinkWriter}} passes the whole DynamoDb item as the key of the delete request instead of the constructed primary key [1].
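The delete path should project the item down to the table's key schema before building the request. A minimal, self-contained sketch of that projection (plain {{String}} values stand in for the SDK's {{AttributeValue}}, and {{primaryKeyOf}} is a hypothetical helper, not the connector's actual API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PrimaryKeySketch {
    // Hypothetical helper: keep only the attributes that belong to the
    // table's key schema when building the key for a delete request.
    static Map<String, String> primaryKeyOf(
            Map<String, String> item, Set<String> keyAttributeNames) {
        Map<String, String> key = new HashMap<>();
        for (Map.Entry<String, String> entry : item.entrySet()) {
            if (keyAttributeNames.contains(entry.getKey())) {
                key.put(entry.getKey(), entry.getValue());
            }
        }
        return key;
    }

    public static void main(String[] args) {
        // Full item as carried in the Debezium "before" payload.
        Map<String, String> item = Map.of("orderId", "3", "userId", "c", "price", "9");

        // Current (buggy) behavior: the whole item is sent as the key, so
        // orderId and price are included even though the key schema is
        // userId only, and DynamoDB rejects the request.
        // Expected behavior: only the key-schema attributes are sent.
        Map<String, String> key = primaryKeyOf(item, Set.of("userId"));
        System.out.println(key); // {userId=c}
    }
}
```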

Note: the issue was reported on the user mailing list [2].

h2. Steps to Reproduce

(1) Create a new DynamoDB table in AWS from the command line:
{code:bash}
aws dynamodb create-table \
  --table-name orders \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
{code}

(2) Create an input file in Debezium-JSON format with the following rows to start:
{code:json}
{"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
{"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
{"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
{"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
{code}

(3) Start the Flink SQL Client, and run the following, substituting in the 
proper local paths for the Dynamo Connector JAR file and for this local sample 
input file:

{code:sql}
ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

CREATE TABLE Orders_CDC (
  orderId BIGINT,
  price FLOAT,
  userId STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/input_file.jsonl',
  'format' = 'debezium-json'
);

CREATE TABLE Orders_Dynamo (
  orderId BIGINT,
  price FLOAT,
  userId STRING,
  PRIMARY KEY (userId) NOT ENFORCED
) PARTITIONED BY (userId)
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'orders',
  'aws.region' = 'us-east-1'
);

INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}

(4) At this point everything works as expected: all 4 rows are written to DynamoDB, because they are insert operations. So far, so good!

(5) Now, add the following row to the input file. It represents a deletion in Debezium format, which should trigger a delete against the corresponding DynamoDB table:
{code:json}
{"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
{code}

(6) Re-run the SQL statement:
{code:sql}
INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
{code}
The job now fails with the {{DynamoDbException}} quoted above, because the delete request carries the full item rather than just the primary key.

h3. References
[1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
[2] https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf



