This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit b99bf0b623dd2999ba4d268b99e44a9e8071026b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 14:02:03 2025 +0800

    [flink] Introduce 'memory' type for 'lookup.cache' to in-memory cache 
(#5748)
---
 .../generated/flink_connector_configuration.html   |   2 +-
 .../paimon/crosspartition/GlobalIndexAssigner.java |  16 ++--
 .../java/org/apache/paimon/lookup/BulkLoader.java  | 101 +--------------------
 .../java/org/apache/paimon/lookup/ByteArray.java   |  61 +++++++++++++
 .../org/apache/paimon/lookup/ListBulkLoader.java   |  27 ++++++
 .../java/org/apache/paimon/lookup/ListState.java   |  32 +++++++
 .../java/org/apache/paimon/lookup/SetState.java    |  35 +++++++
 .../main/java/org/apache/paimon/lookup/State.java  |  31 +++++++
 .../org/apache/paimon/lookup/StateFactory.java     |  51 +++++++++++
 .../org/apache/paimon/lookup/ValueBulkLoader.java  |  25 +++++
 .../java/org/apache/paimon/lookup/ValueState.java  |  36 ++++++++
 .../paimon/lookup/memory/InMemoryListState.java    |  77 ++++++++++++++++
 .../paimon/lookup/memory/InMemorySetState.java     |  69 ++++++++++++++
 .../apache/paimon/lookup/memory/InMemoryState.java |  64 +++++++++++++
 .../paimon/lookup/memory/InMemoryStateFactory.java |  66 ++++++++++++++
 .../paimon/lookup/memory/InMemoryValueState.java   |  76 ++++++++++++++++
 .../RocksDBBulkLoader.java}                        |  33 +++++--
 .../lookup/{ => rocksdb}/RocksDBListState.java     |  17 +---
 .../lookup/{ => rocksdb}/RocksDBOptions.java       |   2 +-
 .../lookup/{ => rocksdb}/RocksDBSetState.java      |  10 +-
 .../paimon/lookup/{ => rocksdb}/RocksDBState.java  |  52 ++++-------
 .../lookup/{ => rocksdb}/RocksDBStateFactory.java  |  14 ++-
 .../lookup/{ => rocksdb}/RocksDBValueState.java    |  21 ++---
 .../configuration/ConfigOptionsDocGenerator.java   |   2 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |   5 +-
 .../flink/lookup/FileStoreLookupFunction.java      |   4 +-
 .../paimon/flink/lookup/FullCacheLookupTable.java  |  38 +++++---
 .../flink/lookup/NoPrimaryKeyLookupTable.java      |  19 ++--
 .../paimon/flink/lookup/PrimaryKeyLookupTable.java |  11 ++-
 .../flink/lookup/SecondaryIndexLookupTable.java    |   4 +-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |   6 +-
 .../flink/lookup/FileStoreLookupFunctionTest.java  |   2 +-
 .../paimon/flink/lookup/LookupTableTest.java       |  90 +++++++++++-------
 .../paimon/flink/lookup/RocksDBListStateTest.java  |   4 +-
 34 files changed, 856 insertions(+), 247 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 1a67d4e60f..fa9b8e4d9c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -60,7 +60,7 @@ under the License.
             <td><h5>lookup.cache</h5></td>
             <td style="word-wrap: break-word;">AUTO</td>
             <td><p>Enum</p></td>
-            <td>The cache mode of lookup join.<br /><br />Possible 
values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
+            <td>The cache mode of lookup join.<br /><br />Possible 
values:<ul><li>"AUTO"</li><li>"FULL"</li><li>"MEMORY"</li></ul></td>
         </tr>
         <tr>
             <td><h5>lookup.dynamic-partition.refresh-interval</h5></td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 892e27e966..fe14048310 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -28,11 +28,11 @@ import 
org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBOptions;
-import org.apache.paimon.lookup.RocksDBState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
-import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBValueState;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -71,7 +71,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
-import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
+import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.BLOCK_CACHE_SIZE;
 import static org.apache.paimon.utils.ListUtils.pickRandomly;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -210,14 +210,14 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
         bootstrapRecords.complete();
         boolean isEmpty = true;
         if (bootstrapKeys.size() > 0) {
-            BulkLoader bulkLoader = keyIndex.createBulkLoader();
+            RocksDBBulkLoader bulkLoader = keyIndex.createBulkLoader();
             MutableObjectIterator<BinaryRow> keyIterator = 
bootstrapKeys.sortedIterator();
             BinaryRow row = new BinaryRow(2);
             try {
                 while ((row = keyIterator.next(row)) != null) {
                     bulkLoader.write(row.getBinary(0), row.getBinary(1));
                 }
-            } catch (BulkLoader.WriteException e) {
+            } catch (RocksDBBulkLoader.WriteException e) {
                 throw new RuntimeException(
                         "Exception in bulkLoad, the most suspicious reason is 
that "
                                 + "your data contains duplicates, please check 
your sink table. "
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
index 0bcdbbb5d2..f0ea474f52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
@@ -18,106 +18,13 @@
 
 package org.apache.paimon.lookup;
 
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.EnvOptions;
-import org.rocksdb.IngestExternalFileOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileWriter;
-import org.rocksdb.TtlDB;
+/** Bulk loader for {@link State}. Incoming keys must be sorted and must not 
contain duplicates. */
+public interface BulkLoader {
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/** Bulk loader for RocksDB. */
-public class BulkLoader {
-
-    private final String uuid = UUID.randomUUID().toString();
-
-    private final ColumnFamilyHandle columnFamily;
-    private final String path;
-    private final RocksDB db;
-    private final boolean isTtlEnabled;
-    private final Options options;
-    private final List<String> files = new ArrayList<>();
-    private final int currentTimeSeconds;
-
-    private SstFileWriter writer = null;
-    private int sstIndex = 0;
-    private long recordNum = 0;
-
-    public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle 
columnFamily, String path) {
-        this.db = db;
-        this.isTtlEnabled = db instanceof TtlDB;
-        this.options = options;
-        this.columnFamily = columnFamily;
-        this.path = path;
-        this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
-    }
-
-    public void write(byte[] key, byte[] value) throws WriteException {
-        try {
-            if (writer == null) {
-                writer = new SstFileWriter(new EnvOptions(), options);
-                String path = new File(this.path, "sst-" + uuid + "-" + 
(sstIndex++)).getPath();
-                writer.open(path);
-                files.add(path);
-            }
-
-            if (isTtlEnabled) {
-                value = appendTimestamp(value);
-            }
-
-            try {
-                writer.put(key, value);
-            } catch (RocksDBException e) {
-                throw new WriteException(e);
-            }
-
-            recordNum++;
-            if (recordNum % 1000 == 0 && writer.fileSize() >= 
options.targetFileSizeBase()) {
-                writer.finish();
-                writer.close();
-                writer = null;
-                recordNum = 0;
-            }
-        } catch (RocksDBException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private byte[] appendTimestamp(byte[] value) {
-        byte[] newValue = new byte[value.length + 4];
-        System.arraycopy(value, 0, newValue, 0, value.length);
-        newValue[value.length] = (byte) (currentTimeSeconds & 0xff);
-        newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff);
-        newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) & 
0xff);
-        newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) & 
0xff);
-        return newValue;
-    }
-
-    public void finish() {
-        try {
-            if (writer != null) {
-                writer.finish();
-                writer.close();
-            }
-
-            if (files.size() > 0) {
-                IngestExternalFileOptions ingestOptions = new 
IngestExternalFileOptions();
-                db.ingestExternalFile(columnFamily, files, ingestOptions);
-                ingestOptions.close();
-            }
-        } catch (RocksDBException e) {
-            throw new RuntimeException(e);
-        }
-    }
+    void finish();
 
     /** Exception during writing. */
-    public static class WriteException extends Exception {
+    class WriteException extends Exception {
         public WriteException(Throwable cause) {
             super(cause);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java
new file mode 100644
index 0000000000..95a279d20f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import org.apache.paimon.utils.SortUtil;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+
+/** A wrapper around byte[] that provides content-based equals, hashCode and compareTo. */
+public class ByteArray implements Comparable<ByteArray> {
+
+    public final byte[] bytes;
+
+    public ByteArray(byte[] bytes) {
+        this.bytes = bytes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(bytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ByteArray byteArray = (ByteArray) o;
+        return Arrays.equals(bytes, byteArray.bytes);
+    }
+
+    public static ByteArray wrapBytes(byte[] bytes) {
+        return new ByteArray(bytes);
+    }
+
+    @Override
+    public int compareTo(@NotNull ByteArray o) {
+        return SortUtil.compareBinary(bytes, o.bytes);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java
new file mode 100644
index 0000000000..6496d5186b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.util.List;
+
+/** List bulk loader that loads a list of values per key; incoming keys must be sorted. */
+public interface ListBulkLoader extends BulkLoader {
+
+    void write(byte[] key, List<byte[]> value) throws WriteException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java
new file mode 100644
index 0000000000..8e3d7c2c8b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+/** {@link State} interface for list state in Operations. */
+public interface ListState<K, V> extends State<K, V> {
+
+    void add(K key, V value) throws IOException;
+
+    List<V> get(K key) throws IOException;
+
+    ListBulkLoader createBulkLoader();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java
new file mode 100644
index 0000000000..7874fb7178
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link State} interface for set state in Operations. Returned values must be 
sorted by their
+ * serialized byte arrays.
+ */
+public interface SetState<K, V> extends State<K, V> {
+
+    List<V> get(K key) throws IOException;
+
+    void retract(K key, V value) throws IOException;
+
+    void add(K key, V value) throws IOException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/State.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java
new file mode 100644
index 0000000000..e4174a9e21
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+
+/** Interface that different types of state must implement. */
+public interface State<K, V> {
+
+    byte[] serializeKey(K key) throws IOException;
+
+    byte[] serializeValue(V value) throws IOException;
+
+    V deserializeValue(byte[] valueBytes) throws IOException;
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java
new file mode 100644
index 0000000000..478c6f9f3b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import org.apache.paimon.data.serializer.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** State factory to create {@link State}. */
+public interface StateFactory extends Closeable {
+
+    <K, V> ValueState<K, V> valueState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize)
+            throws IOException;
+
+    <K, V> SetState<K, V> setState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize)
+            throws IOException;
+
+    <K, V> ListState<K, V> listState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize)
+            throws IOException;
+
+    boolean preferBulkLoad();
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java
new file mode 100644
index 0000000000..eef737078f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+/** Value bulk loader that loads one value per key; incoming keys must be sorted. */
+public interface ValueBulkLoader extends BulkLoader {
+
+    void write(byte[] key, byte[] value) throws WriteException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java
new file mode 100644
index 0000000000..f04e456b3b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** {@link State} interface for single-value state. The value can be deleted 
or updated. */
+public interface ValueState<K, V> extends State<K, V> {
+
+    @Nullable
+    V get(K key) throws IOException;
+
+    void put(K key, V value) throws IOException;
+
+    void delete(K key) throws IOException;
+
+    ValueBulkLoader createBulkLoader();
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
new file mode 100644
index 0000000000..992fbc1abd
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ListState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/** In-memory list state. */
+public class InMemoryListState<K, V> extends InMemoryState<K, V> implements 
ListState<K, V> {
+
+    private final Map<ByteArray, List<byte[]>> values;
+
+    public InMemoryListState(Serializer<K> keySerializer, Serializer<V> 
valueSerializer) {
+        super(keySerializer, valueSerializer);
+        this.values = new HashMap<>();
+    }
+
+    @Override
+    public void add(K key, V value) throws IOException {
+        byte[] keyBytes = serializeKey(key);
+        byte[] valueBytes = serializeValue(value);
+        values.computeIfAbsent(wrapBytes(keyBytes), k -> new 
ArrayList<>()).add(valueBytes);
+    }
+
+    @Override
+    public List<V> get(K key) throws IOException {
+        List<byte[]> list = this.values.get(wrapBytes(serializeKey(key)));
+        List<V> result = new ArrayList<>();
+        if (list != null) {
+            for (byte[] value : list) {
+                result.add(deserializeValue(value));
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public ListBulkLoader createBulkLoader() {
+        return new ListBulkLoader() {
+
+            @Override
+            public void write(byte[] key, List<byte[]> value) {
+                // copy the list, because the caller will reuse the list instance
+                values.put(wrapBytes(key), new ArrayList<>(value));
+            }
+
+            @Override
+            public void finish() {}
+        };
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
new file mode 100644
index 0000000000..f694a57643
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.SetState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/** In-memory set state. */
+public class InMemorySetState<K, V> extends InMemoryState<K, V> implements 
SetState<K, V> {
+
+    private final Map<ByteArray, TreeSet<ByteArray>> values;
+
+    public InMemorySetState(Serializer<K> keySerializer, Serializer<V> 
valueSerializer) {
+        super(keySerializer, valueSerializer);
+        this.values = new HashMap<>();
+    }
+
+    @Override
+    public List<V> get(K key) throws IOException {
+        Set<ByteArray> set = values.get(wrapBytes(serializeKey(key)));
+        List<V> result = new ArrayList<>();
+        if (set != null) {
+            for (ByteArray value : set) {
+                result.add(deserializeValue(value.bytes));
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void retract(K key, V value) throws IOException {
+        
values.get(wrapBytes(serializeKey(key))).remove(wrapBytes(serializeValue(value)));
+    }
+
+    @Override
+    public void add(K key, V value) throws IOException {
+        byte[] keyBytes = serializeKey(key);
+        byte[] valueBytes = serializeValue(value);
+        values.computeIfAbsent(wrapBytes(keyBytes), k -> new TreeSet<>())
+                .add(wrapBytes(valueBytes));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java
new file mode 100644
index 0000000000..6b79776a70
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.lookup.State;
+
+import java.io.IOException;
+
+/** In-memory state. */
+public abstract class InMemoryState<K, V> implements State<K, V> {
+
+    protected final Serializer<K> keySerializer;
+    protected final Serializer<V> valueSerializer;
+    protected final DataOutputSerializer keyOutView;
+    protected final DataInputDeserializer valueInputView;
+    protected final DataOutputSerializer valueOutputView;
+
+    public InMemoryState(Serializer<K> keySerializer, Serializer<V> 
valueSerializer) {
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyOutView = new DataOutputSerializer(32);
+        this.valueInputView = new DataInputDeserializer();
+        this.valueOutputView = new DataOutputSerializer(32);
+    }
+
+    @Override
+    public byte[] serializeKey(K key) throws IOException {
+        keyOutView.clear();
+        keySerializer.serialize(key, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    @Override
+    public byte[] serializeValue(V value) throws IOException {
+        valueOutputView.clear();
+        valueSerializer.serialize(value, valueOutputView);
+        return valueOutputView.getCopyOfBuffer();
+    }
+
+    @Override
+    public V deserializeValue(byte[] valueBytes) throws IOException {
+        valueInputView.setBuffer(valueBytes);
+        return valueSerializer.deserialize(valueInputView);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
new file mode 100644
index 0000000000..77aa1cf17e
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ListState;
+import org.apache.paimon.lookup.SetState;
+import org.apache.paimon.lookup.StateFactory;
+import org.apache.paimon.lookup.ValueState;
+
+import java.io.IOException;
+
+/**
+ * {@link StateFactory} that creates in-memory state instances.
+ *
+ * <p>Every factory method ignores the {@code name} and {@code lruCacheSize} arguments and only
+ * forwards the two serializers to the created state.
+ */
+public class InMemoryStateFactory implements StateFactory {
+
+    /** Creates a new {@link InMemoryValueState}; {@code name} and {@code lruCacheSize} are unused. */
+    @Override
+    public <K, V> ValueState<K, V> valueState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize) {
+        return new InMemoryValueState<>(keySerializer, valueSerializer);
+    }
+
+    /** Creates a new {@link InMemorySetState}; {@code name} and {@code lruCacheSize} are unused. */
+    @Override
+    public <K, V> SetState<K, V> setState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize) {
+        return new InMemorySetState<>(keySerializer, valueSerializer);
+    }
+
+    /** Creates a new {@link InMemoryListState}; {@code name} and {@code lruCacheSize} are unused. */
+    @Override
+    public <K, V> ListState<K, V> listState(
+            String name,
+            Serializer<K> keySerializer,
+            Serializer<V> valueSerializer,
+            long lruCacheSize) {
+        return new InMemoryListState<>(keySerializer, valueSerializer);
+    }
+
+    /** Always returns {@code false}: this factory does not ask callers to bulk load. */
+    @Override
+    public boolean preferBulkLoad() {
+        return false;
+    }
+
+    /** No-op: the factory itself has no fields and therefore nothing to release. */
+    @Override
+    public void close() throws IOException {}
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
new file mode 100644
index 0000000000..07db364ef1
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.lookup.ValueState;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/**
+ * In-memory {@link ValueState} backed by a {@link HashMap}.
+ *
+ * <p>Keys are the serialized key bytes wrapped via {@code ByteArray.wrapBytes}; values are kept
+ * in serialized form and deserialized on every {@link #get}.
+ */
+public class InMemoryValueState<K, V> extends InMemoryState<K, V> implements 
ValueState<K, V> {
+
+    // Wrapped serialized key -> serialized value.
+    private final Map<ByteArray, byte[]> values;
+
+    /** Creates an empty state using the given key and value serializers. */
+    public InMemoryValueState(Serializer<K> keySerializer, Serializer<V> 
valueSerializer) {
+        super(keySerializer, valueSerializer);
+        this.values = new HashMap<>();
+    }
+
+    /**
+     * Looks up the value stored for {@code key}.
+     *
+     * @return the deserialized value, or {@code null} when the map has no bytes for the key
+     * @throws IOException if key serialization or value deserialization fails
+     */
+    @Override
+    public @Nullable V get(K key) throws IOException {
+        byte[] bytes = values.get(wrapBytes(serializeKey(key)));
+        if (bytes == null) {
+            return null;
+        }
+        return deserializeValue(bytes);
+    }
+
+    /**
+     * Stores the serialized {@code value} under the serialized {@code key}, replacing any
+     * previous entry for that key.
+     *
+     * @throws IOException if serialization fails
+     */
+    @Override
+    public void put(K key, V value) throws IOException {
+        values.put(wrapBytes(serializeKey(key)), serializeValue(value));
+    }
+
+    /**
+     * Removes the entry for {@code key}, if any.
+     *
+     * @throws IOException if key serialization fails
+     */
+    @Override
+    public void delete(K key) throws IOException {
+        values.remove(wrapBytes(serializeKey(key)));
+    }
+
+    /**
+     * Returns a bulk loader that writes already-serialized key/value pairs straight into the
+     * backing map.
+     */
+    @Override
+    public ValueBulkLoader createBulkLoader() {
+        return new ValueBulkLoader() {
+
+            @Override
+            public void write(byte[] key, byte[] value) {
+                // The value array is stored as passed in; it is not copied here.
+                values.put(wrapBytes(key), value);
+            }
+
+            // Nothing to flush: each write has already been applied to the map.
+            @Override
+            public void finish() {}
+        };
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
similarity index 82%
copy from paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
copy to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
index 0bcdbbb5d2..8278376f73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
@@ -16,7 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
+
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.utils.ListDelimitedSerializer;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.EnvOptions;
@@ -28,14 +32,16 @@ import org.rocksdb.SstFileWriter;
 import org.rocksdb.TtlDB;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
 /** Bulk loader for RocksDB. */
-public class BulkLoader {
+public class RocksDBBulkLoader implements ValueBulkLoader, ListBulkLoader {
 
     private final String uuid = UUID.randomUUID().toString();
+    private final ListDelimitedSerializer listSerializer = new 
ListDelimitedSerializer();
 
     private final ColumnFamilyHandle columnFamily;
     private final String path;
@@ -49,7 +55,8 @@ public class BulkLoader {
     private int sstIndex = 0;
     private long recordNum = 0;
 
-    public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle 
columnFamily, String path) {
+    public RocksDBBulkLoader(
+            RocksDB db, Options options, ColumnFamilyHandle columnFamily, 
String path) {
         this.db = db;
         this.isTtlEnabled = db instanceof TtlDB;
         this.options = options;
@@ -58,6 +65,7 @@ public class BulkLoader {
         this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
     }
 
+    @Override
     public void write(byte[] key, byte[] value) throws WriteException {
         try {
             if (writer == null) {
@@ -89,6 +97,17 @@ public class BulkLoader {
         }
     }
 
+    /**
+     * Writes a list of serialized values for {@code key}: the list is encoded into a single byte
+     * array with {@code listSerializer} and then passed to {@link #write(byte[], byte[])}.
+     *
+     * <p>An {@link IOException} raised while encoding the list is rethrown wrapped in a {@link
+     * RuntimeException}, not as a {@code WriteException}.
+     */
+    @Override
+    public void write(byte[] key, List<byte[]> value) throws WriteException {
+        byte[] bytes;
+        try {
+            bytes = listSerializer.serializeList(value);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        write(key, bytes);
+    }
+
     private byte[] appendTimestamp(byte[] value) {
         byte[] newValue = new byte[value.length + 4];
         System.arraycopy(value, 0, newValue, 0, value.length);
@@ -99,6 +118,7 @@ public class BulkLoader {
         return newValue;
     }
 
+    @Override
     public void finish() {
         try {
             if (writer != null) {
@@ -115,11 +135,4 @@ public class BulkLoader {
             throw new RuntimeException(e);
         }
     }
-
-    /** Exception during writing. */
-    public static class WriteException extends Exception {
-        public WriteException(Throwable cause) {
-            super(cause);
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
similarity index 87%
rename from 
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
index 7fe29d209a..70716c40cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ListState;
 import org.apache.paimon.utils.ListDelimitedSerializer;
 
 import org.rocksdb.ColumnFamilyHandle;
@@ -29,7 +30,7 @@ import java.util.Collections;
 import java.util.List;
 
 /** RocksDB state for key -> List of value. */
-public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> {
+public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> 
implements ListState<K, V> {
 
     private final ListDelimitedSerializer listSerializer = new 
ListDelimitedSerializer();
 
@@ -42,6 +43,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K, 
V, List<V>> {
         super(stateFactory, columnFamily, keySerializer, valueSerializer, 
lruCacheSize);
     }
 
+    @Override
     public void add(K key, V value) throws IOException {
         byte[] keyBytes = serializeKey(key);
         byte[] valueBytes = serializeValue(value);
@@ -53,6 +55,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K, 
V, List<V>> {
         cache.invalidate(wrap(keyBytes));
     }
 
+    @Override
     public List<V> get(K key) throws IOException {
         byte[] keyBytes = serializeKey(key);
         return cache.get(
@@ -71,14 +74,4 @@ public class RocksDBListState<K, V> extends RocksDBState<K, 
V, List<V>> {
                     return rows;
                 });
     }
-
-    public byte[] serializeValue(V value) throws IOException {
-        valueOutputView.clear();
-        valueSerializer.serialize(value, valueOutputView);
-        return valueOutputView.getCopyOfBuffer();
-    }
-
-    public byte[] serializeList(List<byte[]> valueList) throws IOException {
-        return listSerializer.serializeList(valueList);
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
similarity index 99%
rename from 
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
index e8c3fecdf8..2cde6d6e7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.annotation.Documentation;
 import org.apache.paimon.options.ConfigOption;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
similarity index 95%
rename from 
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
index 5c06cbd51a..9f654e152c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.SetState;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
@@ -32,7 +34,8 @@ import java.util.List;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Rocksdb state for key -> Set values. */
-public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> {
+public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>>
+        implements SetState<K, V> {
 
     private static final byte[] EMPTY = new byte[0];
 
@@ -45,6 +48,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, 
List<byte[]>> {
         super(stateFactory, columnFamily, keySerializer, valueSerializer, 
lruCacheSize);
     }
 
+    @Override
     public List<V> get(K key) throws IOException {
         ByteArray keyBytes = wrap(serializeKey(key));
         List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
@@ -73,6 +77,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, 
List<byte[]>> {
         return values;
     }
 
+    @Override
     public void retract(K key, V value) throws IOException {
         try {
             byte[] bytes = invalidKeyAndGetKVBytes(key, value);
@@ -84,6 +89,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, 
List<byte[]>> {
         }
     }
 
+    @Override
     public void add(K key, V value) throws IOException {
         try {
             byte[] bytes = invalidKeyAndGetKVBytes(key, value);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
similarity index 82%
rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
index 0181917a7a..62257cfd34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.State;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -37,10 +39,9 @@ import org.rocksdb.WriteOptions;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Arrays;
 
 /** Rocksdb state for key value. */
-public abstract class RocksDBState<K, V, CacheV> {
+public abstract class RocksDBState<K, V, CacheV> implements State<K, V> {
 
     protected final RocksDBStateFactory stateFactory;
 
@@ -85,12 +86,26 @@ public abstract class RocksDBState<K, V, CacheV> {
                         .build();
     }
 
+    /**
+     * Serializes {@code key} with the key serializer into the shared key buffer, which is
+     * cleared first, and returns the bytes obtained via {@code getCopyOfBuffer()}.
+     */
+    @Override
     public byte[] serializeKey(K key) throws IOException {
         keyOutView.clear();
         keySerializer.serialize(key, keyOutView);
         return keyOutView.getCopyOfBuffer();
     }
 
+    /**
+     * Serializes {@code value} with the value serializer into the shared value buffer, which is
+     * cleared first, and returns the bytes obtained via {@code getCopyOfBuffer()}.
+     */
+    @Override
+    public byte[] serializeValue(V value) throws IOException {
+        valueOutputView.clear();
+        valueSerializer.serialize(value, valueOutputView);
+        return valueOutputView.getCopyOfBuffer();
+    }
+
+    /**
+     * Deserializes a value from {@code valueBytes} by pointing the shared input view at the
+     * array and reading it with the value serializer.
+     */
+    @Override
+    public V deserializeValue(byte[] valueBytes) throws IOException {
+        valueInputView.setBuffer(valueBytes);
+        return valueSerializer.deserialize(valueInputView);
+    }
+
     protected ByteArray wrap(byte[] bytes) {
         return new ByteArray(bytes);
     }
@@ -99,8 +114,8 @@ public abstract class RocksDBState<K, V, CacheV> {
         return new Reference(bytes);
     }
 
-    public BulkLoader createBulkLoader() {
-        return new BulkLoader(db, stateFactory.options(), columnFamily, 
stateFactory.path());
+    public RocksDBBulkLoader createBulkLoader() {
+        return new RocksDBBulkLoader(db, stateFactory.options(), columnFamily, 
stateFactory.path());
     }
 
     public static BinaryExternalSortBuffer createBulkLoadSorter(
@@ -117,33 +132,6 @@ public abstract class RocksDBState<K, V, CacheV> {
                 options.sequenceFieldSortOrderIsAscending());
     }
 
-    /** A class wraps byte[] to implement equals and hashCode. */
-    protected static class ByteArray {
-
-        protected final byte[] bytes;
-
-        protected ByteArray(byte[] bytes) {
-            this.bytes = bytes;
-        }
-
-        @Override
-        public int hashCode() {
-            return Arrays.hashCode(bytes);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ByteArray byteArray = (ByteArray) o;
-            return Arrays.equals(bytes, byteArray.bytes);
-        }
-    }
-
     /** A class wraps byte[] to indicate contain or not contain. */
     protected static class Reference {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
similarity index 94%
rename from 
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
index 9168fbf4cb..883c82906b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.StateFactory;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -31,13 +32,12 @@ import org.rocksdb.TtlDB;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 
 /** Factory to create state. */
-public class RocksDBStateFactory implements Closeable {
+public class RocksDBStateFactory implements StateFactory {
 
     public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
@@ -85,6 +85,7 @@ public class RocksDBStateFactory implements Closeable {
         return path;
     }
 
+    @Override
     public <K, V> RocksDBValueState<K, V> valueState(
             String name,
             Serializer<K> keySerializer,
@@ -95,6 +96,7 @@ public class RocksDBStateFactory implements Closeable {
                 this, createColumnFamily(name), keySerializer, 
valueSerializer, lruCacheSize);
     }
 
+    @Override
     public <K, V> RocksDBSetState<K, V> setState(
             String name,
             Serializer<K> keySerializer,
@@ -105,6 +107,7 @@ public class RocksDBStateFactory implements Closeable {
                 this, createColumnFamily(name), keySerializer, 
valueSerializer, lruCacheSize);
     }
 
+    @Override
     public <K, V> RocksDBListState<K, V> listState(
             String name,
             Serializer<K> keySerializer,
@@ -116,6 +119,11 @@ public class RocksDBStateFactory implements Closeable {
                 this, createColumnFamily(name), keySerializer, 
valueSerializer, lruCacheSize);
     }
 
+    /** Always returns {@code true} for the RocksDB-backed factory. */
+    @Override
+    public boolean preferBulkLoad() {
+        return true;
+    }
+
     private ColumnFamilyHandle createColumnFamily(String name) throws 
IOException {
         try {
             return db.createColumnFamily(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
similarity index 86%
rename from 
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
rename to 
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
index 444ed5ad06..3d8768a9c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
@@ -16,9 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
 
 import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ValueState;
 
 import org.rocksdb.ColumnFamilyHandle;
 
@@ -29,7 +31,8 @@ import java.io.IOException;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Rocksdb state for key -> a single value. */
-public class RocksDBValueState<K, V> extends RocksDBState<K, V, 
RocksDBState.Reference> {
+public class RocksDBValueState<K, V> extends RocksDBState<K, V, 
RocksDBState.Reference>
+        implements ValueState<K, V> {
 
     public RocksDBValueState(
             RocksDBStateFactory stateFactory,
@@ -41,6 +44,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, 
V, RocksDBState.Ref
     }
 
     @Nullable
+    @Override
     public V get(K key) throws IOException {
         try {
             Reference valueRef = get(wrap(serializeKey(key)));
@@ -60,6 +64,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, 
V, RocksDBState.Ref
         return valueRef;
     }
 
+    @Override
     public void put(K key, V value) throws IOException {
         checkArgument(value != null);
 
@@ -73,6 +78,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, 
V, RocksDBState.Ref
         }
     }
 
+    @Override
     public void delete(K key) throws IOException {
         try {
             byte[] keyBytes = serializeKey(key);
@@ -85,15 +91,4 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, 
V, RocksDBState.Ref
             throw new IOException(e);
         }
     }
-
-    public V deserializeValue(byte[] valueBytes) throws IOException {
-        valueInputView.setBuffer(valueBytes);
-        return valueSerializer.deserialize(valueInputView);
-    }
-
-    public byte[] serializeValue(V value) throws IOException {
-        valueOutputView.clear();
-        valueSerializer.serialize(value, valueOutputView);
-        return valueOutputView.getCopyOfBuffer();
-    }
 }
diff --git 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 71792962b8..219e7df99c 100644
--- 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -74,7 +74,7 @@ public class ConfigOptionsDocGenerator {
             new OptionsClassLocation[] {
                 new OptionsClassLocation("paimon-api", 
"org.apache.paimon.options"),
                 new OptionsClassLocation("paimon-api", "org.apache.paimon"),
-                new OptionsClassLocation("paimon-core", 
"org.apache.paimon.lookup"),
+                new OptionsClassLocation("paimon-core", 
"org.apache.paimon.lookup.rocksdb"),
                 new OptionsClassLocation("paimon-core", 
"org.apache.paimon.jdbc"),
                 new OptionsClassLocation("paimon-core", 
"org.apache.paimon.table"),
                 new OptionsClassLocation("paimon-core", 
"org.apache.paimon.iceberg"),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ed8d9a7ac8..d524920e82 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -541,7 +541,10 @@ public class FlinkConnectorOptions {
         AUTO,
 
         /** Use full caching mode. */
-        FULL
+        FULL,
+
+        /** Use in-memory caching mode. */
+        MEMORY
     }
 
     /** Watermark emit strategy for scan. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index dec278fb70..e3e15c4ce6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -71,8 +71,8 @@ import static 
org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static 
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
-import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
-import static 
org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
+import static 
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static 
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 4154b6742c..27795a00ff 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -24,9 +24,11 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.StateFactory;
+import org.apache.paimon.lookup.memory.InMemoryStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
+import org.apache.paimon.lookup.rocksdb.RocksDBState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -64,11 +66,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
 
 /** Lookup table of full cache. */
 public abstract class FullCacheLookupTable implements LookupTable {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(FullCacheLookupTable.class);
 
     protected final Object lock = new Object();
@@ -79,7 +84,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     @Nullable protected final FieldsComparator userDefinedSeqComparator;
     protected final int appendUdsFieldNumber;
 
-    protected RocksDBStateFactory stateFactory;
+    protected StateFactory stateFactory;
     @Nullable private ExecutorService refreshExecutor;
     private final AtomicReference<Exception> cachedException;
     private final int maxPendingSnapshotCount;
@@ -143,11 +148,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     }
 
     protected void init() throws Exception {
-        this.stateFactory =
-                new RocksDBStateFactory(
-                        context.tempPath.toString(),
-                        context.table.coreOptions().toConfiguration(),
-                        null);
+        this.stateFactory = createStateFactory();
         this.refreshExecutor =
                 this.refreshAsync
                         ? Executors.newSingleThreadExecutor(
@@ -158,6 +159,16 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                         : null;
     }
 
+    /**
+     * Chooses the state backend from the table options: an {@link InMemoryStateFactory} when
+     * {@code LOOKUP_CACHE_MODE} is {@code MEMORY}, otherwise a {@link RocksDBStateFactory}
+     * created with the lookup temp path and the table's options.
+     */
+    private StateFactory createStateFactory() throws IOException {
+        // Only the RocksDB branch below uses the temp path.
+        String diskDir = context.tempPath.toString();
+        Options options = context.table.coreOptions().toConfiguration();
+        if (options.get(LOOKUP_CACHE_MODE) == MEMORY) {
+            return new InMemoryStateFactory();
+        } else {
+            return new RocksDBStateFactory(diskDir, options, null);
+        }
+    }
+
     protected void bootstrap() throws Exception {
         Predicate scanPredicate =
                 PredicateBuilder.andNullable(context.tablePredicate, 
specificPartition);
@@ -168,6 +179,11 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
                         scanPredicate,
                         context.requiredCachedBucketIds,
                         cacheRowFilter);
+        if (!stateFactory.preferBulkLoad()) {
+            doRefresh();
+            return;
+        }
+
         BinaryExternalSortBuffer bulkLoadSorter =
                 RocksDBState.createBulkLoadSorter(
                         IOManager.create(context.tempPath.toString()), 
context.table.coreOptions());
@@ -189,7 +205,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
             while ((row = keyIterator.next(row)) != null) {
                 bulkLoader.write(row.getBinary(0), row.getBinary(1));
             }
-        } catch (BulkLoader.WriteException e) {
+        } catch (RocksDBBulkLoader.WriteException e) {
             throw new RuntimeException(
                     "Exception in bulkLoad, the most suspicious reason is that 
"
                             + "your data contains duplicates, please check 
your lookup table. ",
@@ -331,7 +347,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     /** Bulk loader for the table. */
     public interface TableBulkLoader {
 
-        void write(byte[] key, byte[] value) throws BulkLoader.WriteException, 
IOException;
+        void write(byte[] key, byte[] value) throws 
RocksDBBulkLoader.WriteException, IOException;
 
         void finish() throws IOException;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index 63af4f3506..546bc5a60a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBListState;
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ListState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.KeyProjectedRow;
@@ -40,7 +41,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
 
     private final KeyProjectedRow joinKeyRow;
 
-    private RocksDBListState<InternalRow, InternalRow> state;
+    private ListState<InternalRow, InternalRow> state;
 
     public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
         super(context);
@@ -105,7 +106,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
 
     @Override
     public TableBulkLoader createBulkLoader() {
-        BulkLoader bulkLoader = state.createBulkLoader();
+        ListBulkLoader bulkLoader = state.createBulkLoader();
         return new TableBulkLoader() {
 
             private final List<byte[]> values = new ArrayList<>();
@@ -113,7 +114,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
             private byte[] currentKey;
 
             @Override
-            public void write(byte[] key, byte[] value) throws IOException {
+            public void write(byte[] key, byte[] value) {
                 if (currentKey != null && !Arrays.equals(key, currentKey)) {
                     flush();
                 }
@@ -122,16 +123,16 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {
             }
 
             @Override
-            public void finish() throws IOException {
+            public void finish() {
                 flush();
                 bulkLoader.finish();
             }
 
-            private void flush() throws IOException {
+            private void flush() {
                 if (currentKey != null && values.size() > 0) {
                     try {
-                        bulkLoader.write(currentKey, state.serializeList(values));
-                    } catch (BulkLoader.WriteException e) {
+                        bulkLoader.write(currentKey, values);
+                    } catch (RocksDBBulkLoader.WriteException e) {
                         throw new RuntimeException(e);
                     }
                 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 2a3099e9a6..485a9cfaa6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.lookup.ValueState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowKind;
@@ -44,7 +45,7 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable {
 
     @Nullable private final ProjectedRow keyRearrange;
 
-    protected RocksDBValueState<InternalRow, InternalRow> tableState;
+    protected ValueState<InternalRow, InternalRow> tableState;
 
     public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> joinKey) {
         super(context);
@@ -129,12 +130,12 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable {
 
     @Override
     public TableBulkLoader createBulkLoader() {
-        BulkLoader bulkLoader = tableState.createBulkLoader();
+        ValueBulkLoader bulkLoader = tableState.createBulkLoader();
         return new TableBulkLoader() {
 
             @Override
             public void write(byte[] key, byte[] value)
-                    throws BulkLoader.WriteException, IOException {
+                    throws RocksDBBulkLoader.WriteException, IOException {
                 bulkLoader.write(key, value);
                 bulkLoadWritePlus(key, value);
             }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 11c9cba24b..668570d77d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.RocksDBSetState;
+import org.apache.paimon.lookup.SetState;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.KeyProjectedRow;
@@ -35,7 +35,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
 
     private final KeyProjectedRow secKeyRow;
 
-    private RocksDBSetState<InternalRow, InternalRow> indexState;
+    private SetState<InternalRow, InternalRow> indexState;
 
     public SecondaryIndexLookupTable(Context context, long lruCacheSize) {
         super(context, lruCacheSize / 2, context.table.primaryKeys());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index dea30d4d13..652ef5bd9c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -57,8 +57,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                         + "PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)";
 
         String fullOption = ", 'lookup.cache' = 'full'";
-
         String lruOption = ", 'changelog-producer'='lookup'";
+        String memoryOption = ", 'lookup.cache' = 'memory'";
 
         switch (cacheMode) {
             case FULL:
@@ -69,6 +69,10 @@ public class LookupJoinITCase extends CatalogITCaseBase {
                 tEnv.executeSql(String.format(dim, lruOption));
                 tEnv.executeSql(String.format(partitioned, lruOption));
                 break;
+            case MEMORY:
+                tEnv.executeSql(String.format(dim, memoryOption));
+                tEnv.executeSql(String.format(partitioned, memoryOption));
+                break;
             default:
                 throw new UnsupportedOperationException();
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index dcbc405d31..4f7ff334f6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -26,7 +26,7 @@ import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
 import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
 import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoteQueryExecutor;
-import org.apache.paimon.lookup.RocksDBOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 88b9471330..4a3ca45c3b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -42,6 +42,8 @@ import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -50,15 +52,15 @@ import org.apache.paimon.utils.SortUtil;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -71,21 +73,34 @@ import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.FULL;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
 import static org.apache.paimon.types.DataTypes.INT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link LookupTable}. */
+@ExtendWith(ParameterizedTestExtension.class)
 public class LookupTableTest extends TableTestBase {
 
-    @TempDir Path tempDir;
+    private final boolean inMemory;
 
+    @TempDir Path tempDir;
     private RowType rowType;
-
     private IOManager ioManager;
-
     private FullCacheLookupTable table;
 
+    public LookupTableTest(boolean inMemory) {
+        this.inMemory = inMemory;
+    }
+
+    @SuppressWarnings("unused")
+    @Parameters(name = "{0}")
+    public static List<Boolean> getVarSeg() {
+        return Arrays.asList(true, false);
+    }
+
     @BeforeEach
     public void before() throws IOException {
         this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
@@ -100,6 +115,9 @@ public class LookupTableTest extends TableTestBase {
     }
 
     private FileStoreTable createTable(List<String> primaryKeys, Options options) throws Exception {
+        if (inMemory) {
+            options.set(LOOKUP_CACHE_MODE, MEMORY);
+        }
         Identifier identifier = new Identifier("default", "t");
         Schema schema =
                 new Schema(
@@ -112,7 +130,7 @@ public class LookupTableTest extends TableTestBase {
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTable() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         FullCacheLookupTable.Context context =
@@ -132,7 +150,7 @@ public class LookupTableTest extends TableTestBase {
         table.open();
 
         // test bulk load error
-        {
+        if (!inMemory) {
             TableBulkLoader bulkLoader = table.createBulkLoader();
             bulkLoader.write(new byte[] {1}, new byte[] {1});
             assertThatThrownBy(() -> bulkLoader.write(new byte[] {1}, new byte[] {2}))
@@ -172,7 +190,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(table.get(row(3))).hasSize(0);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTableWithSequenceField() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.SEQUENCE_FIELD, "f1");
@@ -223,7 +241,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 1, 22, 222);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTableWithSequenceFieldProjection() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.SEQUENCE_FIELD, "f2");
@@ -263,7 +281,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 1, 22);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTablePkFilter() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         FullCacheLookupTable.Context context =
@@ -299,7 +317,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(table.get(row(3))).hasSize(0);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTableNonPkFilter() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         FullCacheLookupTable.Context context =
@@ -328,7 +346,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result).hasSize(0);
     }
 
-    @Test
+    @TestTemplate
     public void testSecKeyTable() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         FullCacheLookupTable.Context context =
@@ -376,7 +394,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
     }
 
-    @Test
+    @TestTemplate
     public void testSecKeyTableWithSequenceField() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.SEQUENCE_FIELD, "f1");
@@ -431,7 +449,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain(333);
     }
 
-    @Test
+    @TestTemplate
     public void testSecKeyTablePkFilter() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         FullCacheLookupTable.Context context =
@@ -476,7 +494,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(table.get(row(33))).hasSize(0);
     }
 
-    @Test
+    @TestTemplate
     public void testNoPrimaryKeyTable() throws Exception {
         FileStoreTable storeTable = createTable(emptyList(), new Options());
         FullCacheLookupTable.Context context =
@@ -524,7 +542,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
     }
 
-    @Test
+    @TestTemplate
     public void testNoPrimaryKeyTableFilter() throws Exception {
         FileStoreTable storeTable = createTable(emptyList(), new Options());
         FullCacheLookupTable.Context context =
@@ -559,7 +577,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(1), 1, 11, 111);
     }
 
-    @Test
+    @TestTemplate
     public void testPkTableWithCacheRowFilter() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         writeWithBucketAssigner(
@@ -600,7 +618,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(res).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     public void testRefreshExecutorRebuildAfterReopen() throws Exception {
         Options options = new Options();
         options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true);
@@ -635,7 +653,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(res.get(0), 1, 22, 222);
     }
 
-    @Test
+    @TestTemplate
     public void testNoPkTableWithCacheRowFilter() throws Exception {
         FileStoreTable storeTable = createTable(emptyList(), new Options());
         writeWithBucketAssigner(
@@ -676,7 +694,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(res).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     public void testSecKeyTableWithCacheRowFilter() throws Exception {
         FileStoreTable storeTable = createTable(singletonList("f0"), new Options());
         writeWithBucketAssigner(
@@ -717,7 +735,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(res).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     public void testPartialLookupTable() throws Exception {
         FileStoreTable dimTable = createDimTable();
         PrimaryKeyPartialLookupTable table =
@@ -750,7 +768,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result).hasSize(0);
     }
 
-    @Test
+    @TestTemplate
     public void testPartialLookupTableWithRowFilter() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.BUCKET.key(), "2");
@@ -771,7 +789,7 @@ public class LookupTableTest extends TableTestBase {
         assertThat(result).isEmpty();
     }
 
-    @Test
+    @TestTemplate
     public void testPartialLookupTableWithProjection() throws Exception {
         FileStoreTable dimTable = createDimTable();
         PrimaryKeyPartialLookupTable table =
@@ -803,7 +821,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 22, -2);
     }
 
-    @Test
+    @TestTemplate
     public void testPartialLookupTableJoinKeyOrder() throws Exception {
         FileStoreTable dimTable = createDimTable();
         PrimaryKeyPartialLookupTable table =
@@ -835,9 +853,17 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 22, -2);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception {
+    @TestTemplate
+    public void testPKLookupTableNotRefreshAsync() throws Exception {
+        innerTestPKLookupTableRefreshAsync(false);
+    }
+
+    @TestTemplate
+    public void testPKLookupTableRefreshAsync() throws Exception {
+        innerTestPKLookupTableRefreshAsync(true);
+    }
+
+    private void innerTestPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception {
         Options options = new Options();
         options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
         FileStoreTable storeTable = createTable(singletonList("f0"), options);
@@ -902,13 +928,11 @@ public class LookupTableTest extends TableTestBase {
         table.close();
     }
 
-    @Test
+    @TestTemplate
     public void testFullCacheLookupTableWithForceLookup() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
-        options.set(
-                FlinkConnectorOptions.LOOKUP_CACHE_MODE,
-                FlinkConnectorOptions.LookupCacheMode.FULL);
+        options.set(LOOKUP_CACHE_MODE, inMemory ? MEMORY : FULL);
         options.set(CoreOptions.WRITE_ONLY, true);
         options.set(CoreOptions.FORCE_LOOKUP, true);
         options.set(CoreOptions.BUCKET, 1);
@@ -964,7 +988,7 @@ public class LookupTableTest extends TableTestBase {
         assertRow(result.get(0), 1, 22, 222); // new value
     }
 
-    @Test
+    @TestTemplate
     public void testPartialLookupTableWithForceLookup() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
index 7a942ddb56..d98d199623 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.lookup.RocksDBListState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBListState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;

Reply via email to