platinumhamburg commented on code in PR #2573:
URL: https://github.com/apache/fluss/pull/2573#discussion_r2767618981
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -633,6 +633,85 @@ void testPutAutoIncrementColumnAndLookup() throws
Exception {
verifyRecords(expectedRecords, autoIncTable, schema);
}
+ @Test
+ void testLookupWithInsertIfNotExists() throws Exception {
+ TablePath tablePath = TablePath.of("test_db_1",
"test_invalid_insert_lookup_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column("b", DataTypes.INT())
+ .withComment("b is second column")
+ .primaryKey("b")
+ .column("c", new StringType(false))
+ .enableAutoIncrement("a")
+ .build();
+ TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).build();
+ createTable(tablePath, tableDescriptor, false);
+ Table invalidTable = conn.getTable(tablePath);
+
+ assertThatThrownBy(
+ () ->
invalidTable.newLookup().enableInsertIfNotExists().createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "insertIfNotExists cannot be enabled for tables with
non-nullable columns besides primary key and auto increment columns.");
+
+ tablePath = TablePath.of("test_db_1", "test_insert_lookup_table");
+ schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column("b", DataTypes.INT())
+ .withComment("b is second column")
+ .column("c", new StringType(true))
+ .primaryKey("b")
+ .enableAutoIncrement("a")
+ .build();
+ tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1, "b").build();
+ createTable(tablePath, tableDescriptor, true);
+ Table table = conn.getTable(tablePath);
+ RowType rowType = schema.getRowType();
+
+ assertThatThrownBy(
+ () ->
+ table.newLookup()
+ .lookupBy("b")
+ .enableInsertIfNotExists()
+ .createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("insertIfNotExists can not be used with
prefix lookup");
+ assertThatThrownBy(
Review Comment:
This is a duplicate assertion. Moreover, the test schema has only one primary
key column, 'b', so lookupBy("b") covers the full primary key and should not
result in a PrefixKeyLookuper. This issue was not introduced by this PR;
rather, the test has exposed a gap in TableLookup's semantics: any explicit
lookupBy(...) is treated as a prefix lookup, even when the given columns are
exactly the primary key.
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -49,23 +51,71 @@ private TableLookup(
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
@Nullable List<String> lookupColumnNames) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames, false);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ boolean insertIfNotExists) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null,
insertIfNotExists);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ @Nullable List<String> lookupColumnNames,
+ boolean insertIfNotExists) {
this.tableInfo = tableInfo;
this.schemaGetter = schemaGetter;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.lookupColumnNames = lookupColumnNames;
+ this.insertIfNotExists = insertIfNotExists;
+ }
+
+ @Override
+ public Lookup enableInsertIfNotExists() {
+ if (lookupColumnNames != null) {
+ throw new IllegalArgumentException(
+ "insertIfNotExists can not be used with prefix lookup");
+ }
+
+ if (tableInfo.getSchema().getColumns().stream()
+ .filter(column -> !column.getDataType().isNullable())
+ .filter(column ->
!tableInfo.getPrimaryKeys().contains(column.getName()))
+ .anyMatch(
+ column ->
+ !tableInfo
+ .getSchema()
+ .getAutoIncrementColumnNames()
+ .contains(column.getName()))) {
+ throw new IllegalArgumentException(
+ "insertIfNotExists cannot be enabled for tables with
non-nullable columns besides primary key and auto increment columns.");
+ }
+
+ return new TableLookup(tableInfo, schemaGetter, metadataUpdater,
lookupClient, true);
}
@Override
public Lookup lookupBy(List<String> lookupColumnNames) {
+ if (insertIfNotExists) {
Review Comment:
Like the prefix-lookup check in enableInsertIfNotExists(), this check can be moved to createLookuper() so that the validation is done in one place.
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -49,23 +51,71 @@ private TableLookup(
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
@Nullable List<String> lookupColumnNames) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames, false);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ boolean insertIfNotExists) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null,
insertIfNotExists);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ @Nullable List<String> lookupColumnNames,
+ boolean insertIfNotExists) {
this.tableInfo = tableInfo;
this.schemaGetter = schemaGetter;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.lookupColumnNames = lookupColumnNames;
+ this.insertIfNotExists = insertIfNotExists;
+ }
+
+ @Override
+ public Lookup enableInsertIfNotExists() {
+ if (lookupColumnNames != null) {
Review Comment:
This check can be moved to createLookuper(), so that the validation is performed uniformly in one place.
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupType.java:
##########
@@ -23,5 +23,6 @@
@Internal
public enum LookupType {
LOOKUP,
+ LOOKUP_WITH_INSERT_IF_NOT_EXISTS,
Review Comment:
I think there's no need to extend the enum definition here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]