robg-eb commented on code in PR #152: URL: https://github.com/apache/flink-connector-aws/pull/152#discussion_r1700332902
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java: ########## @@ -547,12 +552,132 @@ void testInstantArray() { assertThat(actualResult).containsAllEntriesOf(expectedResult); } + @Test + void testDeleteOnlyPrimaryKey() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); + + // Create a Row with two fields - "key" and "otherField". "key" is the single primary key. + // For a Delete request, only "key" should be included in the expectedResult, and not "otherField". + DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.DELETE); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = + singletonMap(key, AttributeValue.builder().s(value).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } + + @Test + void testDeleteOnlyPrimaryKeys() { + String key = "key"; + String value = "some_string"; + String additionalKey = "additional_key"; + String additionalValue = "additional_value"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(); + primaryKeys.add(key); + primaryKeys.add(additionalKey); + + // Create a Row with three fields - "key", "additional_key", and "otherField". 
+ // "key" and "additional_key" make up the composite primary key. + // For a Delete request, only "key" and "additional_key" should be included in the expectedResult, and not "otherField". + DataType dataType = DataTypes.ROW( + DataTypes.FIELD(key, DataTypes.STRING()), + DataTypes.FIELD(additionalKey, DataTypes.STRING()), + DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields( + StringData.fromString(value), StringData.fromString(additionalValue), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.DELETE); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = new HashMap<>(); + expectedResult.put(key, AttributeValue.builder().s(value).build()); + expectedResult.put(additionalKey, AttributeValue.builder().s(additionalValue).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); + assertThat(expectedResult).containsAllEntriesOf(actualResult); + } + + @Test + void testPKIgnoredForInsert() { + String key = "key"; + String value = "some_string"; + String otherField = "other_field"; + String otherValue = "other_value"; + Set<String> primaryKeys = new HashSet<>(Collections.singletonList(key)); + + // Create a Row with two fields - "key" and "otherField". "key" is the primary key. + // For an Insert request, all fields should be included regardless of the Primary Key. 
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING())); + RowDataToAttributeValueConverter rowDataToAttributeValueConverter = + new RowDataToAttributeValueConverter(dataType, primaryKeys); + RowData rowData = createElementWithMultipleFields(StringData.fromString(value), StringData.fromString(otherValue)); + rowData.setRowKind(RowKind.INSERT); + + Map<String, AttributeValue> actualResult = + rowDataToAttributeValueConverter.convertRowData(rowData); + Map<String, AttributeValue> expectedResult = new HashMap<>(); + expectedResult.put(key, AttributeValue.builder().s(value).build()); + expectedResult.put(otherField, AttributeValue.builder().s(otherValue).build()); + + assertThat(actualResult).containsAllEntriesOf(expectedResult); Review Comment: @vahmed-hamdy - I took a look at doing this, but `containsExactly` only works for (ordered) Lists, whereas the things we're comparing here are actually `HashMap`s. We'd have to convert these HashMaps to lists and impose an order on them in order to compare them, which seems more complicated than what I have here, which just runs two separate assertions - would you agree? Let me know if you think there's a better way - I'm admittedly not a Java expert. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org