Hello! I am looking to use the DynamoDB Table API connector to write rows to AWS DynamoDB. I have found what appears to be a bug in the implementation for Row Delete operations. I have an idea on what needs to be fixed as well, and given that I am new to this community, I am looking for guidance on how to get an official JIRA created for this issue and then potentially contribute a fix via a GitHub PR.

The issue is that, for a deletion, DynamoDB expects a request containing only the primary key fields, but the connector includes ALL fields of the Row in the Delete request. This leads to the runtime error:

    org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema

I believe the root cause is line 275 in DynamoDbSinkWriter.java, as seen here:

    https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267

This line seems to set the "key" of the DeleteRequest to the full set of attributes for the Row, rather than only the key field(s).

To replicate this issue, one can take the following steps:

(1) Create a new DynamoDB table in AWS. Command line:

    aws dynamodb create-table \
      --table-name orders \
      --attribute-definitions AttributeName=userId,AttributeType=S \
      --key-schema AttributeName=userId,KeyType=HASH \
      --billing-mode PAY_PER_REQUEST

(2) Create an input file in Debezium-JSON format with the following rows to start:

    {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
    {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
    {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
    {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}

(3) Start the Flink SQL Client and run the following, substituting the proper local paths for the Dynamo connector JAR file and for the sample input file:

    ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';

    SET 'execution.runtime-mode' = 'streaming';
    SET 'sql-client.execution.result-mode' = 'changelog';

    CREATE TABLE Orders_CDC (
      orderId BIGINT,
      price float,
      userId STRING
    ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/input_file.jsonl',
      'format' = 'debezium-json'
    );

    CREATE TABLE Orders_Dynamo (
      orderId BIGINT,
      price float,
      userId STRING,
      PRIMARY KEY (userId) NOT ENFORCED
    ) PARTITIONED BY ( userId )
    WITH (
      'connector' = 'dynamodb',
      'table-name' = 'orders',
      'aws.region' = 'us-east-1'
    );

    INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

(4) At this point everything works, and these 4 rows are written to Dynamo without error, because they are all "Insert" operations. So far, so good!

(5) Now add the following row to the input file. This represents a deletion in Debezium format, which should cause a deletion on the corresponding DynamoDB table:

    {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}

(6) Re-run the SQL statement:

    INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

(7) You will now see the job fail with the error message "The provided key element does not match the schema". This happens because the Flink DynamoDB connector issues a Delete request to Dynamo with all fields (orderId, price, and userId), when only the primary key field (userId, per the table's key schema) should be passed in a Delete request.

I believe we can fix this by adjusting DynamoDbSinkWriter.java to use the knowledge it already has about partition keys (present in the "overwriteByPartitionKeys" field) so that it passes along only the partition key fields in any Delete request. Does this sound like a good approach?

If so, as I am new to this community, please let me know the proper procedure to get this flagged as an official issue in JIRA and then to get a Pull Request reviewed and approved, and I can start working on one.

Thanks,
Rob
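
P.S. To make the proposed fix more concrete, here is a minimal sketch of the filtering step I have in mind. It is not the connector's actual code: the class name `DeleteKeySketch` and the method `keyOnly` are hypothetical, and a generic value type `V` stands in for the SDK's attribute value type so the example runs on its own. The assumption is that the writer can obtain the list of key attribute names (for example from "overwriteByPartitionKeys") when it builds a Delete request.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of flink-connector-dynamodb. */
final class DeleteKeySketch {

    private DeleteKeySketch() {}

    /**
     * Returns a copy of the row's attributes restricted to the key attribute names.
     * V stands in for the SDK attribute value type (an assumption made to keep this runnable).
     */
    static <V> Map<String, V> keyOnly(Map<String, V> item, List<String> keyNames) {
        Map<String, V> key = new LinkedHashMap<>();
        for (String name : keyNames) {
            if (!item.containsKey(name)) {
                // A delete cannot be addressed without every key attribute being present.
                throw new IllegalArgumentException("Row is missing key attribute: " + name);
            }
            key.put(name, item.get(name));
        }
        return key;
    }

    public static void main(String[] args) {
        // The deleted row from step (5) above.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("orderId", 3L);
        row.put("userId", "c");
        row.put("price", 9.0f);

        // Only userId is part of the table's key schema.
        System.out.println(keyOnly(row, List.of("userId"))); // prints {userId=c}
    }
}
```

In the real writer, the result of a step like this would be what gets passed as the key of the Delete request instead of the full item. Failing fast on a missing key attribute is my own design choice in this sketch; I am happy to follow whatever convention the connector prefers.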