wuchong commented on code in PR #2485:
URL: https://github.com/apache/fluss/pull/2485#discussion_r2751004598
##########
fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java:
##########
@@ -793,6 +802,123 @@ void testLookup() throws Exception {
});
}
+ @Test
+ void testLookupWithInsertIfNotExists() throws Exception {
+ TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1);
+ makeKvTableAsLeader(DATA1_TABLE_ID_PK, DATA1_TABLE_PATH_PK, tb.getBucket());
+
+ CompactedKeyEncoder keyEncoder = new CompactedKeyEncoder(DATA1_ROW_TYPE, new int[] {0});
+
+ // Scenario 1: All keys missing - should insert and return new values
+ byte[] key100 = keyEncoder.encodeKey(row(new Object[] {100}));
+ byte[] key200 = keyEncoder.encodeKey(row(new Object[] {200}));
+
+ List<byte[]> inserted = lookupWithInsert(tb, Arrays.asList(key100, key200)).lookupValues();
+ assertThat(inserted).hasSize(2).allMatch(Objects::nonNull);
+ verifyLookup(tb, key100, inserted.get(0));
+ verifyLookup(tb, key200, inserted.get(1));
+
+ // Scenario 2: All keys exist - should return existing values without modification
+ List<byte[]> existing = lookupWithInsert(tb, Arrays.asList(key100, key200)).lookupValues();
+ assertThat(existing).containsExactlyElementsOf(inserted);
+
+ // Scenario 3: Mixed - key100 exists, key300 missing
+ byte[] key300 = keyEncoder.encodeKey(row(new Object[] {300}));
+ List<byte[]> mixed = lookupWithInsert(tb, Arrays.asList(key100, key300)).lookupValues();
+ assertThat(mixed.get(0)).isEqualTo(inserted.get(0)); // existing
+ assertThat(mixed.get(1)).isNotNull(); // newly inserted
+ verifyLookup(tb, key300, mixed.get(1));
+ }
+
+ @Test
+ void testLookupWithInsertIfNotExistsAutoIncrement() throws Exception {
Review Comment:
Could you add an additional test for **concurrent lookups** with
**insert-if-not-exists** and **auto-increment enabled**?
The test should:
- Spawn multiple threads (e.g., 3) that concurrently perform
`lookupWithInsert` operations on the **same set of keys** (e.g., 100, 200, 300).
- Verify that all threads receive **consistent lookup results** (i.e., the
same inserted values for each key).
- Confirm that exactly **3 changelog entries** are written to the log
tablet—one per unique key.
This will validate that concurrent put-and-relookup operations behave
correctly under contention, ensuring idempotency and consistency when
auto-increment and conditional inserts are enabled.
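
A minimal sketch of the shape such a test could take. It does not use the real `ReplicaManager` or log tablet: `FakeKvTablet`, its `lookupWithInsert` method, and `runConcurrently` are hypothetical in-memory stand-ins, and the atomic `computeIfAbsent` is only an assumption about the behavior the real code should exhibit. The actual test would call the test class's `lookupWithInsert(tb, keys)` helper and count records in the real changelog instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentLookupInsertSketch {

    /** Hypothetical in-memory stand-in for a KV tablet plus its changelog. */
    static final class FakeKvTablet {
        private final ConcurrentHashMap<Integer, Long> kv = new ConcurrentHashMap<>();
        private final AtomicLong nextId = new AtomicLong(1);
        final List<Integer> changelog = new CopyOnWriteArrayList<>();

        /** Returns the existing value, or atomically inserts a new auto-increment value. */
        long lookupWithInsert(int key) {
            return kv.computeIfAbsent(
                    key,
                    k -> {
                        changelog.add(k); // one changelog entry per actual insert
                        return nextId.getAndIncrement();
                    });
        }
    }

    /** Runs the same lookups from several threads and returns each thread's results. */
    static List<List<Long>> runConcurrently(FakeKvTablet tablet, List<Integer> keys, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            List<Future<List<Long>>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                futures.add(
                        pool.submit(
                                () -> {
                                    start.await(); // release all threads together
                                    List<Long> values = new ArrayList<>();
                                    for (int key : keys) {
                                        values.add(tablet.lookupWithInsert(key));
                                    }
                                    return values;
                                }));
            }
            start.countDown();
            List<List<Long>> results = new ArrayList<>();
            for (Future<List<Long>> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeKvTablet tablet = new FakeKvTablet();
        List<List<Long>> results = runConcurrently(tablet, Arrays.asList(100, 200, 300), 3);
        for (List<Long> perThread : results) {
            if (!perThread.equals(results.get(0))) {
                throw new AssertionError("threads observed different values: " + results);
            }
        }
        if (tablet.changelog.size() != 3) {
            throw new AssertionError("expected 3 changelog entries, got " + tablet.changelog);
        }
        System.out.println("changelog entries: " + tablet.changelog.size());
    }
}
```

The latch is there so that all threads start their lookups at roughly the same moment, which makes contention on the same keys more likely than submitting the tasks alone would.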
##########
fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode;
+
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Interface for decoding key bytes back to {@link InternalRow}.
+ *
+ * <p>This interface provides functionality to decode binary key bytes into internal row
+ * representation, typically used for primary key decoding in KV tables.
+ */
+public interface KeyDecoder {
+ /** Decode key bytes to a row containing only key fields, without non-key fields. */
+ InternalRow decodeKey(byte[] keyBytes);
+
+ static KeyDecoder ofPrimaryKeyDecoder(
Review Comment:
Add Javadoc to describe this method's behavior, and include a reference link
to `org.apache.fluss.row.encode.KeyEncoder#ofPrimaryKeyEncoder`, as their
behaviors are aligned.
##########
fluss-common/src/main/java/org/apache/fluss/row/decode/KeyDecoder.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row.decode;
+
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Interface for decoding key bytes back to {@link InternalRow}.
+ *
+ * <p>This interface provides functionality to decode binary key bytes into internal row
+ * representation, typically used for primary key decoding in KV tables.
+ */
+public interface KeyDecoder {
+ /** Decode key bytes to a row containing only key fields, without non-key fields. */
+ InternalRow decodeKey(byte[] keyBytes);
+
+ static KeyDecoder ofPrimaryKeyDecoder(
+ RowType rowType,
+ List<String> keyFields,
+ short kvFormatVersion,
+ @Nullable DataLakeFormat lakeFormat,
+ boolean isDefaultBucketKey) {
+ if (kvFormatVersion == 1 || (kvFormatVersion == 2 && isDefaultBucketKey)) {
+ if (lakeFormat == null || lakeFormat == DataLakeFormat.LANCE) {
+ return CompactedKeyDecoder.createKeyDecoder(rowType, keyFields);
+ }
+ if (lakeFormat == DataLakeFormat.PAIMON) {
+ // TODO: Implement key decoding support for Paimon lake format
+ throw new UnsupportedOperationException(
+ "Paimon lake format does not support key decoding");
Review Comment:
How much effort would it take to support this? I believe it’s
essential—since most clusters will enable lakehouse integration, this will
become a common and critical path.
If this requires significant work, please create a **blocker issue** to
track it. And please add tests for this case as well.
##########
fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java:
##########
@@ -633,8 +693,84 @@ public void lookups(
tb, new LookupResultForBucket(tb, ApiError.fromThrowable(e)));
}
}
+
+ if (insertIfNotExists) {
+ checkArgument(
+ timeoutMs != null && requiredAcks != null,
+ "timeoutMs and requiredAcks must be set");
+ Map<TableBucket, MissingKeysContext> entriesPerBucketToInsert = new HashMap<>();
+ Map<TableBucket, KvRecordBatch> produceEntryData = new HashMap<>();
+ collectMissingKeysForInsert(
+ entriesPerBucket,
+ lookupResultForBucketMap,
+ entriesPerBucketToInsert,
+ produceEntryData);
+ if (!produceEntryData.isEmpty()) {
+ // TODO: Performance optimization: during lookup-with-insert-if-not-exists flow,
+ // the original key bytes are wrapped in KeyRecordBatch, then during putRecordsToKv
+ // they are decoded to rows and immediately re-encoded back to key bytes, causing
+ // redundant encode/decode overhead.
+ putRecordsToKv(
+ timeoutMs,
+ requiredAcks,
+ produceEntryData,
+ null,
Review Comment:
This is incorrect. We should use **partial updates** to modify only the
primary key fields. Otherwise, non-primary-key columns—including auto-increment
fields—will be overwritten with `null`. This issue manifests when multiple
threads concurrently call `lookupAndInsert` on the same keys.
To reproduce this, I’ve added a test:
`testConcurrentLookupWithInsertIfNotExistsAutoIncrement`.
Additionally, we should enforce a validation in `PutRequest`: when
performing a put operation, **auto-increment fields must be excluded from the
target columns**. This ensures that auto-incremented values are never
accidentally overwritten during updates.
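
To make the difference concrete, here is a simplified model, not Fluss code. Rows are plain `Object[]` arrays with column 0 as the primary key and column 1 as an auto-increment column; `fullUpsert`, `partialUpdate`, and `validateTargetColumns` are hypothetical helpers introduced only for illustration. The validation shown is one possible shape for the check, assuming target columns are given as column indexes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

public class PartialUpdateSketch {

    /** Full-row upsert: the incoming row replaces the stored row entirely. */
    static Object[] fullUpsert(Object[] existing, Object[] incoming) {
        return incoming.clone();
    }

    /** Partial update: only the target columns are taken from the incoming row. */
    static Object[] partialUpdate(Object[] existing, Object[] incoming, int[] targetColumns) {
        Object[] merged =
                (existing == null) ? new Object[incoming.length] : existing.clone();
        for (int column : targetColumns) {
            merged[column] = incoming[column];
        }
        return merged;
    }

    /** Hypothetical check: target columns must not include auto-increment columns. */
    static void validateTargetColumns(int[] targetColumns, Set<Integer> autoIncrementColumns) {
        for (int column : targetColumns) {
            if (autoIncrementColumns.contains(column)) {
                throw new IllegalArgumentException(
                        "auto-increment column " + column + " must not be a target column");
            }
        }
    }

    public static void main(String[] args) {
        Object[] stored = {100, 7L}; // key 100 already has auto-increment value 7
        Object[] keyOnly = {100, null}; // a second thread inserts the same key, key fields only

        // Full upsert loses the auto-increment value.
        System.out.println(Arrays.toString(fullUpsert(stored, keyOnly))); // [100, null]

        // Partial update on the key column keeps it.
        System.out.println(
                Arrays.toString(partialUpdate(stored, keyOnly, new int[] {0}))); // [100, 7]

        // Targeting only the key column passes the validation.
        validateTargetColumns(new int[] {0}, Collections.singleton(1));
    }
}
```

In this model the second writer's key-only row wipes column 1 under a full upsert, which is the overwrite-with-`null` behavior described above, while restricting the write to the key column leaves the stored value intact.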