This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b74cc432d8 [flink] Introduce 'memory' type for 'lookup.cache' to
in-memory cache (#5748)
b74cc432d8 is described below
commit b74cc432d87bf19b8eda87e564844b4ab3ec4ab4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 14:02:03 2025 +0800
[flink] Introduce 'memory' type for 'lookup.cache' to in-memory cache
(#5748)
---
.../generated/flink_connector_configuration.html | 2 +-
.../paimon/crosspartition/GlobalIndexAssigner.java | 16 ++--
.../java/org/apache/paimon/lookup/BulkLoader.java | 101 +--------------------
.../java/org/apache/paimon/lookup/ByteArray.java | 61 +++++++++++++
.../org/apache/paimon/lookup/ListBulkLoader.java | 27 ++++++
.../java/org/apache/paimon/lookup/ListState.java | 32 +++++++
.../java/org/apache/paimon/lookup/SetState.java | 35 +++++++
.../main/java/org/apache/paimon/lookup/State.java | 31 +++++++
.../org/apache/paimon/lookup/StateFactory.java | 51 +++++++++++
.../org/apache/paimon/lookup/ValueBulkLoader.java | 25 +++++
.../java/org/apache/paimon/lookup/ValueState.java | 36 ++++++++
.../paimon/lookup/memory/InMemoryListState.java | 77 ++++++++++++++++
.../paimon/lookup/memory/InMemorySetState.java | 69 ++++++++++++++
.../apache/paimon/lookup/memory/InMemoryState.java | 64 +++++++++++++
.../paimon/lookup/memory/InMemoryStateFactory.java | 66 ++++++++++++++
.../paimon/lookup/memory/InMemoryValueState.java | 76 ++++++++++++++++
.../RocksDBBulkLoader.java} | 33 +++++--
.../lookup/{ => rocksdb}/RocksDBListState.java | 17 +---
.../lookup/{ => rocksdb}/RocksDBOptions.java | 2 +-
.../lookup/{ => rocksdb}/RocksDBSetState.java | 10 +-
.../paimon/lookup/{ => rocksdb}/RocksDBState.java | 52 ++++-------
.../lookup/{ => rocksdb}/RocksDBStateFactory.java | 14 ++-
.../lookup/{ => rocksdb}/RocksDBValueState.java | 21 ++---
.../configuration/ConfigOptionsDocGenerator.java | 2 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 5 +-
.../flink/lookup/FileStoreLookupFunction.java | 4 +-
.../paimon/flink/lookup/FullCacheLookupTable.java | 38 +++++---
.../flink/lookup/NoPrimaryKeyLookupTable.java | 19 ++--
.../paimon/flink/lookup/PrimaryKeyLookupTable.java | 11 ++-
.../flink/lookup/SecondaryIndexLookupTable.java | 4 +-
.../org/apache/paimon/flink/LookupJoinITCase.java | 6 +-
.../flink/lookup/FileStoreLookupFunctionTest.java | 2 +-
.../paimon/flink/lookup/LookupTableTest.java | 90 +++++++++++-------
.../paimon/flink/lookup/RocksDBListStateTest.java | 4 +-
34 files changed, 856 insertions(+), 247 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 1a67d4e60f..fa9b8e4d9c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -60,7 +60,7 @@ under the License.
<td><h5>lookup.cache</h5></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
- <td>The cache mode of lookup join.<br /><br />Possible
values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
+ <td>The cache mode of lookup join.<br /><br />Possible
values:<ul><li>"AUTO"</li><li>"FULL"</li><li>"MEMORY"</li></ul></td>
</tr>
<tr>
<td><h5>lookup.dynamic-partition.refresh-interval</h5></td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 892e27e966..fe14048310 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -28,11 +28,11 @@ import
org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBOptions;
-import org.apache.paimon.lookup.RocksDBState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
-import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBValueState;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -71,7 +71,7 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
-import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
+import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.BLOCK_CACHE_SIZE;
import static org.apache.paimon.utils.ListUtils.pickRandomly;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -210,14 +210,14 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
bootstrapRecords.complete();
boolean isEmpty = true;
if (bootstrapKeys.size() > 0) {
- BulkLoader bulkLoader = keyIndex.createBulkLoader();
+ RocksDBBulkLoader bulkLoader = keyIndex.createBulkLoader();
MutableObjectIterator<BinaryRow> keyIterator =
bootstrapKeys.sortedIterator();
BinaryRow row = new BinaryRow(2);
try {
while ((row = keyIterator.next(row)) != null) {
bulkLoader.write(row.getBinary(0), row.getBinary(1));
}
- } catch (BulkLoader.WriteException e) {
+ } catch (RocksDBBulkLoader.WriteException e) {
throw new RuntimeException(
"Exception in bulkLoad, the most suspicious reason is
that "
+ "your data contains duplicates, please check
your sink table. "
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
index 0bcdbbb5d2..f0ea474f52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
@@ -18,106 +18,13 @@
package org.apache.paimon.lookup;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.EnvOptions;
-import org.rocksdb.IngestExternalFileOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.SstFileWriter;
-import org.rocksdb.TtlDB;
+/** Bulk loader for {@link State}, incoming keys must be sorted, and there
must be no repetition. */
+public interface BulkLoader {
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-/** Bulk loader for RocksDB. */
-public class BulkLoader {
-
- private final String uuid = UUID.randomUUID().toString();
-
- private final ColumnFamilyHandle columnFamily;
- private final String path;
- private final RocksDB db;
- private final boolean isTtlEnabled;
- private final Options options;
- private final List<String> files = new ArrayList<>();
- private final int currentTimeSeconds;
-
- private SstFileWriter writer = null;
- private int sstIndex = 0;
- private long recordNum = 0;
-
- public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle
columnFamily, String path) {
- this.db = db;
- this.isTtlEnabled = db instanceof TtlDB;
- this.options = options;
- this.columnFamily = columnFamily;
- this.path = path;
- this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
- }
-
- public void write(byte[] key, byte[] value) throws WriteException {
- try {
- if (writer == null) {
- writer = new SstFileWriter(new EnvOptions(), options);
- String path = new File(this.path, "sst-" + uuid + "-" +
(sstIndex++)).getPath();
- writer.open(path);
- files.add(path);
- }
-
- if (isTtlEnabled) {
- value = appendTimestamp(value);
- }
-
- try {
- writer.put(key, value);
- } catch (RocksDBException e) {
- throw new WriteException(e);
- }
-
- recordNum++;
- if (recordNum % 1000 == 0 && writer.fileSize() >=
options.targetFileSizeBase()) {
- writer.finish();
- writer.close();
- writer = null;
- recordNum = 0;
- }
- } catch (RocksDBException e) {
- throw new RuntimeException(e);
- }
- }
-
- private byte[] appendTimestamp(byte[] value) {
- byte[] newValue = new byte[value.length + 4];
- System.arraycopy(value, 0, newValue, 0, value.length);
- newValue[value.length] = (byte) (currentTimeSeconds & 0xff);
- newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff);
- newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) &
0xff);
- newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) &
0xff);
- return newValue;
- }
-
- public void finish() {
- try {
- if (writer != null) {
- writer.finish();
- writer.close();
- }
-
- if (files.size() > 0) {
- IngestExternalFileOptions ingestOptions = new
IngestExternalFileOptions();
- db.ingestExternalFile(columnFamily, files, ingestOptions);
- ingestOptions.close();
- }
- } catch (RocksDBException e) {
- throw new RuntimeException(e);
- }
- }
+ void finish();
/** Exception during writing. */
- public static class WriteException extends Exception {
+ class WriteException extends Exception {
public WriteException(Throwable cause) {
super(cause);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java
new file mode 100644
index 0000000000..95a279d20f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import org.apache.paimon.utils.SortUtil;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+
+/**
+ * Wrapper around a {@code byte[]} that gives the array content-based {@link #equals(Object)},
+ * {@link #hashCode()} and ordering, so it can be used as a map key or sorted-set element.
+ */
+public class ByteArray implements Comparable<ByteArray> {
+
+    public final byte[] bytes;
+
+    public ByteArray(byte[] bytes) {
+        this.bytes = bytes;
+    }
+
+    /** Static shortcut for the constructor; the given array is wrapped as is, not copied. */
+    public static ByteArray wrapBytes(byte[] bytes) {
+        return new ByteArray(bytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        boolean sameClass = o != null && getClass() == o.getClass();
+        return sameClass && Arrays.equals(bytes, ((ByteArray) o).bytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(bytes);
+    }
+
+    /** Delegates the ordering to {@link SortUtil#compareBinary(byte[], byte[])}. */
+    @Override
+    public int compareTo(@NotNull ByteArray o) {
+        return SortUtil.compareBinary(bytes, o.bytes);
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java
new file mode 100644
index 0000000000..6496d5186b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.util.List;
+
+/**
+ * List bulk loader to load key values, incoming keys must be sorted.
+ *
+ * <p>Each call to {@link #write(byte[], List)} hands over one key together with the complete list
+ * of values for that key.
+ */
+public interface ListBulkLoader extends BulkLoader {
+
+ /**
+  * Writes one key and its values.
+  *
+  * @param key key bytes (presumably the output of {@code State#serializeKey}; confirm at callers)
+  * @param value value byte arrays belonging to {@code key}
+  * @throws WriteException declared so that implementations can report a failed write
+  */
+ void write(byte[] key, List<byte[]> value) throws WriteException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java
new file mode 100644
index 0000000000..8e3d7c2c8b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link State} interface for list state in Operations.
+ *
+ * <p>A key maps to a list of values; values are added one at a time or loaded in bulk through
+ * {@link #createBulkLoader()}.
+ */
+public interface ListState<K, V> extends State<K, V> {
+
+ /** Adds {@code value} to the values associated with {@code key}. */
+ void add(K key, V value) throws IOException;
+
+ /** Returns the values associated with {@code key}. */
+ List<V> get(K key) throws IOException;
+
+ /** Creates a {@link ListBulkLoader} that loads entries into this state. */
+ ListBulkLoader createBulkLoader();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java
new file mode 100644
index 0000000000..7874fb7178
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link State} interface for set state in Operations, the values must be sorted by byte array to
+ * be returned.
+ *
+ * <p>A key maps to a set of values; single values can be added and retracted again.
+ */
+public interface SetState<K, V> extends State<K, V> {
+
+ /** Returns the values associated with {@code key}, in the order described on this interface. */
+ List<V> get(K key) throws IOException;
+
+ /** Removes {@code value} from the values associated with {@code key}. */
+ void retract(K key, V value) throws IOException;
+
+ /** Adds {@code value} to the values associated with {@code key}. */
+ void add(K key, V value) throws IOException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/State.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java
new file mode 100644
index 0000000000..e4174a9e21
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import java.io.IOException;
+
+/**
+ * Interface that different types of state must implement.
+ *
+ * <p>It only covers the conversion between keys/values and their byte representation; the access
+ * methods are declared by the sub-interfaces.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface State<K, V> {
+
+ /** Serializes {@code key} into a byte array. */
+ byte[] serializeKey(K key) throws IOException;
+
+ /** Serializes {@code value} into a byte array. */
+ byte[] serializeValue(V value) throws IOException;
+
+ /** Turns bytes produced by {@link #serializeValue(Object)} back into a value. */
+ V deserializeValue(byte[] valueBytes) throws IOException;
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java
new file mode 100644
index 0000000000..478c6f9f3b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import org.apache.paimon.data.serializer.Serializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * State factory to create {@link State}.
+ *
+ * <p>All three creation methods take the same arguments: a state name, the key and value
+ * serializers, and an LRU cache size. NOTE(review): the unit of {@code lruCacheSize} (entries vs.
+ * bytes) and whether {@code name} must be unique are not visible from this interface; confirm
+ * against the implementations.
+ */
+public interface StateFactory extends Closeable {
+
+ /** Creates a {@link ValueState} holding at most one value per key. */
+ <K, V> ValueState<K, V> valueState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize)
+ throws IOException;
+
+ /** Creates a {@link SetState} holding a set of values per key. */
+ <K, V> SetState<K, V> setState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize)
+ throws IOException;
+
+ /** Creates a {@link ListState} holding a list of values per key. */
+ <K, V> ListState<K, V> listState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize)
+ throws IOException;
+
+ /**
+  * Presumably tells callers whether states of this factory are better populated through their
+  * bulk loaders than through single writes; confirm against the callers.
+  */
+ boolean preferBulkLoad();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java
new file mode 100644
index 0000000000..eef737078f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+/**
+ * Value bulk loader to load key values, incoming keys must be sorted.
+ *
+ * <p>Each call to {@link #write(byte[], byte[])} hands over one key with its single value.
+ */
+public interface ValueBulkLoader extends BulkLoader {
+
+ /**
+  * Writes one key/value pair.
+  *
+  * @param key key bytes (presumably the output of {@code State#serializeKey}; confirm at callers)
+  * @param value value bytes stored for {@code key}
+  * @throws WriteException declared so that implementations can report a failed write
+  */
+ void write(byte[] key, byte[] value) throws WriteException;
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java
new file mode 100644
index 0000000000..f04e456b3b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link State} interface for single-value state. The value can be deleted or updated.
+ *
+ * <p>Values are written one at a time or loaded in bulk through {@link #createBulkLoader()}.
+ */
+public interface ValueState<K, V> extends State<K, V> {
+
+ /** Returns the value stored for {@code key}; the result may be {@code null}. */
+ @Nullable
+ V get(K key) throws IOException;
+
+ /** Stores {@code value} for {@code key}. */
+ void put(K key, V value) throws IOException;
+
+ /** Deletes the value stored for {@code key}. */
+ void delete(K key) throws IOException;
+
+ /** Creates a {@link ValueBulkLoader} that loads entries into this state. */
+ ValueBulkLoader createBulkLoader();
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
new file mode 100644
index 0000000000..992fbc1abd
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ListState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/**
+ * {@link ListState} that keeps everything on the heap: a hash map from the serialized key to the
+ * list of serialized values.
+ */
+public class InMemoryListState<K, V> extends InMemoryState<K, V> implements ListState<K, V> {
+
+    private final Map<ByteArray, List<byte[]>> values;
+
+    public InMemoryListState(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        super(keySerializer, valueSerializer);
+        this.values = new HashMap<>();
+    }
+
+    /** Appends the serialized {@code value} to the list of {@code key}, creating it if absent. */
+    @Override
+    public void add(K key, V value) throws IOException {
+        ByteArray wrappedKey = wrapBytes(serializeKey(key));
+        byte[] serializedValue = serializeValue(value);
+        List<byte[]> bucket = values.get(wrappedKey);
+        if (bucket == null) {
+            bucket = new ArrayList<>();
+            values.put(wrappedKey, bucket);
+        }
+        bucket.add(serializedValue);
+    }
+
+    /** Returns a fresh list with the deserialized values of {@code key}, empty if unknown. */
+    @Override
+    public List<V> get(K key) throws IOException {
+        List<byte[]> stored = values.get(wrapBytes(serializeKey(key)));
+        List<V> deserialized = new ArrayList<>();
+        if (stored == null) {
+            return deserialized;
+        }
+        for (byte[] valueBytes : stored) {
+            deserialized.add(deserializeValue(valueBytes));
+        }
+        return deserialized;
+    }
+
+    /** Returns a loader that puts every written entry straight into the backing map. */
+    @Override
+    public ListBulkLoader createBulkLoader() {
+        return new ListBulkLoader() {
+
+            @Override
+            public void finish() {}
+
+            @Override
+            public void write(byte[] key, List<byte[]> value) {
+                // the caller reuses its list instance, so a private copy is stored
+                List<byte[]> snapshot = new ArrayList<>(value);
+                values.put(wrapBytes(key), snapshot);
+            }
+        };
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
new file mode 100644
index 0000000000..f694a57643
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.SetState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/**
+ * In-memory set state.
+ *
+ * <p>Every key maps to a {@link TreeSet} of serialized values, so {@link #get(Object)} returns
+ * the values ordered by {@link ByteArray#compareTo(ByteArray)}.
+ */
+public class InMemorySetState<K, V> extends InMemoryState<K, V> implements SetState<K, V> {
+
+    private final Map<ByteArray, TreeSet<ByteArray>> values;
+
+    public InMemorySetState(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        super(keySerializer, valueSerializer);
+        this.values = new HashMap<>();
+    }
+
+    /** Returns the deserialized values of {@code key} in sorted order, empty if unknown. */
+    @Override
+    public List<V> get(K key) throws IOException {
+        Set<ByteArray> set = values.get(wrapBytes(serializeKey(key)));
+        List<V> result = new ArrayList<>();
+        if (set != null) {
+            for (ByteArray value : set) {
+                result.add(deserializeValue(value.bytes));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Removes {@code value} from the set of {@code key}.
+     *
+     * <p>Retracting under a key that holds no values is a no-op instead of a {@link
+     * NullPointerException}.
+     */
+    @Override
+    public void retract(K key, V value) throws IOException {
+        ByteArray wrappedKey = wrapBytes(serializeKey(key));
+        TreeSet<ByteArray> set = values.get(wrappedKey);
+        if (set == null) {
+            // nothing was added under this key, so there is nothing to retract
+            return;
+        }
+        set.remove(wrapBytes(serializeValue(value)));
+        if (set.isEmpty()) {
+            // do not keep empty sets around for keys whose values were all retracted
+            values.remove(wrappedKey);
+        }
+    }
+
+    /** Adds the serialized {@code value} to the set of {@code key}, creating it if absent. */
+    @Override
+    public void add(K key, V value) throws IOException {
+        byte[] keyBytes = serializeKey(key);
+        byte[] valueBytes = serializeValue(value);
+        values.computeIfAbsent(wrapBytes(keyBytes), k -> new TreeSet<>())
+                .add(wrapBytes(valueBytes));
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java
new file mode 100644
index 0000000000..6b79776a70
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.lookup.State;
+
+import java.io.IOException;
+
+/**
+ * Base class of the in-memory states. It owns the key/value serializers together with the
+ * reusable buffers and implements the byte conversions of {@link State}.
+ */
+public abstract class InMemoryState<K, V> implements State<K, V> {
+
+    protected final Serializer<K> keySerializer;
+    protected final Serializer<V> valueSerializer;
+    protected final DataOutputSerializer keyOutView;
+    protected final DataInputDeserializer valueInputView;
+    protected final DataOutputSerializer valueOutputView;
+
+    public InMemoryState(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyOutView = new DataOutputSerializer(32);
+        this.valueInputView = new DataInputDeserializer();
+        this.valueOutputView = new DataOutputSerializer(32);
+    }
+
+    @Override
+    public byte[] serializeKey(K key) throws IOException {
+        return writeToBytes(keySerializer, key, keyOutView);
+    }
+
+    @Override
+    public byte[] serializeValue(V value) throws IOException {
+        return writeToBytes(valueSerializer, value, valueOutputView);
+    }
+
+    @Override
+    public V deserializeValue(byte[] valueBytes) throws IOException {
+        valueInputView.setBuffer(valueBytes);
+        return valueSerializer.deserialize(valueInputView);
+    }
+
+    /** Clears {@code view}, serializes {@code record} into it and returns a copy of the bytes. */
+    private static <T> byte[] writeToBytes(
+            Serializer<T> serializer, T record, DataOutputSerializer view) throws IOException {
+        view.clear();
+        serializer.serialize(record, view);
+        return view.getCopyOfBuffer();
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
new file mode 100644
index 0000000000..77aa1cf17e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ListState;
+import org.apache.paimon.lookup.SetState;
+import org.apache.paimon.lookup.StateFactory;
+import org.apache.paimon.lookup.ValueState;
+
+import java.io.IOException;
+
+/**
+ * Factory to create in-memory state.
+ *
+ * <p>Each creation method only forwards the two serializers to the matching in-memory state; the
+ * {@code name} and {@code lruCacheSize} arguments are accepted but not used.
+ */
+public class InMemoryStateFactory implements StateFactory {
+
+ @Override
+ public <K, V> ValueState<K, V> valueState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize) {
+ // name and lruCacheSize are not used here
+ return new InMemoryValueState<>(keySerializer, valueSerializer);
+ }
+
+ @Override
+ public <K, V> SetState<K, V> setState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize) {
+ // name and lruCacheSize are not used here
+ return new InMemorySetState<>(keySerializer, valueSerializer);
+ }
+
+ @Override
+ public <K, V> ListState<K, V> listState(
+ String name,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ long lruCacheSize) {
+ // name and lruCacheSize are not used here
+ return new InMemoryListState<>(keySerializer, valueSerializer);
+ }
+
+ /** Always {@code false} for the in-memory factory. */
+ @Override
+ public boolean preferBulkLoad() {
+ return false;
+ }
+
+ /** No-op: this factory holds no fields or resources of its own. */
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
new file mode 100644
index 0000000000..07db364ef1
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.memory;
+
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.lookup.ValueState;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.paimon.lookup.ByteArray.wrapBytes;
+
+/** In-memory value state backed by a {@link HashMap} of serialized key/value bytes. */
+public class InMemoryValueState<K, V> extends InMemoryState<K, V> implements
ValueState<K, V> {
+
+ private final Map<ByteArray, byte[]> values; // serialized key -> serialized value
+
+ public InMemoryValueState(Serializer<K> keySerializer, Serializer<V>
valueSerializer) {
+ super(keySerializer, valueSerializer);
+ this.values = new HashMap<>();
+ }
+
+ @Override
+ public @Nullable V get(K key) throws IOException {
+ byte[] bytes = values.get(wrapBytes(serializeKey(key)));
+ if (bytes == null) { // no entry stored under this key
+ return null;
+ }
+ return deserializeValue(bytes);
+ }
+
+ @Override
+ public void put(K key, V value) throws IOException { // replaces any existing entry
+ values.put(wrapBytes(serializeKey(key)), serializeValue(value));
+ }
+
+ @Override
+ public void delete(K key) throws IOException { // map is left unchanged when the key is absent
+ values.remove(wrapBytes(serializeKey(key)));
+ }
+
+ @Override
+ public ValueBulkLoader createBulkLoader() {
+ return new ValueBulkLoader() { // stores the given bytes in the map as-is
+
+ @Override
+ public void write(byte[] key, byte[] value) {
+ values.put(wrapBytes(key), value);
+ }
+
+ @Override
+ public void finish() {} // nothing is buffered, so there is nothing to flush
+ };
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
similarity index 82%
copy from paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
copy to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
index 0bcdbbb5d2..8278376f73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java
@@ -16,7 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
+
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.utils.ListDelimitedSerializer;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.EnvOptions;
@@ -28,14 +32,16 @@ import org.rocksdb.SstFileWriter;
import org.rocksdb.TtlDB;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/** Bulk loader for RocksDB. */
-public class BulkLoader {
+public class RocksDBBulkLoader implements ValueBulkLoader, ListBulkLoader {
private final String uuid = UUID.randomUUID().toString();
+ private final ListDelimitedSerializer listSerializer = new
ListDelimitedSerializer();
private final ColumnFamilyHandle columnFamily;
private final String path;
@@ -49,7 +55,8 @@ public class BulkLoader {
private int sstIndex = 0;
private long recordNum = 0;
- public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle
columnFamily, String path) {
+ public RocksDBBulkLoader(
+ RocksDB db, Options options, ColumnFamilyHandle columnFamily,
String path) {
this.db = db;
this.isTtlEnabled = db instanceof TtlDB;
this.options = options;
@@ -58,6 +65,7 @@ public class BulkLoader {
this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000);
}
+ @Override
public void write(byte[] key, byte[] value) throws WriteException {
try {
if (writer == null) {
@@ -89,6 +97,17 @@ public class BulkLoader {
}
}
+ @Override
+ public void write(byte[] key, List<byte[]> value) throws WriteException {
+ byte[] bytes;
+ try {
+ bytes = listSerializer.serializeList(value); // encode the whole list as one byte[]
+ } catch (IOException e) {
+ throw new RuntimeException(e); // serialization failure is rethrown unchecked
+ }
+ write(key, bytes); // delegate to the byte[] overload
+ }
+
private byte[] appendTimestamp(byte[] value) {
byte[] newValue = new byte[value.length + 4];
System.arraycopy(value, 0, newValue, 0, value.length);
@@ -99,6 +118,7 @@ public class BulkLoader {
return newValue;
}
+ @Override
public void finish() {
try {
if (writer != null) {
@@ -115,11 +135,4 @@ public class BulkLoader {
throw new RuntimeException(e);
}
}
-
- /** Exception during writing. */
- public static class WriteException extends Exception {
- public WriteException(Throwable cause) {
- super(cause);
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
similarity index 87%
rename from
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
index 7fe29d209a..70716c40cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ListState;
import org.apache.paimon.utils.ListDelimitedSerializer;
import org.rocksdb.ColumnFamilyHandle;
@@ -29,7 +30,7 @@ import java.util.Collections;
import java.util.List;
/** RocksDB state for key -> List of value. */
-public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> {
+public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>>
implements ListState<K, V> {
private final ListDelimitedSerializer listSerializer = new
ListDelimitedSerializer();
@@ -42,6 +43,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K,
V, List<V>> {
super(stateFactory, columnFamily, keySerializer, valueSerializer,
lruCacheSize);
}
+ @Override
public void add(K key, V value) throws IOException {
byte[] keyBytes = serializeKey(key);
byte[] valueBytes = serializeValue(value);
@@ -53,6 +55,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K,
V, List<V>> {
cache.invalidate(wrap(keyBytes));
}
+ @Override
public List<V> get(K key) throws IOException {
byte[] keyBytes = serializeKey(key);
return cache.get(
@@ -71,14 +74,4 @@ public class RocksDBListState<K, V> extends RocksDBState<K,
V, List<V>> {
return rows;
});
}
-
- public byte[] serializeValue(V value) throws IOException {
- valueOutputView.clear();
- valueSerializer.serialize(value, valueOutputView);
- return valueOutputView.getCopyOfBuffer();
- }
-
- public byte[] serializeList(List<byte[]> valueList) throws IOException {
- return listSerializer.serializeList(valueList);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
similarity index 99%
rename from
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
index e8c3fecdf8..2cde6d6e7b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.options.ConfigOption;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
similarity index 95%
rename from
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
index 5c06cbd51a..9f654e152c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java
@@ -16,9 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.SetState;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
@@ -32,7 +34,8 @@ import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Rocksdb state for key -> Set values. */
-public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> {
+public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>>
+ implements SetState<K, V> {
private static final byte[] EMPTY = new byte[0];
@@ -45,6 +48,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V,
List<byte[]>> {
super(stateFactory, columnFamily, keySerializer, valueSerializer,
lruCacheSize);
}
+ @Override
public List<V> get(K key) throws IOException {
ByteArray keyBytes = wrap(serializeKey(key));
List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
@@ -73,6 +77,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V,
List<byte[]>> {
return values;
}
+ @Override
public void retract(K key, V value) throws IOException {
try {
byte[] bytes = invalidKeyAndGetKVBytes(key, value);
@@ -84,6 +89,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V,
List<byte[]>> {
}
}
+ @Override
public void add(K key, V value) throws IOException {
try {
byte[] bytes = invalidKeyAndGetKVBytes(key, value);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
similarity index 82%
rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
index 0181917a7a..62257cfd34 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java
@@ -16,13 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.State;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -37,10 +39,9 @@ import org.rocksdb.WriteOptions;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Arrays;
/** Rocksdb state for key value. */
-public abstract class RocksDBState<K, V, CacheV> {
+public abstract class RocksDBState<K, V, CacheV> implements State<K, V> {
protected final RocksDBStateFactory stateFactory;
@@ -85,12 +86,26 @@ public abstract class RocksDBState<K, V, CacheV> {
.build();
}
+ @Override
public byte[] serializeKey(K key) throws IOException {
keyOutView.clear();
keySerializer.serialize(key, keyOutView);
return keyOutView.getCopyOfBuffer();
}
+ @Override
+ public byte[] serializeValue(V value) throws IOException {
+ valueOutputView.clear(); // reset the reused output view before writing
+ valueSerializer.serialize(value, valueOutputView);
+ return valueOutputView.getCopyOfBuffer(); // presumably a copy detached from the view — confirm
+ }
+
+ @Override
+ public V deserializeValue(byte[] valueBytes) throws IOException {
+ valueInputView.setBuffer(valueBytes); // point the reused input view at these bytes
+ return valueSerializer.deserialize(valueInputView);
+ }
+
protected ByteArray wrap(byte[] bytes) {
return new ByteArray(bytes);
}
@@ -99,8 +114,8 @@ public abstract class RocksDBState<K, V, CacheV> {
return new Reference(bytes);
}
- public BulkLoader createBulkLoader() {
- return new BulkLoader(db, stateFactory.options(), columnFamily,
stateFactory.path());
+ public RocksDBBulkLoader createBulkLoader() {
+ return new RocksDBBulkLoader(db, stateFactory.options(), columnFamily,
stateFactory.path());
}
public static BinaryExternalSortBuffer createBulkLoadSorter(
@@ -117,33 +132,6 @@ public abstract class RocksDBState<K, V, CacheV> {
options.sequenceFieldSortOrderIsAscending());
}
- /** A class wraps byte[] to implement equals and hashCode. */
- protected static class ByteArray {
-
- protected final byte[] bytes;
-
- protected ByteArray(byte[] bytes) {
- this.bytes = bytes;
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(bytes);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ByteArray byteArray = (ByteArray) o;
- return Arrays.equals(bytes, byteArray.bytes);
- }
- }
-
/** A class wraps byte[] to indicate contain or not contain. */
protected static class Reference {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
similarity index 94%
rename from
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
index 9168fbf4cb..883c82906b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.StateFactory;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -31,13 +32,12 @@ import org.rocksdb.TtlDB;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
/** Factory to create state. */
-public class RocksDBStateFactory implements Closeable {
+public class RocksDBStateFactory implements StateFactory {
public static final String MERGE_OPERATOR_NAME = "stringappendtest";
@@ -85,6 +85,7 @@ public class RocksDBStateFactory implements Closeable {
return path;
}
+ @Override
public <K, V> RocksDBValueState<K, V> valueState(
String name,
Serializer<K> keySerializer,
@@ -95,6 +96,7 @@ public class RocksDBStateFactory implements Closeable {
this, createColumnFamily(name), keySerializer,
valueSerializer, lruCacheSize);
}
+ @Override
public <K, V> RocksDBSetState<K, V> setState(
String name,
Serializer<K> keySerializer,
@@ -105,6 +107,7 @@ public class RocksDBStateFactory implements Closeable {
this, createColumnFamily(name), keySerializer,
valueSerializer, lruCacheSize);
}
+ @Override
public <K, V> RocksDBListState<K, V> listState(
String name,
Serializer<K> keySerializer,
@@ -116,6 +119,11 @@ public class RocksDBStateFactory implements Closeable {
this, createColumnFamily(name), keySerializer,
valueSerializer, lruCacheSize);
}
+ @Override
+ public boolean preferBulkLoad() {
+ return true; // tells callers to take the bulk-load path for RocksDB-backed state
+ }
+
private ColumnFamilyHandle createColumnFamily(String name) throws
IOException {
try {
return db.createColumnFamily(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
similarity index 86%
rename from
paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
rename to
paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
index 444ed5ad06..3d8768a9c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java
+++
b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java
@@ -16,9 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.rocksdb;
import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.lookup.ByteArray;
+import org.apache.paimon.lookup.ValueState;
import org.rocksdb.ColumnFamilyHandle;
@@ -29,7 +31,8 @@ import java.io.IOException;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Rocksdb state for key -> a single value. */
-public class RocksDBValueState<K, V> extends RocksDBState<K, V,
RocksDBState.Reference> {
+public class RocksDBValueState<K, V> extends RocksDBState<K, V,
RocksDBState.Reference>
+ implements ValueState<K, V> {
public RocksDBValueState(
RocksDBStateFactory stateFactory,
@@ -41,6 +44,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K,
V, RocksDBState.Ref
}
@Nullable
+ @Override
public V get(K key) throws IOException {
try {
Reference valueRef = get(wrap(serializeKey(key)));
@@ -60,6 +64,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K,
V, RocksDBState.Ref
return valueRef;
}
+ @Override
public void put(K key, V value) throws IOException {
checkArgument(value != null);
@@ -73,6 +78,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K,
V, RocksDBState.Ref
}
}
+ @Override
public void delete(K key) throws IOException {
try {
byte[] keyBytes = serializeKey(key);
@@ -85,15 +91,4 @@ public class RocksDBValueState<K, V> extends RocksDBState<K,
V, RocksDBState.Ref
throw new IOException(e);
}
}
-
- public V deserializeValue(byte[] valueBytes) throws IOException {
- valueInputView.setBuffer(valueBytes);
- return valueSerializer.deserialize(valueInputView);
- }
-
- public byte[] serializeValue(V value) throws IOException {
- valueOutputView.clear();
- valueSerializer.serialize(value, valueOutputView);
- return valueOutputView.getCopyOfBuffer();
- }
}
diff --git
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 71792962b8..219e7df99c 100644
---
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -74,7 +74,7 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation[] {
new OptionsClassLocation("paimon-api",
"org.apache.paimon.options"),
new OptionsClassLocation("paimon-api", "org.apache.paimon"),
- new OptionsClassLocation("paimon-core",
"org.apache.paimon.lookup"),
+ new OptionsClassLocation("paimon-core",
"org.apache.paimon.lookup.rocksdb"),
new OptionsClassLocation("paimon-core",
"org.apache.paimon.jdbc"),
new OptionsClassLocation("paimon-core",
"org.apache.paimon.table"),
new OptionsClassLocation("paimon-core",
"org.apache.paimon.iceberg"),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index ed8d9a7ac8..d524920e82 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -541,7 +541,10 @@ public class FlinkConnectorOptions {
AUTO,
/** Use full caching mode. */
- FULL
+ FULL,
+
+ /** Use in-memory caching mode. */
+ MEMORY
}
/** Watermark emit strategy for scan. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index dec278fb70..e3e15c4ce6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -71,8 +71,8 @@ import static
org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
import static
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
-import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
-import static
org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
+import static
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static
org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** A lookup {@link TableFunction} for file store. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 4154b6742c..27795a00ff 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -24,9 +24,11 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.StateFactory;
+import org.apache.paimon.lookup.memory.InMemoryStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
+import org.apache.paimon.lookup.rocksdb.RocksDBState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -64,11 +66,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
/** Lookup table of full cache. */
public abstract class FullCacheLookupTable implements LookupTable {
+
private static final Logger LOG =
LoggerFactory.getLogger(FullCacheLookupTable.class);
protected final Object lock = new Object();
@@ -79,7 +84,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final int appendUdsFieldNumber;
- protected RocksDBStateFactory stateFactory;
+ protected StateFactory stateFactory;
@Nullable private ExecutorService refreshExecutor;
private final AtomicReference<Exception> cachedException;
private final int maxPendingSnapshotCount;
@@ -143,11 +148,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
}
protected void init() throws Exception {
- this.stateFactory =
- new RocksDBStateFactory(
- context.tempPath.toString(),
- context.table.coreOptions().toConfiguration(),
- null);
+ this.stateFactory = createStateFactory();
this.refreshExecutor =
this.refreshAsync
? Executors.newSingleThreadExecutor(
@@ -158,6 +159,16 @@ public abstract class FullCacheLookupTable implements
LookupTable {
: null;
}
+ private StateFactory createStateFactory() throws IOException { // picks the state backend
+ String diskDir = context.tempPath.toString(); // only used by the RocksDB branch
+ Options options = context.table.coreOptions().toConfiguration();
+ if (options.get(LOOKUP_CACHE_MODE) == MEMORY) { // in-memory cache mode requested
+ return new InMemoryStateFactory();
+ } else { // every other cache mode keeps using RocksDB
+ return new RocksDBStateFactory(diskDir, options, null);
+ }
+ }
+
protected void bootstrap() throws Exception {
Predicate scanPredicate =
PredicateBuilder.andNullable(context.tablePredicate,
specificPartition);
@@ -168,6 +179,11 @@ public abstract class FullCacheLookupTable implements
LookupTable {
scanPredicate,
context.requiredCachedBucketIds,
cacheRowFilter);
+ if (!stateFactory.preferBulkLoad()) {
+ doRefresh();
+ return;
+ }
+
BinaryExternalSortBuffer bulkLoadSorter =
RocksDBState.createBulkLoadSorter(
IOManager.create(context.tempPath.toString()),
context.table.coreOptions());
@@ -189,7 +205,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
while ((row = keyIterator.next(row)) != null) {
bulkLoader.write(row.getBinary(0), row.getBinary(1));
}
- } catch (BulkLoader.WriteException e) {
+ } catch (RocksDBBulkLoader.WriteException e) {
throw new RuntimeException(
"Exception in bulkLoad, the most suspicious reason is that
"
+ "your data contains duplicates, please check
your lookup table. ",
@@ -331,7 +347,7 @@ public abstract class FullCacheLookupTable implements
LookupTable {
/** Bulk loader for the table. */
public interface TableBulkLoader {
- void write(byte[] key, byte[] value) throws BulkLoader.WriteException,
IOException;
+ void write(byte[] key, byte[] value) throws
RocksDBBulkLoader.WriteException, IOException;
void finish() throws IOException;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
index 63af4f3506..546bc5a60a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java
@@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBListState;
+import org.apache.paimon.lookup.ListBulkLoader;
+import org.apache.paimon.lookup.ListState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.KeyProjectedRow;
@@ -40,7 +41,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
private final KeyProjectedRow joinKeyRow;
- private RocksDBListState<InternalRow, InternalRow> state;
+ private ListState<InternalRow, InternalRow> state;
public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
super(context);
@@ -105,7 +106,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
@Override
public TableBulkLoader createBulkLoader() {
- BulkLoader bulkLoader = state.createBulkLoader();
+ ListBulkLoader bulkLoader = state.createBulkLoader();
return new TableBulkLoader() {
private final List<byte[]> values = new ArrayList<>();
@@ -113,7 +114,7 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
private byte[] currentKey;
@Override
- public void write(byte[] key, byte[] value) throws IOException {
+ public void write(byte[] key, byte[] value) {
if (currentKey != null && !Arrays.equals(key, currentKey)) {
flush();
}
@@ -122,16 +123,16 @@ public class NoPrimaryKeyLookupTable extends
FullCacheLookupTable {
}
@Override
- public void finish() throws IOException {
+ public void finish() {
flush();
bulkLoader.finish();
}
- private void flush() throws IOException {
+ private void flush() {
if (currentKey != null && values.size() > 0) {
try {
- bulkLoader.write(currentKey,
state.serializeList(values));
- } catch (BulkLoader.WriteException e) {
+ bulkLoader.write(currentKey, values);
+ } catch (RocksDBBulkLoader.WriteException e) {
throw new RuntimeException(e);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
index 2a3099e9a6..485a9cfaa6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java
@@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.BulkLoader;
-import org.apache.paimon.lookup.RocksDBValueState;
+import org.apache.paimon.lookup.ValueBulkLoader;
+import org.apache.paimon.lookup.ValueState;
+import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;
@@ -44,7 +45,7 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
@Nullable private final ProjectedRow keyRearrange;
- protected RocksDBValueState<InternalRow, InternalRow> tableState;
+ protected ValueState<InternalRow, InternalRow> tableState;
public PrimaryKeyLookupTable(Context context, long lruCacheSize,
List<String> joinKey) {
super(context);
@@ -129,12 +130,12 @@ public class PrimaryKeyLookupTable extends
FullCacheLookupTable {
@Override
public TableBulkLoader createBulkLoader() {
- BulkLoader bulkLoader = tableState.createBulkLoader();
+ ValueBulkLoader bulkLoader = tableState.createBulkLoader();
return new TableBulkLoader() {
@Override
public void write(byte[] key, byte[] value)
- throws BulkLoader.WriteException, IOException {
+ throws RocksDBBulkLoader.WriteException, IOException {
bulkLoader.write(key, value);
bulkLoadWritePlus(key, value);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
index 11c9cba24b..668570d77d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.lookup;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
-import org.apache.paimon.lookup.RocksDBSetState;
+import org.apache.paimon.lookup.SetState;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.KeyProjectedRow;
@@ -35,7 +35,7 @@ public class SecondaryIndexLookupTable extends
PrimaryKeyLookupTable {
private final KeyProjectedRow secKeyRow;
- private RocksDBSetState<InternalRow, InternalRow> indexState;
+ private SetState<InternalRow, InternalRow> indexState;
public SecondaryIndexLookupTable(Context context, long lruCacheSize) {
super(context, lruCacheSize / 2, context.table.primaryKeys());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index dea30d4d13..652ef5bd9c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -57,8 +57,8 @@ public class LookupJoinITCase extends CatalogITCaseBase {
+ "PARTITIONED BY (`i`) WITH
('continuous.discovery-interval'='1 ms' %s)";
String fullOption = ", 'lookup.cache' = 'full'";
-
String lruOption = ", 'changelog-producer'='lookup'";
+ String memoryOption = ", 'lookup.cache' = 'memory'";
switch (cacheMode) {
case FULL:
@@ -69,6 +69,10 @@ public class LookupJoinITCase extends CatalogITCaseBase {
tEnv.executeSql(String.format(dim, lruOption));
tEnv.executeSql(String.format(partitioned, lruOption));
break;
+ case MEMORY:
+ tEnv.executeSql(String.format(dim, memoryOption));
+ tEnv.executeSql(String.format(partitioned, memoryOption));
+ break;
default:
throw new UnsupportedOperationException();
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index dcbc405d31..4f7ff334f6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -26,7 +26,7 @@ import org.apache.paimon.flink.FlinkRowData;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor;
import
org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoteQueryExecutor;
-import org.apache.paimon.lookup.RocksDBOptions;
+import org.apache.paimon.lookup.rocksdb.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 88b9471330..4a3ca45c3b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -42,6 +42,8 @@ import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -50,15 +52,15 @@ import org.apache.paimon.utils.SortUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -71,21 +73,34 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.FULL;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY;
import static org.apache.paimon.types.DataTypes.INT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link LookupTable}. */
+@ExtendWith(ParameterizedTestExtension.class)
public class LookupTableTest extends TableTestBase {
- @TempDir Path tempDir;
+ private final boolean inMemory;
+ @TempDir Path tempDir;
private RowType rowType;
-
private IOManager ioManager;
-
private FullCacheLookupTable table;
+ public LookupTableTest(boolean inMemory) {
+ this.inMemory = inMemory;
+ }
+
+ @SuppressWarnings("unused")
+ @Parameters(name = "{0}")
+ public static List<Boolean> getVarSeg() {
+ return Arrays.asList(true, false);
+ }
+
@BeforeEach
public void before() throws IOException {
this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
@@ -100,6 +115,9 @@ public class LookupTableTest extends TableTestBase {
}
private FileStoreTable createTable(List<String> primaryKeys, Options
options) throws Exception {
+ if (inMemory) {
+ options.set(LOOKUP_CACHE_MODE, MEMORY);
+ }
Identifier identifier = new Identifier("default", "t");
Schema schema =
new Schema(
@@ -112,7 +130,7 @@ public class LookupTableTest extends TableTestBase {
return (FileStoreTable) catalog.getTable(identifier);
}
- @Test
+ @TestTemplate
public void testPkTable() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
FullCacheLookupTable.Context context =
@@ -132,7 +150,7 @@ public class LookupTableTest extends TableTestBase {
table.open();
// test bulk load error
- {
+ if (!inMemory) {
TableBulkLoader bulkLoader = table.createBulkLoader();
bulkLoader.write(new byte[] {1}, new byte[] {1});
assertThatThrownBy(() -> bulkLoader.write(new byte[] {1}, new
byte[] {2}))
@@ -172,7 +190,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(table.get(row(3))).hasSize(0);
}
- @Test
+ @TestTemplate
public void testPkTableWithSequenceField() throws Exception {
Options options = new Options();
options.set(CoreOptions.SEQUENCE_FIELD, "f1");
@@ -223,7 +241,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 1, 22, 222);
}
- @Test
+ @TestTemplate
public void testPkTableWithSequenceFieldProjection() throws Exception {
Options options = new Options();
options.set(CoreOptions.SEQUENCE_FIELD, "f2");
@@ -263,7 +281,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 1, 22);
}
- @Test
+ @TestTemplate
public void testPkTablePkFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
FullCacheLookupTable.Context context =
@@ -299,7 +317,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(table.get(row(3))).hasSize(0);
}
- @Test
+ @TestTemplate
public void testPkTableNonPkFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
FullCacheLookupTable.Context context =
@@ -328,7 +346,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result).hasSize(0);
}
- @Test
+ @TestTemplate
public void testSecKeyTable() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
FullCacheLookupTable.Context context =
@@ -376,7 +394,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
}
- @Test
+ @TestTemplate
public void testSecKeyTableWithSequenceField() throws Exception {
Options options = new Options();
options.set(CoreOptions.SEQUENCE_FIELD, "f1");
@@ -431,7 +449,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result.stream().map(row ->
row.getInt(2))).doesNotContain(333);
}
- @Test
+ @TestTemplate
public void testSecKeyTablePkFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
FullCacheLookupTable.Context context =
@@ -476,7 +494,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(table.get(row(33))).hasSize(0);
}
- @Test
+ @TestTemplate
public void testNoPrimaryKeyTable() throws Exception {
FileStoreTable storeTable = createTable(emptyList(), new Options());
FullCacheLookupTable.Context context =
@@ -524,7 +542,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result.stream().map(row -> row.getInt(0))).contains(1);
}
- @Test
+ @TestTemplate
public void testNoPrimaryKeyTableFilter() throws Exception {
FileStoreTable storeTable = createTable(emptyList(), new Options());
FullCacheLookupTable.Context context =
@@ -559,7 +577,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(1), 1, 11, 111);
}
- @Test
+ @TestTemplate
public void testPkTableWithCacheRowFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
writeWithBucketAssigner(
@@ -600,7 +618,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(res).isEmpty();
}
- @Test
+ @TestTemplate
public void testRefreshExecutorRebuildAfterReopen() throws Exception {
Options options = new Options();
options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true);
@@ -635,7 +653,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(res.get(0), 1, 22, 222);
}
- @Test
+ @TestTemplate
public void testNoPkTableWithCacheRowFilter() throws Exception {
FileStoreTable storeTable = createTable(emptyList(), new Options());
writeWithBucketAssigner(
@@ -676,7 +694,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(res).isEmpty();
}
- @Test
+ @TestTemplate
public void testSecKeyTableWithCacheRowFilter() throws Exception {
FileStoreTable storeTable = createTable(singletonList("f0"), new
Options());
writeWithBucketAssigner(
@@ -717,7 +735,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(res).isEmpty();
}
- @Test
+ @TestTemplate
public void testPartialLookupTable() throws Exception {
FileStoreTable dimTable = createDimTable();
PrimaryKeyPartialLookupTable table =
@@ -750,7 +768,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result).hasSize(0);
}
- @Test
+ @TestTemplate
public void testPartialLookupTableWithRowFilter() throws Exception {
Options options = new Options();
options.set(CoreOptions.BUCKET.key(), "2");
@@ -771,7 +789,7 @@ public class LookupTableTest extends TableTestBase {
assertThat(result).isEmpty();
}
- @Test
+ @TestTemplate
public void testPartialLookupTableWithProjection() throws Exception {
FileStoreTable dimTable = createDimTable();
PrimaryKeyPartialLookupTable table =
@@ -803,7 +821,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 22, -2);
}
- @Test
+ @TestTemplate
public void testPartialLookupTableJoinKeyOrder() throws Exception {
FileStoreTable dimTable = createDimTable();
PrimaryKeyPartialLookupTable table =
@@ -835,9 +853,17 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 22, -2);
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws
Exception {
+ @TestTemplate
+ public void testPKLookupTableNotRefreshAsync() throws Exception {
+ innerTestPKLookupTableRefreshAsync(false);
+ }
+
+ @TestTemplate
+ public void testPKLookupTableRefreshAsync() throws Exception {
+ innerTestPKLookupTableRefreshAsync(true);
+ }
+
+ private void innerTestPKLookupTableRefreshAsync(boolean refreshAsync)
throws Exception {
Options options = new Options();
options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync);
FileStoreTable storeTable = createTable(singletonList("f0"), options);
@@ -902,13 +928,11 @@ public class LookupTableTest extends TableTestBase {
table.close();
}
- @Test
+ @TestTemplate
public void testFullCacheLookupTableWithForceLookup() throws Exception {
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE,
CoreOptions.MergeEngine.PARTIAL_UPDATE);
- options.set(
- FlinkConnectorOptions.LOOKUP_CACHE_MODE,
- FlinkConnectorOptions.LookupCacheMode.FULL);
+ options.set(LOOKUP_CACHE_MODE, inMemory ? MEMORY : FULL);
options.set(CoreOptions.WRITE_ONLY, true);
options.set(CoreOptions.FORCE_LOOKUP, true);
options.set(CoreOptions.BUCKET, 1);
@@ -964,7 +988,7 @@ public class LookupTableTest extends TableTestBase {
assertRow(result.get(0), 1, 22, 222); // new value
}
- @Test
+ @TestTemplate
public void testPartialLookupTableWithForceLookup() throws Exception {
Options options = new Options();
options.set(CoreOptions.MERGE_ENGINE,
CoreOptions.MergeEngine.PARTIAL_UPDATE);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
index 7a942ddb56..d98d199623 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java
@@ -24,8 +24,8 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.lookup.RocksDBListState;
-import org.apache.paimon.lookup.RocksDBStateFactory;
+import org.apache.paimon.lookup.rocksdb.RocksDBListState;
+import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;