Hi Rob,
I agree with the issue here as well as the proposed solution. Thanks a lot
for the deep dive and the reproduction steps.
I have created a ticket on your behalf:
https://issues.apache.org/jira/browse/FLINK-35500
You can comment on it if you intend to work on it and then submit a PR
against the https://github.com/apache/flink-connector-aws repo. One of the
committers should then be able to review it.

Best Regards
Ahmed Hamdy


On Fri, 31 May 2024 at 00:55, Rob Goretsky <robert.goret...@gmail.com>
wrote:

> Hello!
>
> I am looking to use the DynamoDB Table API connector to write rows to AWS
> DynamoDB.  I have found what appears to be a bug in the implementation for
> Row Delete operations.  I have an idea of what needs to be fixed as well,
> and given that I am new to this community, I am looking for guidance on how
> to get an official JIRA created for this issue and then potentially
> contribute a fix via a GitHub PR.
>
> The issue is that, for a Deletion request, DynamoDB expects to get a
> request containing *only* the primary key fields, but the Connector is
> including ALL fields in the Row with the Delete request, leading to the
> runtime error message:
>
> org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
> The provided key element does not match the schema
>
> I believe the root cause here is line 275 in DynamoDbSinkWriter.java as
> seen here -
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
>
>
> This line seems to be setting the "key" for the DeleteRequest to the full
> set of attributes for the Row, rather than only the key field(s).
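>
> In AWS SDK v2 terms, the delete path appears to end up doing roughly the
> following (an illustrative sketch only, not the connector's exact code;
> "item" stands for the row already converted to a Map<String, AttributeValue>
> containing every field):
>
> // Sketch; model classes are from
> // software.amazon.awssdk.services.dynamodb.model.
> WriteRequest delete =
>     WriteRequest.builder()
>         .deleteRequest(DeleteRequest.builder()
>             .key(item) // full item map -> "key element does not match the schema"
>             .build())
>         .build();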
>
> To replicate this issue, one can take the following steps:
>
> (1) Create a new DynamoDB table in AWS.  Command line:
> aws dynamodb create-table \
>   --table-name orders \
>   --attribute-definitions AttributeName=userId,AttributeType=S \
>   --key-schema AttributeName=userId,KeyType=HASH \
>   --billing-mode PAY_PER_REQUEST
>
> (2) Create an input file in Debezium-JSON format with the following rows
> to start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
>
> (3) Start the Flink SQL Client, and run the following, substituting in the
> proper local paths for the Dynamo Connector JAR file and for this local
> sample input file:
>
> ADD JAR
> '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
>
> CREATE TABLE Orders_CDC(
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING
>  ) WITH (
>    'connector' = 'filesystem',
>    'path' = '/path/to/input_file.jsonl',
>    'format' = 'debezium-json'
>  );
>
> CREATE TABLE Orders_Dynamo (
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING,
>   PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
>   'connector' = 'dynamodb',
>   'table-name' = 'orders',
>   'aws.region' = 'us-east-1'
> );
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
>
> (4) At this point, everything works as expected: these 4 rows are inserted
> into DynamoDB, because they are "Insert" operations.  So far, so good!
>
> (5) Now, add the following row to the input file.  This represents a
> deletion in Debezium format, which should then trigger a delete against the
> corresponding DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
>
> (6) Re-Run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
>
> (7) You will now see the job fail with the error message "The
> provided key element does not match the schema".  This happens
> because the Flink DynamoDB connector issues the Delete
> request to DynamoDB with all fields (orderId, price, and userId),
> when only the key field, userId, should be passed in a delete request.
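>
> For the row deleted in step (5), the request should carry only the key
> attribute, i.e. something equivalent to this sketch (AWS SDK v2 model
> classes, values taken from the sample data above):
>
> // Only the table's key attribute (userId) appears in the key map.
> Map<String, AttributeValue> key =
>     Map.of("userId", AttributeValue.builder().s("c").build());
> WriteRequest delete =
>     WriteRequest.builder()
>         .deleteRequest(DeleteRequest.builder().key(key).build())
>         .build();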
>
> I believe we can fix this by adjusting DynamoDbSinkWriter.java to
> leverage the knowledge it already has about partition keys (in the
> "overwriteByPartitionKeys" field) so that it only passes along the partition
> key fields in any Delete requests.  Does this sound like a good approach?
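>
> A minimal sketch of that idea, assuming "overwriteByPartitionKeys" holds the
> primary key attribute names and "item" is the row converted to a
> Map<String, AttributeValue> (names here are illustrative, not final code):
>
> // Keep only the primary key attributes when building the DeleteRequest.
> Map<String, AttributeValue> key =
>     item.entrySet().stream()
>         .filter(e -> overwriteByPartitionKeys.contains(e.getKey()))
>         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
> WriteRequest delete =
>     WriteRequest.builder()
>         .deleteRequest(DeleteRequest.builder().key(key).build())
>         .build();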
>
> If so, as I am new to this community, please let me know the proper
> procedure to get this flagged as an official issue in JIRA and to
> get a Pull Request reviewed and approved, and I can start working on a fix.
>
> Thanks,
> Rob
>
