Copilot commented on code in PR #2554:
URL: https://github.com/apache/fluss/pull/2554#discussion_r2761874026
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -64,12 +65,53 @@ public Lookup lookupBy(List<String> lookupColumnNames) {
@Override
public Lookuper createLookuper() {
- if (lookupColumnNames == null) {
+ if (lookupColumnNames == null || isPrimaryKey(lookupColumnNames)) {
return new PrimaryKeyLookuper(tableInfo, schemaGetter,
metadataUpdater, lookupClient);
- } else {
+ }
+
+ if (isPrefixKey(lookupColumnNames)) {
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid lookup columns %s for table '%s'. "
+ + "Lookup columns must be either the
complete primary key %s "
+ + "or a valid prefix key.",
Review Comment:
For log tables (no primary key), `createLookuper()` currently throws an
"Invalid lookup columns ... complete primary key []" error when `lookupBy(...)`
is used because `isPrefixKey` returns false and the generic error path is
taken. This is misleading/inconsistent with
`PrimaryKeyLookuper`/`PrefixKeyLookuper`, which explicitly reject log tables
(e.g., PrimaryKeyLookuper.java:65-68). Consider adding an early
`!tableInfo.hasPrimaryKey()` check in `TableLookup.createLookuper()` to throw a
"Log table ... doesn't support lookup"-style exception instead, and adjust
tests accordingly.
##########
fluss-client/src/test/java/org/apache/fluss/client/lookup/TableLookupTest.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.lookup;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+
+/** Tests for {@link TableLookup}. */
+public class TableLookupTest {
+
+ @Test
+ void testCreateLookuperWithoutLookupColumns() {
+ // Test default behavior: null lookupColumnNames should create
PrimaryKeyLookuper
+ TableInfo tableInfo = createTestTableInfo(false, false);
+ TableLookup tableLookup = createTableLookup(tableInfo, null);
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrimaryKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithPrimaryKey() {
+ // Test: lookupBy with complete primary key should create
PrimaryKeyLookuper
+ TableInfo tableInfo = createTestTableInfo(false, false);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Arrays.asList("a", "c", "d"));
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrimaryKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithPrefixKeyNonPartitioned() {
+ // Test: lookupBy with prefix key (non-partitioned) should create
PrefixKeyLookuper
+ TableInfo tableInfo = createTestTableInfo(false, false);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Collections.singletonList("a"));
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrefixKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithPrefixKeyPartitioned() {
+ // Test: lookupBy with prefix key (partitioned) should create
PrefixKeyLookuper
+ // Primary key: [a, c, d], Partition key: [d], Bucket key: [a]
+ // Lookup columns: [a, d] (bucket key + partition key)
+ TableInfo tableInfo = createTestTableInfo(true, false);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Arrays.asList("a", "d"));
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrefixKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithPrimaryKeyPartitioned() {
+ // Test: lookupBy with complete primary key (partitioned) should
create PrimaryKeyLookuper
+ TableInfo tableInfo = createTestTableInfo(true, true);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Arrays.asList("a", "c", "d"));
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrimaryKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithInvalidLookupColumns() {
+ // Test: invalid lookup columns should throw IllegalArgumentException
+ TableInfo tableInfo = createTestTableInfo(false, false);
+
+ // Test with columns that are neither primary key nor valid prefix key
+ TableLookup tableLookup1 = createTableLookup(tableInfo,
Arrays.asList("b", "c"));
+ assertThatThrownBy(tableLookup1::createLookuper)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid lookup columns");
+
+ // Test with partial primary key but not a valid prefix
+ TableLookup tableLookup2 = createTableLookup(tableInfo,
Arrays.asList("c", "d"));
+ assertThatThrownBy(tableLookup2::createLookuper)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid lookup columns");
+ }
+
+ @Test
+ void testCreateLookuperWithOutOfOrderPrimaryKey() {
+ // Test: out-of-order primary key columns should fail
+ TableInfo tableInfo = createTestTableInfo(false, false);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Arrays.asList("c", "a", "d"));
+
+ assertThatThrownBy(tableLookup::createLookuper)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid lookup columns");
+ }
+
+ @Test
+ void testCreateLookuperWithEmptyLookupColumns() {
+ // Test: empty lookup columns should fail
+ TableInfo tableInfo = createTestTableInfo(false, false);
+ TableLookup tableLookup = createTableLookup(tableInfo,
Collections.emptyList());
+
+ assertThatThrownBy(tableLookup::createLookuper)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid lookup columns");
+ }
+
+ @Test
+ void testCreateLookuperWithTwoColumnPrefix() {
+ // Test: two column prefix key
+ // Primary key: [a, c, d], Bucket key: [a, c]
+ // Lookup columns: [a, c]
+ TableInfo tableInfo =
createTestTableInfoWithBucketKey(Arrays.asList("a", "c"));
+ TableLookup tableLookup = createTableLookup(tableInfo,
Arrays.asList("a", "c"));
+
+ Lookuper lookuper = tableLookup.createLookuper();
+ assertThat(lookuper).isInstanceOf(PrefixKeyLookuper.class);
+ }
+
+ @Test
+ void testCreateLookuperWithLogTable() {
+ // Test: log table (no primary key) should reject any lookup
+ TableInfo logTableInfo = createLogTableInfo();
+ TableLookup tableLookup = createTableLookup(logTableInfo,
Collections.singletonList("a"));
+
+ assertThatThrownBy(tableLookup::createLookuper)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Invalid lookup columns");
Review Comment:
This test asserts a generic "Invalid lookup columns" message for log tables.
The lookup implementations already use a dedicated log-table error (e.g.,
PrimaryKeyLookuper: "Log table ... doesn't support lookup"), which is more
accurate and easier to diagnose. Consider asserting the log-table-specific
message here (and updating `TableLookup.createLookuper()` to surface it
consistently when `!tableInfo.hasPrimaryKey()`).
```suggestion
.hasMessageContaining("Log table");
```
##########
fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java:
##########
@@ -64,12 +65,53 @@ public Lookup lookupBy(List<String> lookupColumnNames) {
@Override
public Lookuper createLookuper() {
- if (lookupColumnNames == null) {
+ if (lookupColumnNames == null || isPrimaryKey(lookupColumnNames)) {
return new PrimaryKeyLookuper(tableInfo, schemaGetter,
metadataUpdater, lookupClient);
- } else {
+ }
+
+ if (isPrefixKey(lookupColumnNames)) {
return new PrefixKeyLookuper(
tableInfo, schemaGetter, metadataUpdater, lookupClient,
lookupColumnNames);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid lookup columns %s for table '%s'. "
+ + "Lookup columns must be either the
complete primary key %s "
+ + "or a valid prefix key.",
+ lookupColumnNames,
+ tableInfo.getTablePath(),
+ tableInfo.getPrimaryKeys()));
+ }
+ }
+
+ private boolean isPrimaryKey(List<String> lookupColumns) {
+ return lookupColumns.equals(tableInfo.getPrimaryKeys());
+ }
+
+ private boolean isPrefixKey(List<String> lookupColumns) {
+ if (!tableInfo.hasPrimaryKey()) {
+ return false;
+ }
+
+ List<String> physicalLookupColumns =
+ lookupColumns.stream()
+ .filter(col ->
!tableInfo.getPartitionKeys().contains(col))
+ .collect(Collectors.toList());
+
+ List<String> physicalPrimaryKeys = tableInfo.getPhysicalPrimaryKeys();
+
+ if (physicalLookupColumns.isEmpty()
+ || physicalLookupColumns.size() >= physicalPrimaryKeys.size())
{
+ return false;
+ }
+
+ for (int i = 0; i < physicalLookupColumns.size(); i++) {
+ if
(!physicalLookupColumns.get(i).equals(physicalPrimaryKeys.get(i))) {
+ return false;
+ }
}
+
Review Comment:
`isPrefixKey(...)` currently only checks whether the *non-partition* lookup
columns form a prefix of `tableInfo.getPhysicalPrimaryKeys()`. However,
`PrefixKeyLookuper` actually requires the lookup columns to match the bucket
keys (in order) and include all partition keys
(PrefixKeyLookuper.java:127-149). As a result, `TableLookup` may route to
`PrefixKeyLookuper` for lookup columns that can never work, only to have the
constructor throw. Suggest aligning `isPrefixKey` with
`PrefixKeyLookuper.validatePrefixLookup` semantics (bucket keys + required
partition fields), so routing and error messages are deterministic.
```suggestion
// PrefixKeyLookuper requires that:
// 1) The lookup columns start with the bucket keys, in order; and
// 2) The lookup columns include all partition key columns.
//
// We mirror that here so routing and error messages are
// deterministic.
List<String> bucketKeys = tableInfo.getBucketKeys();
if (bucketKeys == null || bucketKeys.isEmpty()) {
return false;
}
// Lookup columns must at least cover all bucket keys as a leading
// prefix.
if (lookupColumns.size() < bucketKeys.size()) {
return false;
}
for (int i = 0; i < bucketKeys.size(); i++) {
if (!bucketKeys.get(i).equals(lookupColumns.get(i))) {
return false;
}
}
// Lookup columns must include all partition keys (order does not
// matter).
List<String> partitionKeys = tableInfo.getPartitionKeys();
if (!lookupColumns.containsAll(partitionKeys)) {
return false;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]