Hi Rob,

I agree with the issue here as well as the proposed solution. Thanks a lot for the deep dive and the reproduction steps. I have created a ticket on your behalf: https://issues.apache.org/jira/browse/FLINK-35500. You can comment on it if you intend to work on it, and then submit a PR against the https://github.com/apache/flink-connector-aws repo. One of the committers should then be able to review it.
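To make the suggested direction concrete, here is a rough, untested sketch of how the Delete path could be narrowed to the key attributes. The helper and the way the key names are obtained are assumptions on my part, not the connector's actual code:

// Hypothetical sketch, not the actual connector code: restrict a Delete
// request to the table's key attributes before building the DeleteRequest.
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;

final class DeleteKeySketch {

    // primaryKeyNames would come from the same sink configuration that
    // already populates overwriteByPartitionKeys in DynamoDbSinkWriter.
    static DeleteRequest toDeleteRequest(
            Map<String, AttributeValue> fullItem, Set<String> primaryKeyNames) {
        Map<String, AttributeValue> keyOnly =
                fullItem.entrySet().stream()
                        .filter(entry -> primaryKeyNames.contains(entry.getKey()))
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return DeleteRequest.builder().key(keyOnly).build();
    }
}

The Put path would stay as it is; only Delete requests would get the narrowed key map.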
Best Regards
Ahmed Hamdy

On Fri, 31 May 2024 at 00:55, Rob Goretsky <robert.goret...@gmail.com> wrote:
> Hello!
>
> I am looking to use the DynamoDB Table API connector to write rows to AWS
> DynamoDB. I have found what appears to be a bug in the implementation of
> Row Delete operations. I have an idea of what needs to be fixed as well,
> and given that I am new to this community, I am looking for guidance on
> how to get an official JIRA created for this issue and then potentially
> contribute a fix via a GitHub PR.
>
> The issue is that, for a Deletion request, DynamoDB expects to receive a
> request containing *only* the primary key fields, but the connector
> includes ALL fields of the Row in the Delete request, leading to the
> runtime error message:
>
> org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
> The provided key element does not match the schema
>
> I believe the root cause is line 275 in DynamoDbSinkWriter.java, as seen here:
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
>
> This line sets the "key" for the DeleteRequest to the full set of
> attributes for the Row, rather than only the key field(s).
>
> To replicate this issue, take the following steps:
>
> (1) Create a new DynamoDB table in AWS. Command line:
> aws dynamodb create-table \
>     --table-name orders \
>     --attribute-definitions AttributeName=userId,AttributeType=S \
>     --key-schema AttributeName=userId,KeyType=HASH \
>     --billing-mode PAY_PER_REQUEST
>
> (2) Create an input file in Debezium-JSON format with the following rows
> to start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
>
> (3) Start the Flink SQL Client and run the following, substituting the
> proper local paths for the DynamoDB connector JAR file and for this local
> sample input file:
>
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
>
> CREATE TABLE Orders_CDC (
>     orderId BIGINT,
>     price FLOAT,
>     userId STRING
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = '/path/to/input_file.jsonl',
>     'format' = 'debezium-json'
> );
>
> CREATE TABLE Orders_Dynamo (
>     orderId BIGINT,
>     price FLOAT,
>     userId STRING,
>     PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY (userId)
> WITH (
>     'connector' = 'dynamodb',
>     'table-name' = 'orders',
>     'aws.region' = 'us-east-1'
> );
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
>
> (4) At this point, everything works properly: all 4 rows are written to
> DynamoDB, because they are "Insert" operations. So far, so good!
>
> (5) Now, add the following row to the input file. It represents a
> deletion in Debezium format, which should cause a deletion on the
> corresponding DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
>
> (6) Re-run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
>
> (7) You will now see the job fail with the error message "The provided
> key element does not match the schema". This happens because the Flink
> DynamoDB connector issues the Delete request to DynamoDB with all fields
> (orderId, price, and userId), when only the primary key field (userId)
> should be passed in a delete request.
>
> I believe we can fix this by adjusting DynamoDbSinkWriter.java to
> leverage the knowledge it already has about partition keys (present in
> the "overwriteByPartitionKeys" field) so that it only passes along the
> partition key fields in Delete requests. Does this sound like a good
> approach?
>
> If so, as I am new to this community, please let me know the proper
> procedure to get this flagged as an official issue within JIRA and then
> to get a Pull Request reviewed/approved, and I could start working on
> one.
>
> Thanks,
> Rob
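For anyone reproducing this, the underlying DynamoDB behaviour is easy to confirm outside Flink with the same AWS CLI used in step (1). A DeleteItem call with only the key attribute succeeds, while including the non-key attributes should reproduce the same validation error (the item values below are illustrative):

# Succeeds: the key map contains only the table's hash key.
aws dynamodb delete-item \
    --table-name orders \
    --key '{"userId": {"S": "c"}}'

# Should fail with "The provided key element does not match the schema",
# which is the same error the connector currently triggers.
aws dynamodb delete-item \
    --table-name orders \
    --key '{"userId": {"S": "c"}, "orderId": {"N": "3"}, "price": {"N": "9"}}'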