This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit b99bf0b623dd2999ba4d268b99e44a9e8071026b Author: Jingsong Lee <[email protected]> AuthorDate: Mon Jun 16 14:02:03 2025 +0800 [flink] Introduce 'memory' type for 'lookup.cache' to in-memory cache (#5748) --- .../generated/flink_connector_configuration.html | 2 +- .../paimon/crosspartition/GlobalIndexAssigner.java | 16 ++-- .../java/org/apache/paimon/lookup/BulkLoader.java | 101 +-------------------- .../java/org/apache/paimon/lookup/ByteArray.java | 61 +++++++++++++ .../org/apache/paimon/lookup/ListBulkLoader.java | 27 ++++++ .../java/org/apache/paimon/lookup/ListState.java | 32 +++++++ .../java/org/apache/paimon/lookup/SetState.java | 35 +++++++ .../main/java/org/apache/paimon/lookup/State.java | 31 +++++++ .../org/apache/paimon/lookup/StateFactory.java | 51 +++++++++++ .../org/apache/paimon/lookup/ValueBulkLoader.java | 25 +++++ .../java/org/apache/paimon/lookup/ValueState.java | 36 ++++++++ .../paimon/lookup/memory/InMemoryListState.java | 77 ++++++++++++++++ .../paimon/lookup/memory/InMemorySetState.java | 69 ++++++++++++++ .../apache/paimon/lookup/memory/InMemoryState.java | 64 +++++++++++++ .../paimon/lookup/memory/InMemoryStateFactory.java | 66 ++++++++++++++ .../paimon/lookup/memory/InMemoryValueState.java | 76 ++++++++++++++++ .../RocksDBBulkLoader.java} | 33 +++++-- .../lookup/{ => rocksdb}/RocksDBListState.java | 17 +--- .../lookup/{ => rocksdb}/RocksDBOptions.java | 2 +- .../lookup/{ => rocksdb}/RocksDBSetState.java | 10 +- .../paimon/lookup/{ => rocksdb}/RocksDBState.java | 52 ++++------- .../lookup/{ => rocksdb}/RocksDBStateFactory.java | 14 ++- .../lookup/{ => rocksdb}/RocksDBValueState.java | 21 ++--- .../configuration/ConfigOptionsDocGenerator.java | 2 +- .../apache/paimon/flink/FlinkConnectorOptions.java | 5 +- .../flink/lookup/FileStoreLookupFunction.java | 4 +- .../paimon/flink/lookup/FullCacheLookupTable.java | 38 +++++--- .../flink/lookup/NoPrimaryKeyLookupTable.java | 19 ++-- .../paimon/flink/lookup/PrimaryKeyLookupTable.java | 11 ++- .../flink/lookup/SecondaryIndexLookupTable.java | 4 +- .../org/apache/paimon/flink/LookupJoinITCase.java | 6 +- .../flink/lookup/FileStoreLookupFunctionTest.java | 2 +- .../paimon/flink/lookup/LookupTableTest.java | 90 +++++++++++------- .../paimon/flink/lookup/RocksDBListStateTest.java | 4 +- 34 files changed, 856 insertions(+), 247 deletions(-) diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 1a67d4e60f..fa9b8e4d9c 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -60,7 +60,7 @@ under the License. <td><h5>lookup.cache</h5></td> <td style="word-wrap: break-word;">AUTO</td> <td><p>Enum</p></td> - <td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td> + <td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"AUTO"</li><li>"FULL"</li><li>"MEMORY"</li></ul></td> </tr> <tr> <td><h5>lookup.dynamic-partition.refresh-interval</h5></td> diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java index 892e27e966..fe14048310 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java @@ -28,11 +28,11 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.RowCompactedSerializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.RowBuffer; -import org.apache.paimon.lookup.BulkLoader; -import org.apache.paimon.lookup.RocksDBOptions; -import org.apache.paimon.lookup.RocksDBState; -import org.apache.paimon.lookup.RocksDBStateFactory; -import org.apache.paimon.lookup.RocksDBValueState; +import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader; +import org.apache.paimon.lookup.rocksdb.RocksDBOptions; +import org.apache.paimon.lookup.rocksdb.RocksDBState; +import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory; +import org.apache.paimon.lookup.rocksdb.RocksDBValueState; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -71,7 +71,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; -import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE; +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.BLOCK_CACHE_SIZE; import static org.apache.paimon.utils.ListUtils.pickRandomly; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -210,14 +210,14 @@ public class GlobalIndexAssigner implements Serializable, Closeable { bootstrapRecords.complete(); boolean isEmpty = true; if (bootstrapKeys.size() > 0) { - BulkLoader bulkLoader = keyIndex.createBulkLoader(); + RocksDBBulkLoader bulkLoader = keyIndex.createBulkLoader(); MutableObjectIterator<BinaryRow> keyIterator = bootstrapKeys.sortedIterator(); BinaryRow row = new BinaryRow(2); try { while ((row = keyIterator.next(row)) != null) { bulkLoader.write(row.getBinary(0), row.getBinary(1)); } - } catch (BulkLoader.WriteException e) { + } catch (RocksDBBulkLoader.WriteException e) { throw new RuntimeException( "Exception in bulkLoad, the most suspicious reason is that " + "your data contains duplicates, please check your sink table. " diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java index 0bcdbbb5d2..f0ea474f52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java @@ -18,106 +18,13 @@ package org.apache.paimon.lookup; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.EnvOptions; -import org.rocksdb.IngestExternalFileOptions; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.SstFileWriter; -import org.rocksdb.TtlDB; +/** Bulk loader for {@link State}, incoming keys must be sorted, and there must be no repetition. */ +public interface BulkLoader { -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -/** Bulk loader for RocksDB. */ -public class BulkLoader { - - private final String uuid = UUID.randomUUID().toString(); - - private final ColumnFamilyHandle columnFamily; - private final String path; - private final RocksDB db; - private final boolean isTtlEnabled; - private final Options options; - private final List<String> files = new ArrayList<>(); - private final int currentTimeSeconds; - - private SstFileWriter writer = null; - private int sstIndex = 0; - private long recordNum = 0; - - public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle columnFamily, String path) { - this.db = db; - this.isTtlEnabled = db instanceof TtlDB; - this.options = options; - this.columnFamily = columnFamily; - this.path = path; - this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000); - } - - public void write(byte[] key, byte[] value) throws WriteException { - try { - if (writer == null) { - writer = new SstFileWriter(new EnvOptions(), options); - String path = new File(this.path, "sst-" + uuid + "-" + (sstIndex++)).getPath(); - writer.open(path); - files.add(path); - } - - if (isTtlEnabled) { - value = appendTimestamp(value); - } - - try { - writer.put(key, value); - } catch (RocksDBException e) { - throw new WriteException(e); - } - - recordNum++; - if (recordNum % 1000 == 0 && writer.fileSize() >= options.targetFileSizeBase()) { - writer.finish(); - writer.close(); - writer = null; - recordNum = 0; - } - } catch (RocksDBException e) { - throw new RuntimeException(e); - } - } - - private byte[] appendTimestamp(byte[] value) { - byte[] newValue = new byte[value.length + 4]; - System.arraycopy(value, 0, newValue, 0, value.length); - newValue[value.length] = (byte) (currentTimeSeconds & 0xff); - newValue[value.length + 1] = (byte) ((currentTimeSeconds >> 8) & 0xff); - newValue[value.length + 2] = (byte) ((currentTimeSeconds >> 16) & 0xff); - newValue[value.length + 3] = (byte) ((currentTimeSeconds >> 24) & 0xff); - return newValue; - } - - public void finish() { - try { - if (writer != null) { - writer.finish(); - writer.close(); - } - - if (files.size() > 0) { - IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions(); - db.ingestExternalFile(columnFamily, files, ingestOptions); - ingestOptions.close(); - } - } catch (RocksDBException e) { - throw new RuntimeException(e); - } - } + void finish(); /** Exception during writing. */ - public static class WriteException extends Exception { + class WriteException extends Exception { public WriteException(Throwable cause) { super(cause); } diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java new file mode 100644 index 0000000000..95a279d20f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ByteArray.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import org.apache.paimon.utils.SortUtil; + +import org.jetbrains.annotations.NotNull; + +import java.util.Arrays; + +/** A class wraps byte[] to implement equals and hashCode. */ +public class ByteArray implements Comparable<ByteArray> { + + public final byte[] bytes; + + public ByteArray(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ByteArray byteArray = (ByteArray) o; + return Arrays.equals(bytes, byteArray.bytes); + } + + public static ByteArray wrapBytes(byte[] bytes) { + return new ByteArray(bytes); + } + + @Override + public int compareTo(@NotNull ByteArray o) { + return SortUtil.compareBinary(bytes, o.bytes); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java new file mode 100644 index 0000000000..6496d5186b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListBulkLoader.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import java.util.List; + +/** List bulk loader to load key values, incoming keys must be sorted. */ +public interface ListBulkLoader extends BulkLoader { + + void write(byte[] key, List<byte[]> value) throws WriteException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java new file mode 100644 index 0000000000..8e3d7c2c8b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ListState.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import java.io.IOException; +import java.util.List; + +/** {@link State} interface for list state in Operations. */ +public interface ListState<K, V> extends State<K, V> { + + void add(K key, V value) throws IOException; + + List<V> get(K key) throws IOException; + + ListBulkLoader createBulkLoader(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java new file mode 100644 index 0000000000..7874fb7178 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/SetState.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import java.io.IOException; +import java.util.List; + +/** + * {@link State} interface for set state in Operations, the values must be sorted by byte array to + * be returned. + */ +public interface SetState<K, V> extends State<K, V> { + + List<V> get(K key) throws IOException; + + void retract(K key, V value) throws IOException; + + void add(K key, V value) throws IOException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/State.java b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java new file mode 100644 index 0000000000..e4174a9e21 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/State.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import java.io.IOException; + +/** Interface that different types of state must implement. */ +public interface State<K, V> { + + byte[] serializeKey(K key) throws IOException; + + byte[] serializeValue(V value) throws IOException; + + V deserializeValue(byte[] valueBytes) throws IOException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java new file mode 100644 index 0000000000..478c6f9f3b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/StateFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import org.apache.paimon.data.serializer.Serializer; + +import java.io.Closeable; +import java.io.IOException; + +/** State factory to create {@link State}. */ +public interface StateFactory extends Closeable { + + <K, V> ValueState<K, V> valueState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) + throws IOException; + + <K, V> SetState<K, V> setState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) + throws IOException; + + <K, V> ListState<K, V> listState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) + throws IOException; + + boolean preferBulkLoad(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java new file mode 100644 index 0000000000..eef737078f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueBulkLoader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +/** Value bulk loader to load key values, incoming keys must be sorted. */ +public interface ValueBulkLoader extends BulkLoader { + + void write(byte[] key, byte[] value) throws WriteException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java new file mode 100644 index 0000000000..f04e456b3b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/ValueState.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup; + +import javax.annotation.Nullable; + +import java.io.IOException; + +/** {@link State} interface for single-value state. The value can be deleted or updated. */ +public interface ValueState<K, V> extends State<K, V> { + + @Nullable + V get(K key) throws IOException; + + void put(K key, V value) throws IOException; + + void delete(K key) throws IOException; + + ValueBulkLoader createBulkLoader(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java new file mode 100644 index 0000000000..992fbc1abd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryListState.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup.memory; + +import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.ListBulkLoader; +import org.apache.paimon.lookup.ListState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.paimon.lookup.ByteArray.wrapBytes; + +/** In-memory list state. */ +public class InMemoryListState<K, V> extends InMemoryState<K, V> implements ListState<K, V> { + + private final Map<ByteArray, List<byte[]>> values; + + public InMemoryListState(Serializer<K> keySerializer, Serializer<V> valueSerializer) { + super(keySerializer, valueSerializer); + this.values = new HashMap<>(); + } + + @Override + public void add(K key, V value) throws IOException { + byte[] keyBytes = serializeKey(key); + byte[] valueBytes = serializeValue(value); + values.computeIfAbsent(wrapBytes(keyBytes), k -> new ArrayList<>()).add(valueBytes); + } + + @Override + public List<V> get(K key) throws IOException { + List<byte[]> list = this.values.get(wrapBytes(serializeKey(key))); + List<V> result = new ArrayList<>(); + if (list != null) { + for (byte[] value : list) { + result.add(deserializeValue(value)); + } + } + return result; + } + + @Override + public ListBulkLoader createBulkLoader() { + return new ListBulkLoader() { + + @Override + public void write(byte[] key, List<byte[]> value) { + // copy the list, outside will reuse list + values.put(wrapBytes(key), new ArrayList<>(value)); + } + + @Override + public void finish() {} + }; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java new file mode 100644 index 0000000000..f694a57643 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemorySetState.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup.memory; + +import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.SetState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.apache.paimon.lookup.ByteArray.wrapBytes; + +/** In-memory set state. */ +public class InMemorySetState<K, V> extends InMemoryState<K, V> implements SetState<K, V> { + + private final Map<ByteArray, TreeSet<ByteArray>> values; + + public InMemorySetState(Serializer<K> keySerializer, Serializer<V> valueSerializer) { + super(keySerializer, valueSerializer); + this.values = new HashMap<>(); + } + + @Override + public List<V> get(K key) throws IOException { + Set<ByteArray> set = values.get(wrapBytes(serializeKey(key))); + List<V> result = new ArrayList<>(); + if (set != null) { + for (ByteArray value : set) { + result.add(deserializeValue(value.bytes)); + } + } + return result; + } + + @Override + public void retract(K key, V value) throws IOException { + values.get(wrapBytes(serializeKey(key))).remove(wrapBytes(serializeValue(value))); + } + + @Override + public void add(K key, V value) throws IOException { + byte[] keyBytes = serializeKey(key); + byte[] valueBytes = serializeValue(value); + values.computeIfAbsent(wrapBytes(keyBytes), k -> new TreeSet<>()) + .add(wrapBytes(valueBytes)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java new file mode 100644 index 0000000000..6b79776a70 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryState.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup.memory; + +import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataOutputSerializer; +import org.apache.paimon.lookup.State; + +import java.io.IOException; + +/** In-memory state. */ +public abstract class InMemoryState<K, V> implements State<K, V> { + + protected final Serializer<K> keySerializer; + protected final Serializer<V> valueSerializer; + protected final DataOutputSerializer keyOutView; + protected final DataInputDeserializer valueInputView; + protected final DataOutputSerializer valueOutputView; + + public InMemoryState(Serializer<K> keySerializer, Serializer<V> valueSerializer) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.keyOutView = new DataOutputSerializer(32); + this.valueInputView = new DataInputDeserializer(); + this.valueOutputView = new DataOutputSerializer(32); + } + + @Override + public byte[] serializeKey(K key) throws IOException { + keyOutView.clear(); + keySerializer.serialize(key, keyOutView); + return keyOutView.getCopyOfBuffer(); + } + + @Override + public byte[] serializeValue(V value) throws IOException { + valueOutputView.clear(); + valueSerializer.serialize(value, valueOutputView); + return valueOutputView.getCopyOfBuffer(); + } + + @Override + public V deserializeValue(byte[] valueBytes) throws IOException { + valueInputView.setBuffer(valueBytes); + return valueSerializer.deserialize(valueInputView); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java new file mode 100644 index 0000000000..77aa1cf17e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryStateFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup.memory; + +import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ListState; +import org.apache.paimon.lookup.SetState; +import org.apache.paimon.lookup.StateFactory; +import org.apache.paimon.lookup.ValueState; + +import java.io.IOException; + +/** Factory to create in-memory state. */ +public class InMemoryStateFactory implements StateFactory { + + @Override + public <K, V> ValueState<K, V> valueState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) { + return new InMemoryValueState<>(keySerializer, valueSerializer); + } + + @Override + public <K, V> SetState<K, V> setState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) { + return new InMemorySetState<>(keySerializer, valueSerializer); + } + + @Override + public <K, V> ListState<K, V> listState( + String name, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + long lruCacheSize) { + return new InMemoryListState<>(keySerializer, valueSerializer); + } + + @Override + public boolean preferBulkLoad() { + return false; + } + + @Override + public void close() throws IOException {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java new file mode 100644 index 0000000000..07db364ef1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/memory/InMemoryValueState.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.lookup.memory; + +import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.ValueBulkLoader; +import org.apache.paimon.lookup.ValueState; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.lookup.ByteArray.wrapBytes; + +/** In-memory value state. */ +public class InMemoryValueState<K, V> extends InMemoryState<K, V> implements ValueState<K, V> { + + private final Map<ByteArray, byte[]> values; + + public InMemoryValueState(Serializer<K> keySerializer, Serializer<V> valueSerializer) { + super(keySerializer, valueSerializer); + this.values = new HashMap<>(); + } + + @Override + public @Nullable V get(K key) throws IOException { + byte[] bytes = values.get(wrapBytes(serializeKey(key))); + if (bytes == null) { + return null; + } + return deserializeValue(bytes); + } + + @Override + public void put(K key, V value) throws IOException { + values.put(wrapBytes(serializeKey(key)), serializeValue(value)); + } + + @Override + public void delete(K key) throws IOException { + values.remove(wrapBytes(serializeKey(key))); + } + + @Override + public ValueBulkLoader createBulkLoader() { + return new ValueBulkLoader() { + + @Override + public void write(byte[] key, byte[] value) { + values.put(wrapBytes(key), value); + } + + @Override + public void finish() {} + }; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java similarity index 82% copy from paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java copy to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java index 0bcdbbb5d2..8278376f73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/BulkLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBBulkLoader.java @@ -16,7 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; + +import org.apache.paimon.lookup.ListBulkLoader; +import org.apache.paimon.lookup.ValueBulkLoader; +import org.apache.paimon.utils.ListDelimitedSerializer; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.EnvOptions; @@ -28,14 +32,16 @@ import org.rocksdb.SstFileWriter; import org.rocksdb.TtlDB; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; /** Bulk loader for RocksDB. */ -public class BulkLoader { +public class RocksDBBulkLoader implements ValueBulkLoader, ListBulkLoader { private final String uuid = UUID.randomUUID().toString(); + private final ListDelimitedSerializer listSerializer = new ListDelimitedSerializer(); private final ColumnFamilyHandle columnFamily; private final String path; @@ -49,7 +55,8 @@ public class BulkLoader { private int sstIndex = 0; private long recordNum = 0; - public BulkLoader(RocksDB db, Options options, ColumnFamilyHandle columnFamily, String path) { + public RocksDBBulkLoader( + RocksDB db, Options options, ColumnFamilyHandle columnFamily, String path) { this.db = db; this.isTtlEnabled = db instanceof TtlDB; this.options = options; @@ -58,6 +65,7 @@ public class BulkLoader { this.currentTimeSeconds = (int) (System.currentTimeMillis() / 1000); } + @Override public void write(byte[] key, byte[] value) throws WriteException { try { if (writer == null) { @@ -89,6 +97,17 @@ public class BulkLoader { } } + @Override + public void write(byte[] key, List<byte[]> value) throws WriteException { + byte[] bytes; + try { + bytes = listSerializer.serializeList(value); + } catch (IOException e) { + throw new RuntimeException(e); + } + write(key, bytes); + } + private byte[] appendTimestamp(byte[] value) { byte[] newValue = new byte[value.length + 4]; System.arraycopy(value, 0, newValue, 0, value.length); @@ -99,6 +118,7 @@ public class BulkLoader { return newValue; } + @Override public void finish() { try { if (writer != null) { @@ -115,11 +135,4 @@ public class BulkLoader { throw new RuntimeException(e); } } - - /** Exception during writing. */ - public static class WriteException extends Exception { - public WriteException(Throwable cause) { - super(cause); - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java similarity index 87% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java index 7fe29d209a..70716c40cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBListState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBListState.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ListState; import org.apache.paimon.utils.ListDelimitedSerializer; import org.rocksdb.ColumnFamilyHandle; @@ -29,7 +30,7 @@ import java.util.Collections; import java.util.List; /** RocksDB state for key -> List of value. */ -public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> { +public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> implements ListState<K, V> { private final ListDelimitedSerializer listSerializer = new ListDelimitedSerializer(); @@ -42,6 +43,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> { super(stateFactory, columnFamily, keySerializer, valueSerializer, lruCacheSize); } + @Override public void add(K key, V value) throws IOException { byte[] keyBytes = serializeKey(key); byte[] valueBytes = serializeValue(value); @@ -53,6 +55,7 @@ public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> { cache.invalidate(wrap(keyBytes)); } + @Override public List<V> get(K key) throws IOException { byte[] keyBytes = serializeKey(key); return cache.get( @@ -71,14 +74,4 @@ public class RocksDBListState<K, V> extends RocksDBState<K, V, List<V>> { return rows; }); } - - public byte[] serializeValue(V value) throws IOException { - valueOutputView.clear(); - valueSerializer.serialize(value, valueOutputView); - return valueOutputView.getCopyOfBuffer(); - } - - public byte[] serializeList(List<byte[]> valueList) throws IOException { - return listSerializer.serializeList(valueList); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java similarity index 99% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java index e8c3fecdf8..2cde6d6e7b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBOptions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.annotation.Documentation; import org.apache.paimon.options.ConfigOption; diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java similarity index 95% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java index 5c06cbd51a..9f654e152c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBSetState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBSetState.java @@ -16,9 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.SetState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -32,7 +34,8 @@ import java.util.List; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Rocksdb state for key -> Set values. */ -public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> { +public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> + implements SetState<K, V> { private static final byte[] EMPTY = new byte[0]; @@ -45,6 +48,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> { super(stateFactory, columnFamily, keySerializer, valueSerializer, lruCacheSize); } + @Override public List<V> get(K key) throws IOException { ByteArray keyBytes = wrap(serializeKey(key)); List<byte[]> valueBytes = cache.getIfPresent(keyBytes); @@ -73,6 +77,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> { return values; } + @Override public void retract(K key, V value) throws IOException { try { byte[] bytes = invalidKeyAndGetKVBytes(key, value); @@ -84,6 +89,7 @@ public class RocksDBSetState<K, V> extends RocksDBState<K, V, List<byte[]>> { } } + @Override public void add(K key, V value) throws IOException { try { byte[] bytes = invalidKeyAndGetKVBytes(key, value); diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java similarity index 82% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java index 0181917a7a..62257cfd34 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBState.java @@ -16,13 +16,15 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.CoreOptions; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.io.DataInputDeserializer; import org.apache.paimon.io.DataOutputSerializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.State; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -37,10 +39,9 @@ import org.rocksdb.WriteOptions; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Arrays; /** Rocksdb state for key value. */ -public abstract class RocksDBState<K, V, CacheV> { +public abstract class RocksDBState<K, V, CacheV> implements State<K, V> { protected final RocksDBStateFactory stateFactory; @@ -85,12 +86,26 @@ public abstract class RocksDBState<K, V, CacheV> { .build(); } + @Override public byte[] serializeKey(K key) throws IOException { keyOutView.clear(); keySerializer.serialize(key, keyOutView); return keyOutView.getCopyOfBuffer(); } + @Override + public byte[] serializeValue(V value) throws IOException { + valueOutputView.clear(); + valueSerializer.serialize(value, valueOutputView); + return valueOutputView.getCopyOfBuffer(); + } + + @Override + public V deserializeValue(byte[] valueBytes) throws IOException { + valueInputView.setBuffer(valueBytes); + return valueSerializer.deserialize(valueInputView); + } + protected ByteArray wrap(byte[] bytes) { return new ByteArray(bytes); } @@ -99,8 +114,8 @@ public abstract class RocksDBState<K, V, CacheV> { return new Reference(bytes); } - public BulkLoader createBulkLoader() { - return new BulkLoader(db, stateFactory.options(), columnFamily, stateFactory.path()); + public RocksDBBulkLoader createBulkLoader() { + return new RocksDBBulkLoader(db, stateFactory.options(), columnFamily, stateFactory.path()); } public static BinaryExternalSortBuffer createBulkLoadSorter( @@ -117,33 +132,6 @@ public abstract class RocksDBState<K, V, CacheV> { options.sequenceFieldSortOrderIsAscending()); } - /** A class wraps byte[] to implement equals and hashCode. */ - protected static class ByteArray { - - protected final byte[] bytes; - - protected ByteArray(byte[] bytes) { - this.bytes = bytes; - } - - @Override - public int hashCode() { - return Arrays.hashCode(bytes); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ByteArray byteArray = (ByteArray) o; - return Arrays.equals(bytes, byteArray.bytes); - } - } - /** A class wraps byte[] to indicate contain or not contain. */ protected static class Reference { diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java similarity index 94% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java index 9168fbf4cb..883c82906b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBStateFactory.java @@ -16,9 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.StateFactory; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; @@ -31,13 +32,12 @@ import org.rocksdb.TtlDB; import javax.annotation.Nullable; -import java.io.Closeable; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Duration; /** Factory to create state. */ -public class RocksDBStateFactory implements Closeable { +public class RocksDBStateFactory implements StateFactory { public static final String MERGE_OPERATOR_NAME = "stringappendtest"; @@ -85,6 +85,7 @@ public class RocksDBStateFactory implements Closeable { return path; } + @Override public <K, V> RocksDBValueState<K, V> valueState( String name, Serializer<K> keySerializer, @@ -95,6 +96,7 @@ public class RocksDBStateFactory implements Closeable { this, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize); } + @Override public <K, V> RocksDBSetState<K, V> setState( String name, Serializer<K> keySerializer, @@ -105,6 +107,7 @@ public class RocksDBStateFactory implements Closeable { this, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize); } + @Override public <K, V> RocksDBListState<K, V> listState( String name, Serializer<K> keySerializer, @@ -116,6 +119,11 @@ public class RocksDBStateFactory implements Closeable { this, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize); } + @Override + public boolean preferBulkLoad() { + return true; + } + private ColumnFamilyHandle createColumnFamily(String name) throws IOException { try { return db.createColumnFamily( diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java similarity index 86% rename from paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java rename to paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java index 444ed5ad06..3d8768a9c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBValueState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/rocksdb/RocksDBValueState.java @@ -16,9 +16,11 @@ * limitations under the License. */ -package org.apache.paimon.lookup; +package org.apache.paimon.lookup.rocksdb; import org.apache.paimon.data.serializer.Serializer; +import org.apache.paimon.lookup.ByteArray; +import org.apache.paimon.lookup.ValueState; import org.rocksdb.ColumnFamilyHandle; @@ -29,7 +31,8 @@ import java.io.IOException; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Rocksdb state for key -> a single value. */ -public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Reference> { +public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Reference> + implements ValueState<K, V> { public RocksDBValueState( RocksDBStateFactory stateFactory, @@ -41,6 +44,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref } @Nullable + @Override public V get(K key) throws IOException { try { Reference valueRef = get(wrap(serializeKey(key))); @@ -60,6 +64,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref return valueRef; } + @Override public void put(K key, V value) throws IOException { checkArgument(value != null); @@ -73,6 +78,7 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref } } + @Override public void delete(K key) throws IOException { try { byte[] keyBytes = serializeKey(key); @@ -85,15 +91,4 @@ public class RocksDBValueState<K, V> extends RocksDBState<K, V, RocksDBState.Ref throw new IOException(e); } } - - public V deserializeValue(byte[] valueBytes) throws IOException { - valueInputView.setBuffer(valueBytes); - return valueSerializer.deserialize(valueInputView); - } - - public byte[] serializeValue(V value) throws IOException { - valueOutputView.clear(); - valueSerializer.serialize(value, valueOutputView); - return valueOutputView.getCopyOfBuffer(); - } } diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java index 71792962b8..219e7df99c 100644 --- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java +++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java @@ -74,7 +74,7 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation[] { new OptionsClassLocation("paimon-api", "org.apache.paimon.options"), new OptionsClassLocation("paimon-api", "org.apache.paimon"), - new OptionsClassLocation("paimon-core", "org.apache.paimon.lookup"), + new OptionsClassLocation("paimon-core", "org.apache.paimon.lookup.rocksdb"), new OptionsClassLocation("paimon-core", "org.apache.paimon.jdbc"), new OptionsClassLocation("paimon-core", "org.apache.paimon.table"), new OptionsClassLocation("paimon-core", "org.apache.paimon.iceberg"), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index ed8d9a7ac8..d524920e82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -541,7 +541,10 @@ public class FlinkConnectorOptions { AUTO, /** Use full caching mode. */ - FULL + FULL, + + /** Use in-memory caching mode. */ + MEMORY } /** Watermark emit strategy for scan. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index dec278fb70..e3e15c4ce6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -71,8 +71,8 @@ import static org.apache.paimon.CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; -import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS; -import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CACHE_ROWS; +import static org.apache.paimon.lookup.rocksdb.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; /** A lookup {@link TableFunction} for file store. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 4154b6742c..27795a00ff 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -24,9 +24,11 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.lookup.BulkLoader; -import org.apache.paimon.lookup.RocksDBState; -import org.apache.paimon.lookup.RocksDBStateFactory; +import org.apache.paimon.lookup.StateFactory; +import org.apache.paimon.lookup.memory.InMemoryStateFactory; +import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader; +import org.apache.paimon.lookup.rocksdb.RocksDBState; +import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; @@ -64,11 +66,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC; import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT; +import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY; /** Lookup table of full cache. */ public abstract class FullCacheLookupTable implements LookupTable { + private static final Logger LOG = LoggerFactory.getLogger(FullCacheLookupTable.class); protected final Object lock = new Object(); @@ -79,7 +84,7 @@ public abstract class FullCacheLookupTable implements LookupTable { @Nullable protected final FieldsComparator userDefinedSeqComparator; protected final int appendUdsFieldNumber; - protected RocksDBStateFactory stateFactory; + protected StateFactory stateFactory; @Nullable private ExecutorService refreshExecutor; private final AtomicReference<Exception> cachedException; private final int maxPendingSnapshotCount; @@ -143,11 +148,7 @@ public abstract class FullCacheLookupTable implements LookupTable { } protected void init() throws Exception { - this.stateFactory = - new RocksDBStateFactory( - context.tempPath.toString(), - context.table.coreOptions().toConfiguration(), - null); + this.stateFactory = createStateFactory(); this.refreshExecutor = this.refreshAsync ? Executors.newSingleThreadExecutor( @@ -158,6 +159,16 @@ public abstract class FullCacheLookupTable implements LookupTable { : null; } + private StateFactory createStateFactory() throws IOException { + String diskDir = context.tempPath.toString(); + Options options = context.table.coreOptions().toConfiguration(); + if (options.get(LOOKUP_CACHE_MODE) == MEMORY) { + return new InMemoryStateFactory(); + } else { + return new RocksDBStateFactory(diskDir, options, null); + } + } + protected void bootstrap() throws Exception { Predicate scanPredicate = PredicateBuilder.andNullable(context.tablePredicate, specificPartition); @@ -168,6 +179,11 @@ public abstract class FullCacheLookupTable implements LookupTable { scanPredicate, context.requiredCachedBucketIds, cacheRowFilter); + if (!stateFactory.preferBulkLoad()) { + doRefresh(); + return; + } + BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter( IOManager.create(context.tempPath.toString()), context.table.coreOptions()); @@ -189,7 +205,7 @@ public abstract class FullCacheLookupTable implements LookupTable { while ((row = keyIterator.next(row)) != null) { bulkLoader.write(row.getBinary(0), row.getBinary(1)); } - } catch (BulkLoader.WriteException e) { + } catch (RocksDBBulkLoader.WriteException e) { throw new RuntimeException( "Exception in bulkLoad, the most suspicious reason is that " + "your data contains duplicates, please check your lookup table. ", @@ -331,7 +347,7 @@ public abstract class FullCacheLookupTable implements LookupTable { /** Bulk loader for the table. */ public interface TableBulkLoader { - void write(byte[] key, byte[] value) throws BulkLoader.WriteException, IOException; + void write(byte[] key, byte[] value) throws RocksDBBulkLoader.WriteException, IOException; void finish() throws IOException; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 63af4f3506..546bc5a60a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; -import org.apache.paimon.lookup.BulkLoader; -import org.apache.paimon.lookup.RocksDBListState; +import org.apache.paimon.lookup.ListBulkLoader; +import org.apache.paimon.lookup.ListState; +import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.KeyProjectedRow; @@ -40,7 +41,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { private final KeyProjectedRow joinKeyRow; - private RocksDBListState<InternalRow, InternalRow> state; + private ListState<InternalRow, InternalRow> state; public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) { super(context); @@ -105,7 +106,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { @Override public TableBulkLoader createBulkLoader() { - BulkLoader bulkLoader = state.createBulkLoader(); + ListBulkLoader bulkLoader = state.createBulkLoader(); return new TableBulkLoader() { private final List<byte[]> values = new ArrayList<>(); @@ -113,7 +114,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { private byte[] currentKey; @Override - public void write(byte[] key, byte[] value) throws IOException { + public void write(byte[] key, byte[] value) { if (currentKey != null && !Arrays.equals(key, currentKey)) { flush(); } @@ -122,16 +123,16 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable { } @Override - public void finish() throws IOException { + public void finish() { flush(); bulkLoader.finish(); } - private void flush() throws IOException { + private void flush() { if (currentKey != null && values.size() > 0) { try { - bulkLoader.write(currentKey, state.serializeList(values)); - } catch (BulkLoader.WriteException e) { + bulkLoader.write(currentKey, values); + } catch (RocksDBBulkLoader.WriteException e) { throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index 2a3099e9a6..485a9cfaa6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -20,8 +20,9 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; -import org.apache.paimon.lookup.BulkLoader; -import org.apache.paimon.lookup.RocksDBValueState; +import org.apache.paimon.lookup.ValueBulkLoader; +import org.apache.paimon.lookup.ValueState; +import org.apache.paimon.lookup.rocksdb.RocksDBBulkLoader; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.RowKind; @@ -44,7 +45,7 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable { @Nullable private final ProjectedRow keyRearrange; - protected RocksDBValueState<InternalRow, InternalRow> tableState; + protected ValueState<InternalRow, InternalRow> tableState; public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> joinKey) { super(context); @@ -129,12 +130,12 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable { @Override public TableBulkLoader createBulkLoader() { - BulkLoader bulkLoader = tableState.createBulkLoader(); + ValueBulkLoader bulkLoader = tableState.createBulkLoader(); return new TableBulkLoader() { @Override public void write(byte[] key, byte[] value) - throws BulkLoader.WriteException, IOException { + throws RocksDBBulkLoader.WriteException, IOException { bulkLoader.write(key, value); bulkLoadWritePlus(key, value); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 11c9cba24b..668570d77d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -20,7 +20,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalSerializers; -import org.apache.paimon.lookup.RocksDBSetState; +import org.apache.paimon.lookup.SetState; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.KeyProjectedRow; @@ -35,7 +35,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable { private final KeyProjectedRow secKeyRow; - private RocksDBSetState<InternalRow, InternalRow> indexState; + private SetState<InternalRow, InternalRow> indexState; public SecondaryIndexLookupTable(Context context, long lruCacheSize) { super(context, lruCacheSize / 2, context.table.primaryKeys()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index dea30d4d13..652ef5bd9c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -57,8 +57,8 @@ public class LookupJoinITCase extends CatalogITCaseBase { + "PARTITIONED BY (`i`) WITH ('continuous.discovery-interval'='1 ms' %s)"; String fullOption = ", 'lookup.cache' = 'full'"; - String lruOption = ", 'changelog-producer'='lookup'"; + String memoryOption = ", 'lookup.cache' = 'memory'"; switch (cacheMode) { case FULL: @@ -69,6 +69,10 @@ public class LookupJoinITCase extends CatalogITCaseBase { tEnv.executeSql(String.format(dim, lruOption)); tEnv.executeSql(String.format(partitioned, lruOption)); break; + case MEMORY: + tEnv.executeSql(String.format(dim, memoryOption)); + tEnv.executeSql(String.format(partitioned, memoryOption)); + break; default: throw new UnsupportedOperationException(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java index dcbc405d31..4f7ff334f6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java @@ -26,7 +26,7 @@ import org.apache.paimon.flink.FlinkRowData; import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.LocalQueryExecutor; import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.QueryExecutor; import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable.RemoteQueryExecutor; -import org.apache.paimon.lookup.RocksDBOptions; +import org.apache.paimon.lookup.rocksdb.RocksDBOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 88b9471330..4a3ca45c3b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -42,6 +42,8 @@ import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -50,15 +52,15 @@ import org.apache.paimon.utils.SortUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -71,21 +73,34 @@ import java.util.concurrent.TimeoutException; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; +import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.FULL; +import static org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode.MEMORY; import static org.apache.paimon.types.DataTypes.INT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link LookupTable}. */ +@ExtendWith(ParameterizedTestExtension.class) public class LookupTableTest extends TableTestBase { - @TempDir Path tempDir; + private final boolean inMemory; + @TempDir Path tempDir; private RowType rowType; - private IOManager ioManager; - private FullCacheLookupTable table; + public LookupTableTest(boolean inMemory) { + this.inMemory = inMemory; + } + + @SuppressWarnings("unused") + @Parameters(name = "{0}") + public static List<Boolean> getVarSeg() { + return Arrays.asList(true, false); + } + @BeforeEach public void before() throws IOException { this.rowType = RowType.of(new IntType(), new IntType(), new IntType()); @@ -100,6 +115,9 @@ public class LookupTableTest extends TableTestBase { } private FileStoreTable createTable(List<String> primaryKeys, Options options) throws Exception { + if (inMemory) { + options.set(LOOKUP_CACHE_MODE, MEMORY); + } Identifier identifier = new Identifier("default", "t"); Schema schema = new Schema( @@ -112,7 +130,7 @@ public class LookupTableTest extends TableTestBase { return (FileStoreTable) catalog.getTable(identifier); } - @Test + @TestTemplate public void testPkTable() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); FullCacheLookupTable.Context context = @@ -132,7 +150,7 @@ public class LookupTableTest extends TableTestBase { table.open(); // test bulk load error - { + if (!inMemory) { TableBulkLoader bulkLoader = table.createBulkLoader(); bulkLoader.write(new byte[] {1}, new byte[] {1}); assertThatThrownBy(() -> bulkLoader.write(new byte[] {1}, new byte[] {2})) @@ -172,7 +190,7 @@ public class LookupTableTest extends TableTestBase { assertThat(table.get(row(3))).hasSize(0); } - @Test + @TestTemplate public void testPkTableWithSequenceField() throws Exception { Options options = new Options(); options.set(CoreOptions.SEQUENCE_FIELD, "f1"); @@ -223,7 +241,7 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(0), 1, 22, 222); } - @Test + @TestTemplate public void testPkTableWithSequenceFieldProjection() throws Exception { Options options = new Options(); options.set(CoreOptions.SEQUENCE_FIELD, "f2"); @@ -263,7 +281,7 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(0), 1, 22); } - @Test + @TestTemplate public void testPkTablePkFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); FullCacheLookupTable.Context context = @@ -299,7 +317,7 @@ public class LookupTableTest extends TableTestBase { assertThat(table.get(row(3))).hasSize(0); } - @Test + @TestTemplate public void testPkTableNonPkFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); FullCacheLookupTable.Context context = @@ -328,7 +346,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result).hasSize(0); } - @Test + @TestTemplate public void testSecKeyTable() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); FullCacheLookupTable.Context context = @@ -376,7 +394,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result.stream().map(row -> row.getInt(0))).contains(1); } - @Test + @TestTemplate public void testSecKeyTableWithSequenceField() throws Exception { Options options = new Options(); options.set(CoreOptions.SEQUENCE_FIELD, "f1"); @@ -431,7 +449,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result.stream().map(row -> row.getInt(2))).doesNotContain(333); } - @Test + @TestTemplate public void testSecKeyTablePkFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); FullCacheLookupTable.Context context = @@ -476,7 +494,7 @@ public class LookupTableTest extends TableTestBase { assertThat(table.get(row(33))).hasSize(0); } - @Test + @TestTemplate public void testNoPrimaryKeyTable() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options()); FullCacheLookupTable.Context context = @@ -524,7 +542,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result.stream().map(row -> row.getInt(0))).contains(1); } - @Test + @TestTemplate public void testNoPrimaryKeyTableFilter() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options()); FullCacheLookupTable.Context context = @@ -559,7 +577,7 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(1), 1, 11, 111); } - @Test + @TestTemplate public void testPkTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); writeWithBucketAssigner( @@ -600,7 +618,7 @@ public class LookupTableTest extends TableTestBase { assertThat(res).isEmpty(); } - @Test + @TestTemplate public void testRefreshExecutorRebuildAfterReopen() throws Exception { Options options = new Options(); options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true); @@ -635,7 +653,7 @@ public class LookupTableTest extends TableTestBase { assertRow(res.get(0), 1, 22, 222); } - @Test + @TestTemplate public void testNoPkTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options()); writeWithBucketAssigner( @@ -676,7 +694,7 @@ public class LookupTableTest extends TableTestBase { assertThat(res).isEmpty(); } - @Test + @TestTemplate public void testSecKeyTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); writeWithBucketAssigner( @@ -717,7 +735,7 @@ public class LookupTableTest extends TableTestBase { assertThat(res).isEmpty(); } - @Test + @TestTemplate public void testPartialLookupTable() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = @@ -750,7 +768,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result).hasSize(0); } - @Test + @TestTemplate public void testPartialLookupTableWithRowFilter() throws Exception { Options options = new Options(); options.set(CoreOptions.BUCKET.key(), "2"); @@ -771,7 +789,7 @@ public class LookupTableTest extends TableTestBase { assertThat(result).isEmpty(); } - @Test + @TestTemplate public void testPartialLookupTableWithProjection() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = @@ -803,7 +821,7 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(0), 22, -2); } - @Test + @TestTemplate public void testPartialLookupTableJoinKeyOrder() throws Exception { FileStoreTable dimTable = createDimTable(); PrimaryKeyPartialLookupTable table = @@ -835,9 +853,17 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(0), 22, -2); } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception { + @TestTemplate + public void testPKLookupTableNotRefreshAsync() throws Exception { + innerTestPKLookupTableRefreshAsync(false); + } + + @TestTemplate + public void testPKLookupTableRefreshAsync() throws Exception { + innerTestPKLookupTableRefreshAsync(true); + } + + private void innerTestPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception { Options options = new Options(); options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, refreshAsync); FileStoreTable storeTable = createTable(singletonList("f0"), options); @@ -902,13 +928,11 @@ public class LookupTableTest extends TableTestBase { table.close(); } - @Test + @TestTemplate public void testFullCacheLookupTableWithForceLookup() throws Exception { Options options = new Options(); options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); - options.set( - FlinkConnectorOptions.LOOKUP_CACHE_MODE, - FlinkConnectorOptions.LookupCacheMode.FULL); + options.set(LOOKUP_CACHE_MODE, inMemory ? MEMORY : FULL); options.set(CoreOptions.WRITE_ONLY, true); options.set(CoreOptions.FORCE_LOOKUP, true); options.set(CoreOptions.BUCKET, 1); @@ -964,7 +988,7 @@ public class LookupTableTest extends TableTestBase { assertRow(result.get(0), 1, 22, 222); // new value } - @Test + @TestTemplate public void testPartialLookupTableWithForceLookup() throws Exception { Options options = new Options(); options.set(CoreOptions.MERGE_ENGINE, CoreOptions.MergeEngine.PARTIAL_UPDATE); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java index 7a942ddb56..d98d199623 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/RocksDBListStateTest.java @@ -24,8 +24,8 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.lookup.RocksDBListState; -import org.apache.paimon.lookup.RocksDBStateFactory; +import org.apache.paimon.lookup.rocksdb.RocksDBListState; +import org.apache.paimon.lookup.rocksdb.RocksDBStateFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind;
