This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 5af03f5fb4d27f1e01b758391544edae4c8fafb1
Author: Jingsong <[email protected]>
AuthorDate: Tue Jan 30 18:47:58 2024 +0800

    [core] Integrate compression to Hash Lookup
    
    This closes #2820
---
 .../shortcodes/generated/core_configuration.html   |  8 ++++-
 .../benchmark/LookupBloomFilterBenchmark.java      |  2 +-
 paimon-common/pom.xml                              |  5 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  9 ++++-
 .../compression/BlockCompressionFactory.java       |  8 ++---
 .../java/org/apache/paimon/io/PageFileInput.java   | 23 +++++++++++--
 .../java/org/apache/paimon/io/PageFileOutput.java  | 13 ++++++--
 .../org/apache/paimon/lookup/hash/HashContext.java | 23 ++++++++++++-
 .../paimon/lookup/hash/HashLookupStoreFactory.java | 12 +++++--
 .../paimon/lookup/hash/HashLookupStoreReader.java  | 15 +++++++--
 .../paimon/lookup/hash/HashLookupStoreWriter.java  | 38 ++++++++++++++--------
 .../io/cache/FileBasedRandomInputViewTest.java     |  3 +-
 .../lookup/hash/HashLookupStoreFactoryTest.java    | 20 ++++++++----
 .../paimon/utils/FileBasedBloomFilterTest.java     |  6 +++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  6 ++--
 .../apache/paimon/table/query/LocalTableQuery.java |  3 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |  3 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |  3 +-
 .../org/apache/paimon/flink/LookupJoinITCase.java  |  9 +++++
 19 files changed, 164 insertions(+), 45 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 29d6157e6..8581a7d13 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -46,7 +46,7 @@ under the License.
         </tr>
         <tr>
             <td><h5>cache-page-size</h5></td>
-            <td style="word-wrap: break-word;">16 kb</td>
+            <td style="word-wrap: break-word;">64 kb</td>
             <td>MemorySize</td>
             <td>Memory page size for caching.</td>
         </tr>
@@ -273,6 +273,12 @@ Mainly to resolve data skew on primary keys. We recommend starting with 64 mb wh
             <td>MemorySize</td>
             <td>Max memory size for lookup cache.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.cache-spill-compression</h5></td>
+            <td style="word-wrap: break-word;">"lz4"</td>
+            <td>String</td>
+            <td>Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.</td>
+        </tr>
         <tr>
             <td><h5>lookup.cache.bloom.filter.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java
index e878301d2..159cc43d0 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java
@@ -130,7 +130,7 @@ public class LookupBloomFilterBenchmark {
         Arrays.fill(value, (byte) 1);
         HashLookupStoreFactory factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 
1024, 0.75);
+                        new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 
1024, 0.75, "none");
 
         File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
         HashLookupStoreWriter writer = factory.createWriter(file, filter);
diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index d6d40a9fc..22d51c924 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -296,6 +296,11 @@ under the License.
                                     <pattern>it.unimi.dsi.fastutil</pattern>
                                     
<shadedPattern>org.apache.paimon.shade.it.unimi.dsi.fastutil</shadedPattern>
                                 </relocation>
+                                <!-- Same to paimon-format. -->
+                                <relocation>
+                                    <pattern>io.airlift</pattern>
+                                    
<shadedPattern>org.apache.paimon.shade.io.airlift</shadedPattern>
+                                </relocation>
                             </relocations>
                             <minimizeJar>true</minimizeJar>
                         </configuration>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index ba238ac99..3e37a0935 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -366,7 +366,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<MemorySize> CACHE_PAGE_SIZE =
             key("cache-page-size")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("16 kb"))
+                    .defaultValue(MemorySize.parse("64 kb"))
                     .withDescription("Memory page size for caching.");
 
     public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
@@ -729,6 +729,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Max disk size for lookup cache, you can use this 
option to limit the use of local disks.");
 
+    public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION =
+            key("lookup.cache-spill-compression")
+                    .stringType()
+                    .defaultValue("lz4")
+                    .withDescription(
+                            "Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.");
+
     public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
             key("lookup.cache-max-memory-size")
                     .memoryType()
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index f90d4e039..621370b3d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -37,12 +37,10 @@ public interface BlockCompressionFactory {
 
     /** Creates {@link BlockCompressionFactory} according to the 
configuration. */
     @Nullable
-    static BlockCompressionFactory create(@Nullable String compression) {
-        if (compression == null) {
-            return null;
-        }
-
+    static BlockCompressionFactory create(String compression) {
         switch (compression.toUpperCase()) {
+            case "NONE":
+                return null;
             case "LZ4":
                 return new Lz4BlockCompressionFactory();
             case "LZO":
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java 
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
index da699304f..2e5d73ac3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.compression.BlockCompressionFactory;
+
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -36,8 +40,23 @@ public interface PageFileInput extends Closeable {
 
     byte[] readPosition(long position, int length) throws IOException;
 
-    static PageFileInput create(File file, int pageSize) throws IOException {
+    static PageFileInput create(
+            File file,
+            int pageSize,
+            @Nullable BlockCompressionFactory compressionFactory,
+            long uncompressBytes,
+            @Nullable long[] compressPagePositions)
+            throws IOException {
         RandomAccessFile accessFile = new RandomAccessFile(file, "r");
-        return new UncompressedPageFileInput(accessFile, pageSize);
+        if (compressionFactory == null) {
+            return new UncompressedPageFileInput(accessFile, pageSize);
+        } else {
+            return new CompressedPageFileInput(
+                    accessFile,
+                    pageSize,
+                    compressionFactory,
+                    uncompressBytes,
+                    compressPagePositions);
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java 
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
index 1a623ad90..d732e21b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
@@ -18,6 +18,10 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.compression.BlockCompressionFactory;
+
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -27,7 +31,12 @@ public interface PageFileOutput extends Closeable {
 
     void write(byte[] bytes, int off, int len) throws IOException;
 
-    static PageFileOutput create(File file) throws IOException {
-        return new UncompressedPageFileOutput(file);
+    static PageFileOutput create(
+            File file, int pageSize, @Nullable BlockCompressionFactory 
compressionFactory)
+            throws IOException {
+        if (compressionFactory == null) {
+            return new UncompressedPageFileOutput(file);
+        }
+        return new CompressedPageFileOutput(file, pageSize, 
compressionFactory);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java
index d7d90badd..3741fc6fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java
@@ -41,6 +41,9 @@ public class HashContext implements Context {
     // Offset of the data for different key length
     final long[] dataOffsets;
 
+    final long uncompressBytes;
+    final long[] compressPages;
+
     public HashContext(
             boolean bloomFilterEnabled,
             long bloomFilterExpectedEntries,
@@ -49,7 +52,9 @@ public class HashContext implements Context {
             int[] slotSizes,
             int[] slots,
             int[] indexOffsets,
-            long[] dataOffsets) {
+            long[] dataOffsets,
+            long uncompressBytes,
+            long[] compressPages) {
         this.bloomFilterEnabled = bloomFilterEnabled;
         this.bloomFilterExpectedEntries = bloomFilterExpectedEntries;
         this.bloomFilterBytes = bloomFilterBytes;
@@ -58,5 +63,21 @@ public class HashContext implements Context {
         this.slots = slots;
         this.indexOffsets = indexOffsets;
         this.dataOffsets = dataOffsets;
+        this.uncompressBytes = uncompressBytes;
+        this.compressPages = compressPages;
+    }
+
+    public HashContext copy(long uncompressBytes, long[] compressPages) {
+        return new HashContext(
+                bloomFilterEnabled,
+                bloomFilterExpectedEntries,
+                bloomFilterBytes,
+                keyCounts,
+                slotSizes,
+                slots,
+                indexOffsets,
+                dataOffsets,
+                uncompressBytes,
+                compressPages);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
index c4dbcffb9..7ece51734 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.utils.BloomFilter;
@@ -33,21 +34,26 @@ public class HashLookupStoreFactory implements 
LookupStoreFactory {
     private final CacheManager cacheManager;
     private final int cachePageSize;
     private final double loadFactor;
+    @Nullable private final BlockCompressionFactory compressionFactory;
 
-    public HashLookupStoreFactory(CacheManager cacheManager, int 
cachePageSize, double loadFactor) {
+    public HashLookupStoreFactory(
+            CacheManager cacheManager, int cachePageSize, double loadFactor, 
String compression) {
         this.cacheManager = cacheManager;
         this.cachePageSize = cachePageSize;
         this.loadFactor = loadFactor;
+        this.compressionFactory = BlockCompressionFactory.create(compression);
     }
 
     @Override
     public HashLookupStoreReader createReader(File file, Context context) 
throws IOException {
-        return new HashLookupStoreReader(file, (HashContext) context, 
cacheManager, cachePageSize);
+        return new HashLookupStoreReader(
+                file, (HashContext) context, cacheManager, cachePageSize, 
compressionFactory);
     }
 
     @Override
     public HashLookupStoreWriter createWriter(File file, @Nullable 
BloomFilter.Builder bloomFilter)
             throws IOException {
-        return new HashLookupStoreWriter(loadFactor, file, bloomFilter);
+        return new HashLookupStoreWriter(
+                loadFactor, file, bloomFilter, compressionFactory, 
cachePageSize);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
index 853cd4049..e08776823 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.compression.BlockCompressionFactory;
 import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.io.cache.FileBasedRandomInputView;
@@ -63,7 +64,11 @@ public class HashLookupStoreReader
     @Nullable private FileBasedBloomFilter bloomFilter;
 
     HashLookupStoreReader(
-            File file, HashContext context, CacheManager cacheManager, int 
cachePageSize)
+            File file,
+            HashContext context,
+            CacheManager cacheManager,
+            int cachePageSize,
+            @Nullable BlockCompressionFactory compressionFactory)
             throws IOException {
         // File path
         if (!file.exists()) {
@@ -83,7 +88,13 @@ public class HashLookupStoreReader
 
         LOG.info("Opening file {}", file.getName());
 
-        PageFileInput fileInput = PageFileInput.create(file, cachePageSize);
+        PageFileInput fileInput =
+                PageFileInput.create(
+                        file,
+                        cachePageSize,
+                        compressionFactory,
+                        context.uncompressBytes,
+                        context.compressPages);
         inputView = new FileBasedRandomInputView(fileInput, cacheManager);
 
         if (context.bloomFilterEnabled) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
index 8853ab827..ea02825b0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.io.CompressedPageFileOutput;
 import org.apache.paimon.io.PageFileOutput;
 import org.apache.paimon.lookup.LookupStoreFactory.Context;
 import org.apache.paimon.lookup.LookupStoreWriter;
@@ -81,10 +83,20 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
 
     @Nullable private final BloomFilter.Builder bloomFilter;
 
-    HashLookupStoreWriter(double loadFactor, File file, @Nullable 
BloomFilter.Builder bloomFilter)
+    @Nullable private final BlockCompressionFactory compressionFactory;
+    private final int compressPageSize;
+
+    HashLookupStoreWriter(
+            double loadFactor,
+            File file,
+            @Nullable BloomFilter.Builder bloomFilter,
+            @Nullable BlockCompressionFactory compressionFactory,
+            int compressPageSize)
             throws IOException {
         this.loadFactor = loadFactor;
         this.outputFile = file;
+        this.compressionFactory = compressionFactory;
+        this.compressPageSize = compressPageSize;
         if (loadFactor <= 0.0 || loadFactor >= 1.0) {
             throw new IllegalArgumentException(
                     "Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
@@ -187,7 +199,9 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
                         new int[keyCounts.length],
                         new int[keyCounts.length],
                         new int[keyCounts.length],
-                        new long[keyCounts.length]);
+                        new long[keyCounts.length],
+                        0,
+                        null);
 
         long indexesLength = bloomFilterBytes;
         long datasLength = 0;
@@ -223,7 +237,9 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
             context.dataOffsets[i] = indexesLength + context.dataOffsets[i];
         }
 
-        try (PageFileOutput output = PageFileOutput.create(outputFile)) {
+        PageFileOutput output =
+                PageFileOutput.create(outputFile, compressPageSize, 
compressionFactory);
+        try {
             // Write bloom filter file
             if (bloomFilter != null) {
                 File bloomFilterFile = new File(tempFolder, "bloomfilter.dat");
@@ -257,11 +273,17 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
             mergeFiles(filesToMerge, output);
         } finally {
             cleanup(filesToMerge);
+            output.close();
         }
 
         LOG.info(
                 "Compressed Total store size: {} Mb",
                 new DecimalFormat("#,##0.0").format(outputFile.length() / 
(1024 * 1024)));
+
+        if (output instanceof CompressedPageFileOutput) {
+            CompressedPageFileOutput compressedOutput = 
(CompressedPageFileOutput) output;
+            context = context.copy(compressedOutput.uncompressBytes(), 
compressedOutput.pages());
+        }
         return context;
     }
 
@@ -472,14 +494,4 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
         }
         return dos;
     }
-
-    private int getNumKeyCount() {
-        int res = 0;
-        for (int count : keyCounts) {
-            if (count != 0) {
-                res++;
-            }
-        }
-        return res;
-    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
index a0be24d8f..f9f11953c 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
@@ -68,7 +68,8 @@ public class FileBasedRandomInputViewTest {
         File file = writeFile(bytes);
         CacheManager cacheManager = new 
CacheManager(MemorySize.ofKibiBytes(128));
         FileBasedRandomInputView view =
-                new FileBasedRandomInputView(PageFileInput.create(file, 1024), 
cacheManager);
+                new FileBasedRandomInputView(
+                        PageFileInput.create(file, 1024, null, 0, null), 
cacheManager);
 
         // read first one
         // this assertThatCode check the ConcurrentModificationException is 
not threw.
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
index c7f386950..e29acddd8 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java
@@ -43,7 +43,6 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -65,25 +64,31 @@ public class HashLookupStoreFactoryTest {
     private final int pageSize = 1024;
 
     private final boolean enableBloomFilter;
+    private final String compress;
 
     private File file;
     private HashLookupStoreFactory factory;
 
     public HashLookupStoreFactoryTest(List<Object> var) {
         this.enableBloomFilter = (Boolean) var.get(0);
+        this.compress = (String) var.get(1);
     }
 
     @SuppressWarnings("unused")
-    @Parameters(name = "enableBf-{0}")
+    @Parameters(name = "enableBf&compress-{0}")
     public static List<List<Object>> getVarSeg() {
-        return Arrays.asList(Collections.singletonList(true), 
Collections.singletonList(false));
+        return Arrays.asList(
+                Arrays.asList(true, "none"),
+                Arrays.asList(false, "none"),
+                Arrays.asList(false, "lz4"),
+                Arrays.asList(true, "lz4"));
     }
 
     @BeforeEach
     public void setUp() throws IOException {
         this.factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d);
+                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d, compress);
         this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
         if (!file.createNewFile()) {
             throw new IOException("Can not create file: " + file);
@@ -206,7 +211,7 @@ public class HashLookupStoreFactoryTest {
 
         factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d);
+                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d, compress);
 
         Context context = writeStore(factory, file, keys, values);
 
@@ -236,7 +241,7 @@ public class HashLookupStoreFactoryTest {
 
         factory =
                 new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d);
+                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 
0.75d, compress);
 
         Context context = writeStore(file, keys, values);
 
@@ -307,7 +312,8 @@ public class HashLookupStoreFactoryTest {
 
         // Read
         factory =
-                new HashLookupStoreFactory(new CacheManager(new 
MemorySize(8096)), pageSize, 0.75d);
+                new HashLookupStoreFactory(
+                        new CacheManager(new MemorySize(8096)), pageSize, 
0.75d, compress);
         HashLookupStoreReader reader = factory.createReader(file, context);
         for (int i = 0; i < keys.length; i++) {
             
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
index bf43680c7..03817cce0 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
@@ -53,7 +53,11 @@ public class FileBasedBloomFilterTest {
         CacheManager cacheManager = new 
CacheManager(MemorySize.ofMebiBytes(1));
         FileBasedBloomFilter filter =
                 new FileBasedBloomFilter(
-                        PageFileInput.create(file, 1000), cacheManager, 100, 
0, 1000);
+                        PageFileInput.create(file, 1024, null, 0, null),
+                        cacheManager,
+                        100,
+                        0,
+                        1000);
 
         Arrays.stream(inputs)
                 .forEach(i -> 
Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 9ae8b4ccd..2b2d3f927 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -288,7 +288,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 new HashLookupStoreFactory(
                         cacheManager,
                         this.options.cachePageSize(),
-                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
+                        
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
                 options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                 options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
                 bfGenerator(options));
@@ -312,7 +313,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 new HashLookupStoreFactory(
                         cacheManager,
                         this.options.cachePageSize(),
-                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
+                        
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
                 options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                 options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
                 bfGenerator(options));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 2009932c5..b8a0a4e88 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -85,7 +85,8 @@ public class LocalTableQuery implements TableQuery {
                 new HashLookupStoreFactory(
                         new CacheManager(options.lookupCacheMaxMemory()),
                         options.cachePageSize(),
-                        
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR));
+                        
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
+                        
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION));
 
         if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.LOOKUP) {
             startLevel = 1;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index e4d4e83b0..6db0abc90 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -186,7 +186,8 @@ public class ContainsLevelsTest {
                                 .createRecordReader(
                                         0, file.fileName(), file.fileSize(), 
file.level()),
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
-                new HashLookupStoreFactory(new 
CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75),
+                new HashLookupStoreFactory(
+                        new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
                 Duration.ofHours(1),
                 maxDiskSize,
                 rowCount -> BloomFilter.builder(rowCount, 0.01));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index 69dcf5062..a959d3413 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -226,7 +226,8 @@ public class LookupLevelsTest {
                                 .createRecordReader(
                                         0, file.fileName(), file.fileSize(), 
file.level()),
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
-                new HashLookupStoreFactory(new 
CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75),
+                new HashLookupStoreFactory(
+                        new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 
0.75, "none"),
                 Duration.ofHours(1),
                 maxDiskSize,
                 rowCount -> BloomFilter.builder(rowCount, 0.05));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index de1316fe4..c0e6bea9a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -524,11 +524,17 @@ public class LookupJoinITCase extends CatalogITCaseBase {
 
     @Test
     public void testLookupPartialUpdate() throws Exception {
+        testLookupPartialUpdate("none");
+        testLookupPartialUpdate("zstd");
+    }
+
+    private void testLookupPartialUpdate(String compression) throws Exception {
         sql(
                 "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
                         + " ('merge-engine'='partial-update',"
                         + " 'changelog-producer'='full-compaction',"
                         + " 'changelog-producer.compaction-interval'='1 s',"
+                        + String.format(" 
'lookup.cache-spill-compression'='%s',", compression)
                         + " 'continuous.discovery-interval'='10 ms')");
         sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS 
INT))");
         String query =
@@ -543,6 +549,9 @@ public class LookupJoinITCase extends CatalogITCaseBase {
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 
11, 111, 1111));
 
         iterator.close();
+
+        sql("DROP TABLE DIM2");
+        sql("TRUNCATE TABLE T");
     }
 
     @ParameterizedTest

Reply via email to