This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 5af03f5fb4d27f1e01b758391544edae4c8fafb1 Author: Jingsong <[email protected]> AuthorDate: Tue Jan 30 18:47:58 2024 +0800 [core] Integrate compression to Hash Lookup This closes #2820 --- .../shortcodes/generated/core_configuration.html | 8 ++++- .../benchmark/LookupBloomFilterBenchmark.java | 2 +- paimon-common/pom.xml | 5 +++ .../main/java/org/apache/paimon/CoreOptions.java | 9 ++++- .../compression/BlockCompressionFactory.java | 8 ++--- .../java/org/apache/paimon/io/PageFileInput.java | 23 +++++++++++-- .../java/org/apache/paimon/io/PageFileOutput.java | 13 ++++++-- .../org/apache/paimon/lookup/hash/HashContext.java | 23 ++++++++++++- .../paimon/lookup/hash/HashLookupStoreFactory.java | 12 +++++-- .../paimon/lookup/hash/HashLookupStoreReader.java | 15 +++++++-- .../paimon/lookup/hash/HashLookupStoreWriter.java | 38 ++++++++++++++-------- .../io/cache/FileBasedRandomInputViewTest.java | 3 +- .../lookup/hash/HashLookupStoreFactoryTest.java | 20 ++++++++---- .../paimon/utils/FileBasedBloomFilterTest.java | 6 +++- .../paimon/operation/KeyValueFileStoreWrite.java | 6 ++-- .../apache/paimon/table/query/LocalTableQuery.java | 3 +- .../paimon/mergetree/ContainsLevelsTest.java | 3 +- .../apache/paimon/mergetree/LookupLevelsTest.java | 3 +- .../org/apache/paimon/flink/LookupJoinITCase.java | 9 +++++ 19 files changed, 164 insertions(+), 45 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 29d6157e6..8581a7d13 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -46,7 +46,7 @@ under the License. </tr> <tr> <td><h5>cache-page-size</h5></td> - <td style="word-wrap: break-word;">16 kb</td> + <td style="word-wrap: break-word;">64 kb</td> <td>MemorySize</td> <td>Memory page size for caching.</td> </tr> @@ -273,6 +273,12 @@ Mainly to resolve data skew on primary keys. We recommend starting with 64 mb wh <td>MemorySize</td> <td>Max memory size for lookup cache.</td> </tr> + <tr> + <td><h5>lookup.cache-spill-compression</h5></td> + <td style="word-wrap: break-word;">"lz4"</td> + <td>String</td> + <td>Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.</td> + </tr> <tr> <td><h5>lookup.cache.bloom.filter.enabled</h5></td> <td style="word-wrap: break-word;">true</td> diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java index e878301d2..159cc43d0 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/LookupBloomFilterBenchmark.java @@ -130,7 +130,7 @@ public class LookupBloomFilterBenchmark { Arrays.fill(value, (byte) 1); HashLookupStoreFactory factory = new HashLookupStoreFactory( - new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 1024, 0.75); + new CacheManager(MemorySize.ofMebiBytes(10)), 16 * 1024, 0.75, "none"); File file = new File(tempDir.toFile(), UUID.randomUUID().toString()); HashLookupStoreWriter writer = factory.createWriter(file, filter); diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml index d6d40a9fc..22d51c924 100644 --- a/paimon-common/pom.xml +++ b/paimon-common/pom.xml @@ -296,6 +296,11 @@ under the License. <pattern>it.unimi.dsi.fastutil</pattern> <shadedPattern>org.apache.paimon.shade.it.unimi.dsi.fastutil</shadedPattern> </relocation> + <!-- Same to paimon-format. --> + <relocation> + <pattern>io.airlift</pattern> + <shadedPattern>org.apache.paimon.shade.io.airlift</shadedPattern> + </relocation> </relocations> <minimizeJar>true</minimizeJar> </configuration> diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index ba238ac99..3e37a0935 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -366,7 +366,7 @@ public class CoreOptions implements Serializable { public static final ConfigOption<MemorySize> CACHE_PAGE_SIZE = key("cache-page-size") .memoryType() - .defaultValue(MemorySize.parse("16 kb")) + .defaultValue(MemorySize.parse("64 kb")) .withDescription("Memory page size for caching."); public static final ConfigOption<MemorySize> TARGET_FILE_SIZE = @@ -729,6 +729,13 @@ public class CoreOptions implements Serializable { .withDescription( "Max disk size for lookup cache, you can use this option to limit the use of local disks."); + public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION = + key("lookup.cache-spill-compression") + .stringType() + .defaultValue("lz4") + .withDescription( + "Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported."); + public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE = key("lookup.cache-max-memory-size") .memoryType() diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java index f90d4e039..621370b3d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java @@ -37,12 +37,10 @@ public interface BlockCompressionFactory { /** Creates {@link BlockCompressionFactory} according to the configuration. */ @Nullable - static BlockCompressionFactory create(@Nullable String compression) { - if (compression == null) { - return null; - } - + static BlockCompressionFactory create(String compression) { switch (compression.toUpperCase()) { + case "NONE": + return null; case "LZ4": return new Lz4BlockCompressionFactory(); case "LZO": diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java index da699304f..2e5d73ac3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java @@ -18,6 +18,10 @@ package org.apache.paimon.io; +import org.apache.paimon.compression.BlockCompressionFactory; + +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -36,8 +40,23 @@ public interface PageFileInput extends Closeable { byte[] readPosition(long position, int length) throws IOException; - static PageFileInput create(File file, int pageSize) throws IOException { + static PageFileInput create( + File file, + int pageSize, + @Nullable BlockCompressionFactory compressionFactory, + long uncompressBytes, + @Nullable long[] compressPagePositions) + throws IOException { RandomAccessFile accessFile = new RandomAccessFile(file, "r"); - return new UncompressedPageFileInput(accessFile, pageSize); + if (compressionFactory == null) { + return new UncompressedPageFileInput(accessFile, pageSize); + } else { + return new CompressedPageFileInput( + accessFile, + pageSize, + compressionFactory, + uncompressBytes, + compressPagePositions); + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java index 1a623ad90..d732e21b0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java @@ -18,6 +18,10 @@ package org.apache.paimon.io; +import org.apache.paimon.compression.BlockCompressionFactory; + +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.File; import java.io.IOException; @@ -27,7 +31,12 @@ public interface PageFileOutput extends Closeable { void write(byte[] bytes, int off, int len) throws IOException; - static PageFileOutput create(File file) throws IOException { - return new UncompressedPageFileOutput(file); + static PageFileOutput create( + File file, int pageSize, @Nullable BlockCompressionFactory compressionFactory) + throws IOException { + if (compressionFactory == null) { + return new UncompressedPageFileOutput(file); + } + return new CompressedPageFileOutput(file, pageSize, compressionFactory); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java index d7d90badd..3741fc6fd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java @@ -41,6 +41,9 @@ public class HashContext implements Context { // Offset of the data for different key length final long[] dataOffsets; + final long uncompressBytes; + final long[] compressPages; + public HashContext( boolean bloomFilterEnabled, long bloomFilterExpectedEntries, @@ -49,7 +52,9 @@ public class HashContext implements Context { int[] slotSizes, int[] slots, int[] indexOffsets, - long[] dataOffsets) { + long[] dataOffsets, + long uncompressBytes, + long[] compressPages) { this.bloomFilterEnabled = bloomFilterEnabled; this.bloomFilterExpectedEntries = bloomFilterExpectedEntries; this.bloomFilterBytes = bloomFilterBytes; @@ -58,5 +63,21 @@ public class HashContext implements Context { this.slots = slots; this.indexOffsets = indexOffsets; this.dataOffsets = dataOffsets; + this.uncompressBytes = uncompressBytes; + this.compressPages = compressPages; + } + + public HashContext copy(long uncompressBytes, long[] compressPages) { + return new HashContext( + bloomFilterEnabled, + bloomFilterExpectedEntries, + bloomFilterBytes, + keyCounts, + slotSizes, + slots, + indexOffsets, + dataOffsets, + uncompressBytes, + compressPages); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java index c4dbcffb9..7ece51734 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java @@ -18,6 +18,7 @@ package org.apache.paimon.lookup.hash; +import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.LookupStoreFactory; import org.apache.paimon.utils.BloomFilter; @@ -33,21 +34,26 @@ public class HashLookupStoreFactory implements LookupStoreFactory { private final CacheManager cacheManager; private final int cachePageSize; private final double loadFactor; + @Nullable private final BlockCompressionFactory compressionFactory; - public HashLookupStoreFactory(CacheManager cacheManager, int cachePageSize, double loadFactor) { + public HashLookupStoreFactory( + CacheManager cacheManager, int cachePageSize, double loadFactor, String compression) { this.cacheManager = cacheManager; this.cachePageSize = cachePageSize; this.loadFactor = loadFactor; + this.compressionFactory = BlockCompressionFactory.create(compression); } @Override public HashLookupStoreReader createReader(File file, Context context) throws IOException { - return new HashLookupStoreReader(file, (HashContext) context, cacheManager, cachePageSize); + return new HashLookupStoreReader( + file, (HashContext) context, cacheManager, cachePageSize, compressionFactory); } @Override public HashLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter) throws IOException { - return new HashLookupStoreWriter(loadFactor, file, bloomFilter); + return new HashLookupStoreWriter( + loadFactor, file, bloomFilter, compressionFactory, cachePageSize); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java index 853cd4049..e08776823 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java @@ -18,6 +18,7 @@ package org.apache.paimon.lookup.hash; +import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.io.PageFileInput; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.io.cache.FileBasedRandomInputView; @@ -63,7 +64,11 @@ public class HashLookupStoreReader @Nullable private FileBasedBloomFilter bloomFilter; HashLookupStoreReader( - File file, HashContext context, CacheManager cacheManager, int cachePageSize) + File file, + HashContext context, + CacheManager cacheManager, + int cachePageSize, + @Nullable BlockCompressionFactory compressionFactory) throws IOException { // File path if (!file.exists()) { @@ -83,7 +88,13 @@ public class HashLookupStoreReader LOG.info("Opening file {}", file.getName()); - PageFileInput fileInput = PageFileInput.create(file, cachePageSize); + PageFileInput fileInput = + PageFileInput.create( + file, + cachePageSize, + compressionFactory, + context.uncompressBytes, + context.compressPages); inputView = new FileBasedRandomInputView(fileInput, cacheManager); if (context.bloomFilterEnabled) { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java index 8853ab827..ea02825b0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java @@ -18,6 +18,8 @@ package org.apache.paimon.lookup.hash; +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.io.CompressedPageFileOutput; import org.apache.paimon.io.PageFileOutput; import org.apache.paimon.lookup.LookupStoreFactory.Context; import org.apache.paimon.lookup.LookupStoreWriter; @@ -81,10 +83,20 @@ public class HashLookupStoreWriter implements LookupStoreWriter { @Nullable private final BloomFilter.Builder bloomFilter; - HashLookupStoreWriter(double loadFactor, File file, @Nullable BloomFilter.Builder bloomFilter) + @Nullable private final BlockCompressionFactory compressionFactory; + private final int compressPageSize; + + HashLookupStoreWriter( + double loadFactor, + File file, + @Nullable BloomFilter.Builder bloomFilter, + @Nullable BlockCompressionFactory compressionFactory, + int compressPageSize) throws IOException { this.loadFactor = loadFactor; this.outputFile = file; + this.compressionFactory = compressionFactory; + this.compressPageSize = compressPageSize; if (loadFactor <= 0.0 || loadFactor >= 1.0) { throw new IllegalArgumentException( "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0."); @@ -187,7 +199,9 @@ public class HashLookupStoreWriter implements LookupStoreWriter { new int[keyCounts.length], new int[keyCounts.length], new int[keyCounts.length], - new long[keyCounts.length]); + new long[keyCounts.length], + 0, + null); long indexesLength = bloomFilterBytes; long datasLength = 0; @@ -223,7 +237,9 @@ public class HashLookupStoreWriter implements LookupStoreWriter { context.dataOffsets[i] = indexesLength + context.dataOffsets[i]; } - try (PageFileOutput output = PageFileOutput.create(outputFile)) { + PageFileOutput output = + PageFileOutput.create(outputFile, compressPageSize, compressionFactory); + try { // Write bloom filter file if (bloomFilter != null) { File bloomFilterFile = new File(tempFolder, "bloomfilter.dat"); @@ -257,11 +273,17 @@ public class HashLookupStoreWriter implements LookupStoreWriter { mergeFiles(filesToMerge, output); } finally { cleanup(filesToMerge); + output.close(); } LOG.info( "Compressed Total store size: {} Mb", new DecimalFormat("#,##0.0").format(outputFile.length() / (1024 * 1024))); + + if (output instanceof CompressedPageFileOutput) { + CompressedPageFileOutput compressedOutput = (CompressedPageFileOutput) output; + context = context.copy(compressedOutput.uncompressBytes(), compressedOutput.pages()); + } return context; } @@ -472,14 +494,4 @@ public class HashLookupStoreWriter implements LookupStoreWriter { } return dos; } - - private int getNumKeyCount() { - int res = 0; - for (int count : keyCounts) { - if (count != 0) { - res++; - } - } - return res; - } } diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java index a0be24d8f..f9f11953c 100644 --- a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java @@ -68,7 +68,8 @@ public class FileBasedRandomInputViewTest { File file = writeFile(bytes); CacheManager cacheManager = new CacheManager(MemorySize.ofKibiBytes(128)); FileBasedRandomInputView view = - new FileBasedRandomInputView(PageFileInput.create(file, 1024), cacheManager); + new FileBasedRandomInputView( + PageFileInput.create(file, 1024, null, 0, null), cacheManager); // read first one // this assertThatCode check the ConcurrentModificationException is not threw. diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java index c7f386950..e29acddd8 100644 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java @@ -43,7 +43,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -65,25 +64,31 @@ public class HashLookupStoreFactoryTest { private final int pageSize = 1024; private final boolean enableBloomFilter; + private final String compress; private File file; private HashLookupStoreFactory factory; public HashLookupStoreFactoryTest(List<Object> var) { this.enableBloomFilter = (Boolean) var.get(0); + this.compress = (String) var.get(1); } @SuppressWarnings("unused") - @Parameters(name = "enableBf-{0}") + @Parameters(name = "enableBf&compress-{0}") public static List<List<Object>> getVarSeg() { - return Arrays.asList(Collections.singletonList(true), Collections.singletonList(false)); + return Arrays.asList( + Arrays.asList(true, "none"), + Arrays.asList(false, "none"), + Arrays.asList(false, "lz4"), + Arrays.asList(true, "lz4")); } @BeforeEach public void setUp() throws IOException { this.factory = new HashLookupStoreFactory( - new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d); + new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress); this.file = new File(tempDir.toFile(), UUID.randomUUID().toString()); if (!file.createNewFile()) { throw new IOException("Can not create file: " + file); @@ -206,7 +211,7 @@ public class HashLookupStoreFactoryTest { factory = new HashLookupStoreFactory( - new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d); + new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress); Context context = writeStore(factory, file, keys, values); @@ -236,7 +241,7 @@ public class HashLookupStoreFactoryTest { factory = new HashLookupStoreFactory( - new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d); + new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress); Context context = writeStore(file, keys, values); @@ -307,7 +312,8 @@ public class HashLookupStoreFactoryTest { // Read factory = - new HashLookupStoreFactory(new CacheManager(new MemorySize(8096)), pageSize, 0.75d); + new HashLookupStoreFactory( + new CacheManager(new MemorySize(8096)), pageSize, 0.75d, compress); HashLookupStoreReader reader = factory.createReader(file, context); for (int i = 0; i < keys.length; i++) { assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i])); diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java index bf43680c7..03817cce0 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java @@ -53,7 +53,11 @@ public class FileBasedBloomFilterTest { CacheManager cacheManager = new CacheManager(MemorySize.ofMebiBytes(1)); FileBasedBloomFilter filter = new FileBasedBloomFilter( - PageFileInput.create(file, 1000), cacheManager, 100, 0, 1000); + PageFileInput.create(file, 1024, null, 0, null), + cacheManager, + 100, + 0, + 1000); Arrays.stream(inputs) .forEach(i -> Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 9ae8b4ccd..2b2d3f927 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -288,7 +288,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { new HashLookupStoreFactory( cacheManager, this.options.cachePageSize(), - options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)), + options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR), + options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), bfGenerator(options)); @@ -312,7 +313,8 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { new HashLookupStoreFactory( cacheManager, this.options.cachePageSize(), - options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)), + options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR), + options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)), options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE), bfGenerator(options)); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 2009932c5..b8a0a4e88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -85,7 +85,8 @@ public class LocalTableQuery implements TableQuery { new HashLookupStoreFactory( new CacheManager(options.lookupCacheMaxMemory()), options.cachePageSize(), - options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)); + options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR), + options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)); if (options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP) { startLevel = 1; diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index e4d4e83b0..6db0abc90 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -186,7 +186,8 @@ public class ContainsLevelsTest { .createRecordReader( 0, file.fileName(), file.fileSize(), file.level()), () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), - new HashLookupStoreFactory(new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75), + new HashLookupStoreFactory( + new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), Duration.ofHours(1), maxDiskSize, rowCount -> BloomFilter.builder(rowCount, 0.01)); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 69dcf5062..a959d3413 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -226,7 +226,8 @@ public class LookupLevelsTest { .createRecordReader( 0, file.fileName(), file.fileSize(), file.level()), () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), - new HashLookupStoreFactory(new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75), + new HashLookupStoreFactory( + new CacheManager(MemorySize.ofMebiBytes(1)), 2048, 0.75, "none"), Duration.ofHours(1), maxDiskSize, rowCount -> BloomFilter.builder(rowCount, 0.05)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java index de1316fe4..c0e6bea9a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java @@ -524,11 +524,17 @@ public class LookupJoinITCase extends CatalogITCaseBase { @Test public void testLookupPartialUpdate() throws Exception { + testLookupPartialUpdate("none"); + testLookupPartialUpdate("zstd"); + } + + private void testLookupPartialUpdate(String compression) throws Exception { sql( "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH" + " ('merge-engine'='partial-update'," + " 'changelog-producer'='full-compaction'," + " 'changelog-producer.compaction-interval'='1 s'," + + String.format(" 'lookup.cache-spill-compression'='%s',", compression) + " 'continuous.discovery-interval'='10 ms')"); sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS INT))"); String query = @@ -543,6 +549,9 @@ public class LookupJoinITCase extends CatalogITCaseBase { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111)); iterator.close(); + + sql("DROP TABLE DIM2"); + sql("TRUNCATE TABLE T"); } @ParameterizedTest
