xx789633 commented on code in PR #2573:
URL: https://github.com/apache/fluss/pull/2573#discussion_r2767843585
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -49,23 +51,71 @@ private TableLookup(
MetadataUpdater metadataUpdater,
LookupClient lookupClient,
@Nullable List<String> lookupColumnNames) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames, false);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ boolean insertIfNotExists) {
+ this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null,
insertIfNotExists);
+ }
+
+ private TableLookup(
+ TableInfo tableInfo,
+ SchemaGetter schemaGetter,
+ MetadataUpdater metadataUpdater,
+ LookupClient lookupClient,
+ @Nullable List<String> lookupColumnNames,
+ boolean insertIfNotExists) {
this.tableInfo = tableInfo;
this.schemaGetter = schemaGetter;
this.metadataUpdater = metadataUpdater;
this.lookupClient = lookupClient;
this.lookupColumnNames = lookupColumnNames;
+ this.insertIfNotExists = insertIfNotExists;
+ }
+
+ @Override
+ public Lookup enableInsertIfNotExists() {
+ if (lookupColumnNames != null) {
+ throw new IllegalArgumentException(
+ "insertIfNotExists can not be used with prefix lookup");
+ }
+
+ if (tableInfo.getSchema().getColumns().stream()
+ .filter(column -> !column.getDataType().isNullable())
+ .filter(column ->
!tableInfo.getPrimaryKeys().contains(column.getName()))
+ .anyMatch(
+ column ->
+ !tableInfo
+ .getSchema()
+ .getAutoIncrementColumnNames()
+ .contains(column.getName()))) {
+ throw new IllegalArgumentException(
+ "insertIfNotExists cannot be enabled for tables with
non-nullable columns besides primary key and auto increment columns.");
+ }
+
+ return new TableLookup(tableInfo, schemaGetter, metadataUpdater,
lookupClient, true);
}
@Override
public Lookup lookupBy(List<String> lookupColumnNames) {
+ if (insertIfNotExists) {
Review Comment:
good suggestion. done.
##########
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java:
##########
@@ -633,6 +633,85 @@ void testPutAutoIncrementColumnAndLookup() throws
Exception {
verifyRecords(expectedRecords, autoIncTable, schema);
}
+ @Test
+ void testLookupWithInsertIfNotExists() throws Exception {
+ TablePath tablePath = TablePath.of("test_db_1",
"test_invalid_insert_lookup_table");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column("b", DataTypes.INT())
+ .withComment("b is second column")
+ .primaryKey("b")
+ .column("c", new StringType(false))
+ .enableAutoIncrement("a")
+ .build();
+ TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(schema).build();
+ createTable(tablePath, tableDescriptor, false);
+ Table invalidTable = conn.getTable(tablePath);
+
+ assertThatThrownBy(
+ () ->
invalidTable.newLookup().enableInsertIfNotExists().createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "insertIfNotExists cannot be enabled for tables with
non-nullable columns besides primary key and auto increment columns.");
+
+ tablePath = TablePath.of("test_db_1", "test_insert_lookup_table");
+ schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .withComment("a is first column")
+ .column("b", DataTypes.INT())
+ .withComment("b is second column")
+ .column("c", new StringType(true))
+ .primaryKey("b")
+ .enableAutoIncrement("a")
+ .build();
+ tableDescriptor =
TableDescriptor.builder().schema(schema).distributedBy(1, "b").build();
+ createTable(tablePath, tableDescriptor, true);
+ Table table = conn.getTable(tablePath);
+ RowType rowType = schema.getRowType();
+
+ assertThatThrownBy(
+ () ->
+ table.newLookup()
+ .lookupBy("b")
+ .enableInsertIfNotExists()
+ .createLookuper())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("insertIfNotExists can not be used with
prefix lookup");
+ assertThatThrownBy(
Review Comment:
updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]