Hello!

I am looking to use the DynamoDB Table API connector to write rows to AWS
DynamoDB, and I have found what appears to be a bug in the handling of row
Delete operations. I also have an idea of what needs to be fixed, and given
that I am new to this community, I am looking for guidance on how to get an
official JIRA created for this issue and then potentially contribute a fix
via a GitHub PR.

The issue is that, for a delete request, DynamoDB expects to receive only
the primary key fields, but the connector includes ALL of the Row's fields
in the Delete request, leading to the runtime error message:

org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
The provided key element does not match the schema

I believe the root cause here is line 275 in DynamoDbSinkWriter.java as
seen here -
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267


This line seems to be setting the "key" for the DeleteRequest to the full
set of attributes for the Row, rather than only the key field(s).
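
For reference, the delete branch there effectively ends up doing something
like the following (this is my paraphrase rather than the exact source, and
I am using the unshaded AWS SDK v2 model classes for readability; the
connector works with shaded copies of the same types):

import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

class CurrentDeletePath {
    // Paraphrase of the current behaviour: the full row item is used as
    // the DeleteRequest key, rather than just the table's key attribute(s).
    static WriteRequest toDeleteRequest(Map<String, AttributeValue> item) {
        return WriteRequest.builder()
                .deleteRequest(DeleteRequest.builder().key(item).build())
                .build();
    }
}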

To replicate this issue, one can take the following steps:

(1) Create a new DynamoDB table in AWS.  Command line:
aws dynamodb create-table \
  --table-name orders \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST

(2) Create an input file in Debezium-JSON format with the following rows to
start:
{"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
{"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
{"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
{"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}

(3) Start the Flink SQL Client and run the following, substituting the
proper local paths for the DynamoDB connector JAR file and for the sample
input file created above:

ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'changelog';

CREATE TABLE Orders_CDC(
  orderId BIGINT,
  price FLOAT,
  userId STRING
 ) WITH (
   'connector' = 'filesystem',
   'path' = '/path/to/input_file.jsonl',
   'format' = 'debezium-json'
 );

CREATE TABLE Orders_Dynamo (
  orderId BIGINT,
  price FLOAT,
  userId STRING,
  PRIMARY KEY (userId) NOT ENFORCED
) PARTITIONED BY ( userId )
WITH (
  'connector' = 'dynamodb',
  'table-name' = 'orders',
  'aws.region' = 'us-east-1'
);

INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

(4) At this point, everything works as expected: these 4 rows are inserted
into Dynamo, because they are "Insert" operations. So far, so good!

(5) Now, add the following row to the input file. This represents a
deletion in Debezium format, which should then cause the corresponding
item to be deleted from the DynamoDB table:
{"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}

(6) Re-run the SQL statement:
INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;

(7) You will now see the job fail with the error message "The provided key
element does not match the schema". This is happening because the Flink
DynamoDB connector is attempting to issue a Delete request to Dynamo while
passing all fields (orderId, price, and userId), when only the key field
userId should be passed in a delete request.
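
Concretely, for the deleted row above, the key map the connector sends
versus the one the table's key schema accepts look roughly like this
(illustrative values only, again using the unshaded AWS SDK v2 types):

import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

class DeleteKeyMaps {
    // What is effectively sent today: the whole row. DynamoDB rejects
    // this with "The provided key element does not match the schema".
    static final Map<String, AttributeValue> SENT_TODAY = Map.of(
            "orderId", AttributeValue.builder().n("3").build(),
            "userId", AttributeValue.builder().s("c").build(),
            "price", AttributeValue.builder().n("9").build());

    // What the table's key schema (HASH key = userId) expects.
    static final Map<String, AttributeValue> EXPECTED = Map.of(
            "userId", AttributeValue.builder().s("c").build());
}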

I believe we can fix this by adjusting DynamoDbSinkWriter.java to leverage
the knowledge it already has about partition keys (present in the
"overwriteByPartitionKeys" field) so that it passes along only the
partition key fields in any Delete requests. Does this sound like a good
approach?
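
A rough sketch of the kind of change I have in mind is below (the names
here are my approximations rather than the connector's actual internals,
and it again uses the unshaded AWS SDK v2 types):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

class DeleteKeyProjection {
    // Build the DeleteRequest from only the configured partition key
    // attributes, dropping every other attribute of the row's item map.
    static WriteRequest toDeleteRequest(
            Map<String, AttributeValue> item, List<String> partitionKeys) {
        Map<String, AttributeValue> keyOnly =
                item.entrySet().stream()
                        .filter(e -> partitionKeys.contains(e.getKey()))
                        .collect(Collectors.toMap(
                                Map.Entry::getKey, Map.Entry::getValue));

        return WriteRequest.builder()
                .deleteRequest(DeleteRequest.builder().key(keyOnly).build())
                .build();
    }
}

The partitionKeys argument here would come from the existing
"overwriteByPartitionKeys" configuration, so no new information would be
needed from the user.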

If so, as I am new to this community, please let me know the proper
procedure for getting this filed as an official issue in JIRA and for
getting a Pull Request reviewed/approved, and I can start working on one.

Thanks,
Rob
