This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85f563b8a [core] Separate index cache and data cache (#4438)
85f563b8a is described below

commit 85f563b8aebe378ad17606ff19ee775625bba66c
Author: WenjunMin <[email protected]>
AuthorDate: Mon Nov 11 20:45:57 2024 +0800

    [core] Separate index cache and data cache (#4438)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../paimon/benchmark/metric/cpu/SysInfoLinux.java  |  7 ++-
 .../benchmark/cache/CacheManagerBenchmark.java     |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++++
 .../java/org/apache/paimon/io/cache/CacheKey.java  | 35 +++++++++++---
 .../org/apache/paimon/io/cache/CacheManager.java   | 54 +++++++++++++++++++---
 .../org/apache/paimon/lookup/sort/BlockCache.java  |  5 +-
 .../paimon/lookup/sort/SortLookupStoreReader.java  | 32 ++++++-------
 .../apache/paimon/utils/FileBasedBloomFilter.java  |  2 +-
 .../apache/paimon/io/cache/CacheManagerTest.java   |  2 +-
 .../io/cache/FileBasedRandomInputViewTest.java     |  5 +-
 .../lookup/sort/SortLookupStoreFactoryTest.java    |  3 +-
 .../paimon/utils/FileBasedBloomFilterTest.java     |  5 +-
 .../paimon/operation/MemoryFileStoreWrite.java     |  4 +-
 .../apache/paimon/table/query/LocalTableQuery.java |  4 +-
 15 files changed, 133 insertions(+), 44 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6556867ac..78392c3ef 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -429,6 +429,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Double</td>
             <td>Define the default false positive probability for lookup cache 
bloom filters.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.cache.high-priority-pool-ratio</h5></td>
+            <td style="word-wrap: break-word;">0.25</td>
+            <td>Double</td>
+            <td>The fraction of cache memory that is reserved for 
high-priority data like index, filter.</td>
+        </tr>
         <tr>
             <td><h5>lookup.hash-load-factor</h5></td>
             <td style="word-wrap: break-word;">0.75</td>
diff --git 
a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
 
b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
index e4a5cfa57..041637c2d 100644
--- 
a/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
+++ 
b/paimon-benchmark/paimon-cluster-benchmark/src/main/java/org/apache/paimon/benchmark/metric/cpu/SysInfoLinux.java
@@ -18,7 +18,12 @@
 
 package org.apache.paimon.benchmark.metric.cpu;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
 import java.math.BigInteger;
 import java.nio.charset.Charset;
 import java.util.HashMap;
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
index 2c8562106..9a64322e0 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/cache/CacheManagerBenchmark.java
@@ -52,7 +52,7 @@ public class CacheManagerBenchmark {
         CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, 
"r"), 0, 0);
 
         for (Cache.CacheType cacheType : Cache.CacheType.values()) {
-            CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofBytes(10));
+            CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofBytes(10), 0.1);
             benchmark.addCase(
                     String.format("cache-%s", cacheType.toString()),
                     5,
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c69f0aae0..bde2f1cc3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -922,6 +922,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue(MemorySize.parse("256 mb"))
                     .withDescription("Max memory size for lookup cache.");
 
+    public static final ConfigOption<Double> LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO 
=
+            key("lookup.cache.high-priority-pool-ratio")
+                    .doubleType()
+                    .defaultValue(0.25)
+                    .withDescription(
+                            "The fraction of cache memory that is reserved for 
high-priority data like index, filter.");
+
     public static final ConfigOption<Boolean> 
LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
             key("lookup.cache.bloom.filter.enabled")
                     .booleanType()
@@ -1837,6 +1844,10 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
     }
 
+    public double lookupCacheHighPrioPoolRatio() {
+        return options.get(LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO);
+    }
+
     public long targetFileSize(boolean hasPrimaryKey) {
         return options.getOptional(TARGET_FILE_SIZE)
                 .orElse(hasPrimaryKey ? VALUE_128_MB : VALUE_256_MB)
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
index b313018d3..11b8beb22 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -24,25 +24,31 @@ import java.util.Objects;
 /** Key for cache manager. */
 public interface CacheKey {
 
-    static CacheKey forPosition(RandomAccessFile file, long position, int 
length) {
-        return new PositionCacheKey(file, position, length);
+    static CacheKey forPosition(RandomAccessFile file, long position, int 
length, boolean isIndex) {
+        return new PositionCacheKey(file, position, length, isIndex);
     }
 
     static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int 
pageIndex) {
-        return new PageIndexCacheKey(file, pageSize, pageIndex);
+        return new PageIndexCacheKey(file, pageSize, pageIndex, false);
     }
 
+    /** @return Whether this cache key is for index cache. */
+    boolean isIndex();
+
     /** Key for file position and length. */
     class PositionCacheKey implements CacheKey {
 
         private final RandomAccessFile file;
         private final long position;
         private final int length;
+        private final boolean isIndex;
 
-        private PositionCacheKey(RandomAccessFile file, long position, int 
length) {
+        private PositionCacheKey(
+                RandomAccessFile file, long position, int length, boolean 
isIndex) {
             this.file = file;
             this.position = position;
             this.length = length;
+            this.isIndex = isIndex;
         }
 
         @Override
@@ -56,12 +62,18 @@ public interface CacheKey {
             PositionCacheKey that = (PositionCacheKey) o;
             return position == that.position
                     && length == that.length
+                    && isIndex == that.isIndex
                     && Objects.equals(file, that.file);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(file, position, length);
+            return Objects.hash(file, position, length, isIndex);
+        }
+
+        @Override
+        public boolean isIndex() {
+            return isIndex;
         }
     }
 
@@ -71,17 +83,25 @@ public interface CacheKey {
         private final RandomAccessFile file;
         private final int pageSize;
         private final int pageIndex;
+        private final boolean isIndex;
 
-        private PageIndexCacheKey(RandomAccessFile file, int pageSize, int 
pageIndex) {
+        private PageIndexCacheKey(
+                RandomAccessFile file, int pageSize, int pageIndex, boolean 
isIndex) {
             this.file = file;
             this.pageSize = pageSize;
             this.pageIndex = pageIndex;
+            this.isIndex = isIndex;
         }
 
         public int pageIndex() {
             return pageIndex;
         }
 
+        @Override
+        public boolean isIndex() {
+            return isIndex;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -93,12 +113,13 @@ public interface CacheKey {
             PageIndexCacheKey that = (PageIndexCacheKey) o;
             return pageSize == that.pageSize
                     && pageIndex == that.pageIndex
+                    && isIndex == that.isIndex
                     && Objects.equals(file, that.file);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(file, pageSize, pageIndex);
+            return Objects.hash(file, pageSize, pageIndex, isIndex);
         }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 9e160aa0d..677d87d49 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -21,6 +21,10 @@ package org.apache.paimon.io.cache;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -29,31 +33,63 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 /** Cache manager to cache bytes to paged {@link MemorySegment}s. */
 public class CacheManager {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(CacheManager.class);
+
     /**
      * Refreshing the cache comes with some costs, so not every time we visit 
the CacheManager, but
      * every 10 visits, refresh the LRU strategy.
      */
     public static final int REFRESH_COUNT = 10;
 
-    private final Cache cache;
+    private final Cache dataCache;
+    private final Cache indexCache;
 
     private int fileReadCount;
 
+    @VisibleForTesting
     public CacheManager(MemorySize maxMemorySize) {
-        this(Cache.CacheType.GUAVA, maxMemorySize);
+        this(Cache.CacheType.GUAVA, maxMemorySize, 0);
+    }
+
+    public CacheManager(MemorySize dataMaxMemorySize, double 
highPriorityPoolRatio) {
+        this(Cache.CacheType.GUAVA, dataMaxMemorySize, highPriorityPoolRatio);
     }
 
-    public CacheManager(Cache.CacheType cacheType, MemorySize maxMemorySize) {
-        this.cache = 
CacheBuilder.newBuilder(cacheType).maximumWeight(maxMemorySize).build();
+    public CacheManager(
+            Cache.CacheType cacheType, MemorySize maxMemorySize, double 
highPriorityPoolRatio) {
+        Preconditions.checkArgument(
+                highPriorityPoolRatio >= 0 && highPriorityPoolRatio < 1,
+                "The high priority pool ratio should in the range [0, 1).");
+        MemorySize indexCacheSize =
+                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * 
highPriorityPoolRatio));
+        MemorySize dataCacheSize =
+                MemorySize.ofBytes((long) (maxMemorySize.getBytes() * (1 - 
highPriorityPoolRatio)));
+        this.dataCache = 
CacheBuilder.newBuilder(cacheType).maximumWeight(dataCacheSize).build();
+        if (highPriorityPoolRatio == 0) {
+            this.indexCache = dataCache;
+        } else {
+            this.indexCache =
+                    
CacheBuilder.newBuilder(cacheType).maximumWeight(indexCacheSize).build();
+        }
         this.fileReadCount = 0;
+        LOG.info(
+                "Initialize cache manager with data cache of {} and index 
cache of {}.",
+                dataCacheSize,
+                indexCacheSize);
     }
 
     @VisibleForTesting
-    public Cache cache() {
-        return cache;
+    public Cache dataCache() {
+        return dataCache;
+    }
+
+    @VisibleForTesting
+    public Cache indexCache() {
+        return indexCache;
     }
 
     public MemorySegment getPage(CacheKey key, CacheReader reader, 
CacheCallback callback) {
+        Cache cache = key.isIndex() ? indexCache : dataCache;
         Cache.CacheValue value =
                 cache.get(
                         key,
@@ -70,7 +106,11 @@ public class CacheManager {
     }
 
     public void invalidPage(CacheKey key) {
-        cache.invalidate(key);
+        if (key.isIndex()) {
+            indexCache.invalidate(key);
+        } else {
+            dataCache.invalidate(key);
+        }
     }
 
     public int fileReadCount() {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
index e18110904..0441a24f2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockCache.java
@@ -49,7 +49,6 @@ public class BlockCache implements Closeable {
         this.blocks = new HashMap<>();
     }
 
-    // TODO separate index and data cache
     private byte[] readFrom(long offset, int length) throws IOException {
         byte[] buffer = new byte[length];
         int read = channel.read(ByteBuffer.wrap(buffer), offset);
@@ -61,9 +60,9 @@ public class BlockCache implements Closeable {
     }
 
     public MemorySegment getBlock(
-            long position, int length, Function<byte[], byte[]> 
decompressFunc) {
+            long position, int length, Function<byte[], byte[]> 
decompressFunc, boolean isIndex) {
 
-        CacheKey cacheKey = CacheKey.forPosition(file, position, length);
+        CacheKey cacheKey = CacheKey.forPosition(file, position, length, 
isIndex);
 
         SegmentContainer container = blocks.get(cacheKey);
         if (container == null || container.getAccessCount() == 
CacheManager.REFRESH_COUNT) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index 727589bfd..39997888c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -38,11 +38,7 @@ import java.util.Comparator;
 import static org.apache.paimon.lookup.sort.SortLookupStoreUtils.crc32c;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/**
- * A {@link LookupStoreReader} for sort store.
- *
- * <p>TODO separate index cache and block cache.
- */
+/** A {@link LookupStoreReader} for sort store. */
 public class SortLookupStoreReader implements LookupStoreReader {
 
     private final Comparator<MemorySlice> comparator;
@@ -68,7 +64,7 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
         this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, 
null);
         this.blockCache = new BlockCache(fileInput.file(), cacheManager);
         Footer footer = readFooter();
-        this.indexBlockIterator = 
readBlock(footer.getIndexBlockHandle()).iterator();
+        this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), 
true).iterator();
         BloomFilterHandle handle = footer.getBloomFilterHandle();
         if (handle != null) {
             this.bloomFilter =
@@ -84,7 +80,7 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
     private Footer readFooter() throws IOException {
         MemorySegment footerData =
                 blockCache.getBlock(
-                        fileSize - Footer.ENCODED_LENGTH, 
Footer.ENCODED_LENGTH, b -> b);
+                        fileSize - Footer.ENCODED_LENGTH, 
Footer.ENCODED_LENGTH, b -> b, true);
         return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
     }
 
@@ -111,23 +107,26 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
     }
 
     private BlockIterator getNextBlock() throws IOException {
+        // index block handle, point to the key, value position.
         MemorySlice blockHandle = indexBlockIterator.next().getValue();
-        BlockReader dataBlock = openBlock(blockHandle);
+        BlockReader dataBlock =
+                readBlock(BlockHandle.readBlockHandle(blockHandle.toInput()), 
false);
         return dataBlock.iterator();
     }
 
-    private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
-        BlockHandle blockHandle = 
BlockHandle.readBlockHandle(blockEntry.toInput());
-        return readBlock(blockHandle);
-    }
-
-    private BlockReader readBlock(BlockHandle blockHandle) {
+    /**
+     * @param blockHandle The block handle.
+     * @param index Whether read the block as an index.
+     * @return The reader of the target block.
+     */
+    private BlockReader readBlock(BlockHandle blockHandle, boolean index) {
         // read block trailer
         MemorySegment trailerData =
                 blockCache.getBlock(
                         blockHandle.offset() + blockHandle.size(),
                         BlockTrailer.ENCODED_LENGTH,
-                        b -> b);
+                        b -> b,
+                        true);
         BlockTrailer blockTrailer =
                 
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
 
@@ -166,7 +165,8 @@ public class SortLookupStoreReader implements 
LookupStoreReader {
                                 checkArgument(uncompressedLength == 
uncompressed.length);
                                 return uncompressed;
                             }
-                        });
+                        },
+                        index);
         return new BlockReader(MemorySlice.wrap(unCompressedBlock), 
comparator);
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index 3d8751774..ede7a8e3c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -55,7 +55,7 @@ public class FileBasedBloomFilter implements Closeable {
         this.readOffset = readOffset;
         this.readLength = readLength;
         this.accessCount = 0;
-        this.cacheKey = CacheKey.forPosition(input.file(), readOffset, 
readLength);
+        this.cacheKey = CacheKey.forPosition(input.file(), readOffset, 
readLength, true);
     }
 
     public boolean testHash(int hash) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java
index 6f8b4e60e..cf8076ac8 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/CacheManagerTest.java
@@ -48,7 +48,7 @@ public class CacheManagerTest {
         CacheKey key2 = CacheKey.forPageIndex(new RandomAccessFile(file2, 
"r"), 0, 0);
 
         for (Cache.CacheType cacheType : Cache.CacheType.values()) {
-            CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofBytes(10));
+            CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofBytes(10), 0.1);
             byte[] value = new byte[6];
             Arrays.fill(value, (byte) 1);
             for (int i = 0; i < 10; i++) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
index 4c88ea343..6486aead8 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
@@ -82,7 +82,7 @@ public class FileBasedRandomInputViewTest {
         }
 
         File file = writeFile(bytes);
-        CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofKibiBytes(128));
+        CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofKibiBytes(128), 0);
         FileBasedRandomInputView view =
                 new FileBasedRandomInputView(
                         PageFileInput.create(file, 1024, null, 0, null), 
cacheManager);
@@ -117,7 +117,8 @@ public class FileBasedRandomInputViewTest {
 
         // hot key in LRU, should have good cache hit rate
         assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
-        assertThat(cacheManager.cache().asMap().size()).isEqualTo(0);
+        assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
+        assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
     }
 
     private File writeFile(byte[] bytes) throws IOException {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
index cbec6131d..7ba3f8283 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -110,7 +110,8 @@ public class SortLookupStoreFactoryTest {
         assertThat(reader.lookup(toBytes(VALUE_COUNT + 1000))).isNull();
 
         reader.close();
-        assertThat(cacheManager.cache().asMap()).isEmpty();
+        assertThat(cacheManager.dataCache().asMap()).isEmpty();
+        assertThat(cacheManager.indexCache().asMap()).isEmpty();
     }
 
     @TestTemplate
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
index 51babc288..d1471fd74 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
@@ -64,7 +64,7 @@ public class FileBasedBloomFilterTest {
         Arrays.stream(inputs).forEach(i -> 
builder.addHash(Integer.hashCode(i)));
         File file = writeFile(segment.getArray());
 
-        CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofMebiBytes(1));
+        CacheManager cacheManager = new CacheManager(cacheType, 
MemorySize.ofMebiBytes(1), 0.1);
         FileBasedBloomFilter filter =
                 new FileBasedBloomFilter(
                         PageFileInput.create(file, 1024, null, 0, null),
@@ -76,7 +76,8 @@ public class FileBasedBloomFilterTest {
         Arrays.stream(inputs)
                 .forEach(i -> 
Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());
         filter.close();
-        Assertions.assertThat(cacheManager.cache().asMap()).isEmpty();
+        Assertions.assertThat(cacheManager.dataCache().asMap()).isEmpty();
+        Assertions.assertThat(cacheManager.indexCache().asMap()).isEmpty();
         
Assertions.assertThat(filter.bloomFilter().getMemorySegment()).isNull();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index bbde3fd48..ff99f0651 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -78,7 +78,9 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
                 options.writeMaxWritersToSpill(),
                 options.legacyPartitionName());
         this.options = options;
-        this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
+        this.cacheManager =
+                new CacheManager(
+                        options.lookupCacheMaxMemory(), 
options.lookupCacheHighPrioPoolRatio());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index d5b392d9e..8ff5ce7a6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -101,7 +101,9 @@ public class LocalTableQuery implements TableQuery {
         this.lookupStoreFactory =
                 LookupStoreFactory.create(
                         options,
-                        new CacheManager(options.lookupCacheMaxMemory()),
+                        new CacheManager(
+                                options.lookupCacheMaxMemory(),
+                                options.lookupCacheHighPrioPoolRatio()),
                         new 
RowCompactedSerializer(keyType).createSliceComparator());
 
         if (options.needLookup()) {

Reply via email to