This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1369032b0a [core] Remove useless classes and clean memory in FileBasedBloomFilter
1369032b0a is described below

commit 1369032b0abf2c50818a7c7442f206e1f0eaab10
Author: JingsongLi <[email protected]>
AuthorDate: Sat Dec 6 20:28:46 2025 +0800

    [core] Remove useless classes and clean memory in FileBasedBloomFilter
---
 .../org/apache/paimon/fs/local/LocalFileIO.java    |   2 +
 .../apache/paimon/io/CompressedPageFileInput.java  | 115 ------------------
 .../apache/paimon/io/CompressedPageFileOutput.java | 109 -----------------
 .../java/org/apache/paimon/io/PageFileInput.java   |  62 ----------
 .../java/org/apache/paimon/io/PageFileOutput.java  |  42 -------
 .../paimon/io/UncompressedPageFileInput.java       |  79 ------------
 .../paimon/io/UncompressedPageFileOutput.java      |  44 -------
 .../java/org/apache/paimon/io/cache/CacheKey.java  |  12 +-
 .../org/apache/paimon/io/cache/CacheManager.java   |   4 -
 .../paimon/io/cache/FileBasedRandomInputView.java  | 119 -------------------
 .../paimon/lookup/sort/SortLookupStoreReader.java  |  18 +--
 .../paimon/lookup/sort/SortLookupStoreWriter.java  |  17 ++-
 .../java/org/apache/paimon/sst/BlockCache.java     |   1 -
 .../org/apache/paimon/sst/BloomFilterHandle.java   |   2 -
 .../java/org/apache/paimon/sst/SstFileReader.java  |  24 ++--
 .../java/org/apache/paimon/sst/SstFileWriter.java  |   3 +-
 .../apache/paimon/utils/FileBasedBloomFilter.java  |  21 ++--
 .../paimon/io/CompressedInputOutputTest.java       | 112 -----------------
 .../io/cache/FileBasedRandomInputViewTest.java     | 132 ---------------------
 19 files changed, 45 insertions(+), 873 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index c6b32b1ae0..143f9d8c24 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -54,6 +54,8 @@ public class LocalFileIO implements FileIO {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalFileIO.class);
 
+    public static final LocalFileIO INSTANCE = new LocalFileIO();
+
     private static final long serialVersionUID = 1L;
 
     // the lock to ensure atomic renaming
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
deleted file mode 100644
index 242e743110..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** A class to wrap compressed {@link RandomAccessFile}. */
-public class CompressedPageFileInput implements PageFileInput {
-
-    private final RandomAccessFile file;
-    private final int pageSize;
-    private final long uncompressedBytes;
-    private final long[] pagePositions;
-
-    private final BlockDecompressor decompressor;
-    private final byte[] uncompressedBuffer;
-    private final byte[] compressedBuffer;
-
-    private final int pageSizeBits;
-    private final int pageSizeMask;
-
-    public CompressedPageFileInput(
-            RandomAccessFile file,
-            int pageSize,
-            BlockCompressionFactory compressionFactory,
-            long uncompressedBytes,
-            long[] pagePositions) {
-        this.file = file;
-        this.pageSize = pageSize;
-        this.uncompressedBytes = uncompressedBytes;
-        this.pagePositions = pagePositions;
-
-        this.uncompressedBuffer = new byte[pageSize];
-        this.decompressor = compressionFactory.getDecompressor();
-        this.compressedBuffer =
-                new 
byte[compressionFactory.getCompressor().getMaxCompressedSize(pageSize)];
-
-        this.pageSizeBits = MathUtils.log2strict(pageSize);
-        this.pageSizeMask = pageSize - 1;
-    }
-
-    @Override
-    public RandomAccessFile file() {
-        return file;
-    }
-
-    @Override
-    public long uncompressBytes() {
-        return uncompressedBytes;
-    }
-
-    @Override
-    public int pageSize() {
-        return pageSize;
-    }
-
-    @Override
-    public byte[] readPage(int pageIndex) throws IOException {
-        long position = pagePositions[pageIndex];
-        file.seek(position);
-        int compressLength = file.readInt();
-        file.readFully(compressedBuffer, 0, compressLength);
-
-        int uncompressedLength =
-                decompressor.decompress(compressedBuffer, 0, compressLength, 
uncompressedBuffer, 0);
-
-        byte[] result = new byte[uncompressedLength];
-        System.arraycopy(uncompressedBuffer, 0, result, 0, uncompressedLength);
-        return result;
-    }
-
-    @Override
-    public byte[] readPosition(long position, int length) throws IOException {
-        int offset = (int) (position & this.pageSizeMask);
-        int pageIndex = (int) (position >>> this.pageSizeBits);
-
-        byte[] result = new byte[length];
-        int n = 0;
-        do {
-            byte[] page = readPage(pageIndex);
-            int currentLength = Math.min(page.length - offset, length - n);
-            System.arraycopy(page, offset, result, n, currentLength);
-            offset = 0;
-            n += currentLength;
-            pageIndex++;
-        } while (n < length);
-        return result;
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.file.close();
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
deleted file mode 100644
index 6e15715894..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockCompressor;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/** A class to output bytes with compression. */
-public class CompressedPageFileOutput implements PageFileOutput {
-
-    private final FileOutputStream out;
-    private final byte[] page;
-    private final BlockCompressor compressor;
-    private final byte[] compressedPage;
-    private final List<Long> pages;
-
-    private long uncompressBytes;
-    private long position;
-    private int count;
-
-    public CompressedPageFileOutput(
-            File file, int pageSize, BlockCompressionFactory 
compressionFactory)
-            throws FileNotFoundException {
-        this.out = new FileOutputStream(file);
-        this.page = new byte[pageSize];
-        this.compressor = compressionFactory.getCompressor();
-        this.compressedPage = new 
byte[compressor.getMaxCompressedSize(pageSize)];
-        this.pages = new ArrayList<>();
-
-        this.uncompressBytes = 0;
-        this.position = 0;
-        this.count = 0;
-    }
-
-    private void flushBuffer() throws IOException {
-        if (count > 0) {
-            pages.add(position);
-            int len = compressor.compress(page, 0, count, compressedPage, 0);
-            // write length
-            out.write((len >>> 24) & 0xFF);
-            out.write((len >>> 16) & 0xFF);
-            out.write((len >>> 8) & 0xFF);
-            out.write(len & 0xFF);
-            // write page
-            out.write(compressedPage, 0, len);
-            count = 0;
-            position += (len + 4);
-        }
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        uncompressBytes += len;
-
-        while (len > 0) {
-            if (count >= page.length) {
-                flushBuffer();
-            }
-            int toWrite = Math.min(len, page.length - count);
-            System.arraycopy(b, off, page, count, toWrite);
-            off += toWrite;
-            len -= toWrite;
-            count += toWrite;
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        try (OutputStream ignored = out) {
-            flushBuffer();
-        }
-    }
-
-    public long uncompressBytes() {
-        return uncompressBytes;
-    }
-
-    public long[] pages() {
-        long[] pages = new long[this.pages.size()];
-        for (int i = 0; i < pages.length; i++) {
-            pages[i] = this.pages.get(i);
-        }
-        return pages;
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
deleted file mode 100644
index 2e5d73ac37..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** An interface to read pages from file. */
-public interface PageFileInput extends Closeable {
-
-    RandomAccessFile file();
-
-    long uncompressBytes();
-
-    int pageSize();
-
-    byte[] readPage(int pageIndex) throws IOException;
-
-    byte[] readPosition(long position, int length) throws IOException;
-
-    static PageFileInput create(
-            File file,
-            int pageSize,
-            @Nullable BlockCompressionFactory compressionFactory,
-            long uncompressBytes,
-            @Nullable long[] compressPagePositions)
-            throws IOException {
-        RandomAccessFile accessFile = new RandomAccessFile(file, "r");
-        if (compressionFactory == null) {
-            return new UncompressedPageFileInput(accessFile, pageSize);
-        } else {
-            return new CompressedPageFileInput(
-                    accessFile,
-                    pageSize,
-                    compressionFactory,
-                    uncompressBytes,
-                    compressPagePositions);
-        }
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
deleted file mode 100644
index d732e21b02..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-
-/** An interface to write bytes with pages into file. */
-public interface PageFileOutput extends Closeable {
-
-    void write(byte[] bytes, int off, int len) throws IOException;
-
-    static PageFileOutput create(
-            File file, int pageSize, @Nullable BlockCompressionFactory 
compressionFactory)
-            throws IOException {
-        if (compressionFactory == null) {
-            return new UncompressedPageFileOutput(file);
-        }
-        return new CompressedPageFileOutput(file, pageSize, 
compressionFactory);
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
deleted file mode 100644
index dd79223937..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** A class to wrap uncompressed {@link RandomAccessFile}. */
-public class UncompressedPageFileInput implements PageFileInput {
-
-    private final RandomAccessFile file;
-    private final long fileLength;
-    private final int pageSize;
-    private final int pageSizeBits;
-
-    public UncompressedPageFileInput(RandomAccessFile file, int pageSize) 
throws IOException {
-        this.file = file;
-        this.fileLength = file.length();
-        this.pageSize = pageSize;
-        this.pageSizeBits = MathUtils.log2strict(pageSize);
-    }
-
-    @Override
-    public RandomAccessFile file() {
-        return file;
-    }
-
-    @Override
-    public long uncompressBytes() {
-        return fileLength;
-    }
-
-    @Override
-    public int pageSize() {
-        return pageSize;
-    }
-
-    @Override
-    public byte[] readPage(int pageIndex) throws IOException {
-        long position = (long) pageIndex << pageSizeBits;
-        file.seek(position);
-
-        int length = (int) Math.min(pageSize, fileLength - position);
-        byte[] result = new byte[length];
-        file.readFully(result);
-        return result;
-    }
-
-    @Override
-    public byte[] readPosition(long position, int length) throws IOException {
-        file.seek(position);
-        byte[] result = new byte[length];
-        file.readFully(result);
-        return result;
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.file.close();
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
deleted file mode 100644
index 799395f0bd..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-/** A class to wrap uncompressed {@link FileOutputStream}. */
-public class UncompressedPageFileOutput implements PageFileOutput {
-
-    private final FileOutputStream out;
-
-    public UncompressedPageFileOutput(File file) throws FileNotFoundException {
-        this.out = new FileOutputStream(file);
-    }
-
-    @Override
-    public void write(byte[] bytes, int off, int len) throws IOException {
-        this.out.write(bytes, off, len);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.out.close();
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
index 74bca4c593..a442854e4f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -26,11 +26,11 @@ import java.util.Objects;
 /** Key for cache manager. */
 public interface CacheKey {
 
-    static CacheKey forPosition(Path filePath, long position, int length, boolean isIndex) {
+    static PositionCacheKey forPosition(Path filePath, long position, int length, boolean isIndex) {
         return new PositionCacheKey(filePath, position, length, isIndex);
     }
 
-    static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
+    static PageIndexCacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) {
         return new PageIndexCacheKey(file, pageSize, pageIndex, false);
     }
 
@@ -52,6 +52,14 @@ public interface CacheKey {
             this.isIndex = isIndex;
         }
 
+        public long position() {
+            return position;
+        }
+
+        public int length() {
+            return length;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 677d87d499..486b59e6c7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -113,10 +113,6 @@ public class CacheManager {
         }
     }
 
-    public int fileReadCount() {
-        return fileReadCount;
-    }
-
     /** The container for the segment. */
     public static class SegmentContainer {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
deleted file mode 100644
index b27cab0b4c..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io.cache;
-
-import org.apache.paimon.data.AbstractPagedInputView;
-import org.apache.paimon.io.PageFileInput;
-import org.apache.paimon.io.SeekableDataInputView;
-import org.apache.paimon.io.cache.CacheKey.PageIndexCacheKey;
-import org.apache.paimon.io.cache.CacheManager.SegmentContainer;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
-
-/**
- * A {@link SeekableDataInputView} to read bytes from {@link 
RandomAccessFile}, the bytes can be
- * cached to {@link MemorySegment}s in {@link CacheManager}.
- */
-public class FileBasedRandomInputView extends AbstractPagedInputView
-        implements SeekableDataInputView, Closeable {
-
-    private final PageFileInput input;
-    private final CacheManager cacheManager;
-    private final Map<Integer, SegmentContainer> segments;
-    private final int segmentSizeBits;
-    private final int segmentSizeMask;
-
-    private int currentSegmentIndex;
-
-    public FileBasedRandomInputView(PageFileInput input, CacheManager 
cacheManager) {
-        this.input = input;
-        this.cacheManager = cacheManager;
-        this.segments = new HashMap<>();
-        int segmentSize = input.pageSize();
-        this.segmentSizeBits = MathUtils.log2strict(segmentSize);
-        this.segmentSizeMask = segmentSize - 1;
-
-        this.currentSegmentIndex = -1;
-    }
-
-    @Override
-    public void setReadPosition(long position) {
-        int offset = (int) (position & this.segmentSizeMask);
-        this.currentSegmentIndex = (int) (position >>> this.segmentSizeBits);
-        MemorySegment segment = getCurrentPage();
-        seekInput(segment, offset, getLimitForSegment(segment));
-    }
-
-    private MemorySegment getCurrentPage() {
-        SegmentContainer container = segments.get(currentSegmentIndex);
-        if (container == null || container.getAccessCount() == REFRESH_COUNT) {
-            int pageIndex = currentSegmentIndex;
-            MemorySegment segment =
-                    cacheManager.getPage(
-                            CacheKey.forPageIndex(input.file(), 
input.pageSize(), pageIndex),
-                            key -> input.readPage(pageIndex),
-                            this::invalidPage);
-            container = new SegmentContainer(segment);
-            segments.put(currentSegmentIndex, container);
-        }
-        return container.access();
-    }
-
-    @Override
-    protected MemorySegment nextSegment(MemorySegment current) throws 
EOFException {
-        currentSegmentIndex++;
-        if ((long) currentSegmentIndex << segmentSizeBits >= 
input.uncompressBytes()) {
-            throw new EOFException();
-        }
-
-        return getCurrentPage();
-    }
-
-    @Override
-    protected int getLimitForSegment(MemorySegment segment) {
-        return segment.size();
-    }
-
-    private void invalidPage(CacheKey key) {
-        segments.remove(((PageIndexCacheKey) key).pageIndex());
-    }
-
-    @Override
-    public void close() throws IOException {
-        // copy out to avoid ConcurrentModificationException
-        List<Integer> pages = new ArrayList<>(segments.keySet());
-        pages.forEach(
-                page ->
-                        cacheManager.invalidPage(
-                                CacheKey.forPageIndex(input.file(), 
input.pageSize(), page)));
-
-        input.close();
-    }
-}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index 684273ae15..ef862e6a3a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.lookup.sort;
 
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -36,31 +35,26 @@ import java.util.Comparator;
 /** A {@link LookupStoreReader} backed by an {@link SstFileReader}. */
 public class SortLookupStoreReader implements LookupStoreReader {
 
-    private final FileIO fileIO;
     private final SeekableInputStream input;
-    private final SstFileReader sstFileReader;
+    private final SstFileReader reader;
 
     public SortLookupStoreReader(
             Comparator<MemorySlice> comparator, File file, CacheManager cacheManager)
             throws IOException {
-        final Path filePath = new Path(file.getAbsolutePath());
-        this.fileIO = LocalFileIO.create();
-        this.input = fileIO.newInputStream(filePath);
-        this.sstFileReader =
-                new SstFileReader(comparator, file.length(), filePath, input, cacheManager);
+        Path filePath = new Path(file.getAbsolutePath());
+        this.input = LocalFileIO.INSTANCE.newInputStream(filePath);
+        this.reader = new SstFileReader(comparator, file.length(), filePath, input, cacheManager);
     }
 
     @Nullable
     @Override
     public byte[] lookup(byte[] key) throws IOException {
-        return sstFileReader.lookup(key);
+        return reader.lookup(key);
     }
 
     @Override
     public void close() throws IOException {
-        // be careful about the close order
-        sstFileReader.close();
+        reader.close();
         input.close();
-        fileIO.close();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index 366e4560ac..e8ed55b96b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.lookup.sort;
 
 import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -34,8 +33,8 @@ import java.io.IOException;
 
 /** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
 public class SortLookupStoreWriter implements LookupStoreWriter {
-    private final SstFileWriter sstFileWriter;
-    private final FileIO fileIO;
+
+    private final SstFileWriter writer;
     private final PositionOutputStream out;
 
     public SortLookupStoreWriter(
@@ -44,21 +43,19 @@ public class SortLookupStoreWriter implements LookupStoreWriter {
             @Nullable BloomFilter.Builder bloomFilter,
             BlockCompressionFactory compressionFactory)
             throws IOException {
-        final Path filePath = new Path(file.getAbsolutePath());
-        this.fileIO = LocalFileIO.create();
-        this.out = fileIO.newOutputStream(filePath, true);
-        this.sstFileWriter = new SstFileWriter(out, blockSize, bloomFilter, compressionFactory);
+        Path filePath = new Path(file.getAbsolutePath());
+        this.out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+        this.writer = new SstFileWriter(out, blockSize, bloomFilter, compressionFactory);
     }
 
     @Override
     public void put(byte[] key, byte[] value) throws IOException {
-        sstFileWriter.put(key, value);
+        writer.put(key, value);
     }
 
     @Override
     public void close() throws IOException {
-        sstFileWriter.close();
+        writer.close();
         out.close();
-        fileIO.close();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
index fbeeba2e77..d391c8de69 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
@@ -58,7 +58,6 @@ public class BlockCache implements Closeable {
 
     public MemorySegment getBlock(
             long position, int length, Function<byte[], byte[]> decompressFunc, boolean isIndex) {
-
         CacheKey cacheKey = CacheKey.forPosition(filePath, position, length, isIndex);
 
         SegmentContainer container = blocks.get(cacheKey);
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index 7f3804dd8d..dbcb962481 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -23,8 +23,6 @@ import java.util.Objects;
 /** Handle for bloom filter. */
 public class BloomFilterHandle {
 
-    public static final int MAX_ENCODED_LENGTH = 9 + 5 + 9;
-
     private final long offset;
     private final int size;
     private final long expectedEntries;
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index cf10e13a05..9184a5d9dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -47,11 +47,9 @@ public class SstFileReader implements Closeable {
 
     private final Comparator<MemorySlice> comparator;
     private final Path filePath;
-    private final long fileSize;
-
-    private final BlockIterator indexBlockIterator;
-    @Nullable private FileBasedBloomFilter bloomFilter;
     private final BlockCache blockCache;
+    private final BlockIterator indexBlockIterator;
+    @Nullable private final FileBasedBloomFilter bloomFilter;
 
     public SstFileReader(
             Comparator<MemorySlice> comparator,
@@ -62,13 +60,16 @@ public class SstFileReader implements Closeable {
             throws IOException {
         this.comparator = comparator;
         this.filePath = filePath;
-        this.fileSize = fileSize;
-
         this.blockCache = new BlockCache(filePath, input, cacheManager);
-        Footer footer = readFooter();
+        MemorySegment footerData =
+                blockCache.getBlock(
+                        fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
+        Footer footer = Footer.readFooter(MemorySlice.wrap(footerData).toInput());
         this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(), true).iterator();
         BloomFilterHandle handle = footer.getBloomFilterHandle();
-        if (handle != null) {
+        if (handle == null) {
+            this.bloomFilter = null;
+        } else {
             this.bloomFilter =
                     new FileBasedBloomFilter(
                             input,
@@ -80,13 +81,6 @@ public class SstFileReader implements Closeable {
         }
     }
 
-    private Footer readFooter() throws IOException {
-        MemorySegment footerData =
-                blockCache.getBlock(
-                        fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH, b -> b, true);
-        return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
-    }
-
     /**
      * Lookup the specified key in the file.
      *
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 8eb234b564..603899378b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -87,8 +87,7 @@ public class SstFileWriter implements Closeable {
             PositionOutputStream out,
             int blockSize,
             @Nullable BloomFilter.Builder bloomFilter,
-            @Nullable BlockCompressionFactory compressionFactory)
-            throws IOException {
+            @Nullable BlockCompressionFactory compressionFactory) {
         this.out = out;
         this.blockSize = blockSize;
         this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index 693c3d2bc7..f05bc6bcca 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.cache.CacheCallback;
 import org.apache.paimon.io.cache.CacheKey;
+import org.apache.paimon.io.cache.CacheKey.PositionCacheKey;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
 
@@ -38,10 +39,8 @@ public class FileBasedBloomFilter implements Closeable {
     private final SeekableInputStream input;
     private final CacheManager cacheManager;
     private final BloomFilter filter;
-    private final long readOffset;
-    private final CacheKey cacheKey;
-    // each bloom filter is only used by a single file reader, so we can safely reuse here
-    private final byte[] reusedPageBuffer;
+    private final PositionCacheKey cacheKey;
+
     private int accessCount;
 
     public FileBasedBloomFilter(
@@ -55,10 +54,8 @@ public class FileBasedBloomFilter implements Closeable {
         this.cacheManager = cacheManager;
         checkArgument(expectedEntries >= 0);
         this.filter = new BloomFilter(expectedEntries, readLength);
-        this.readOffset = readOffset;
         this.accessCount = 0;
         this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength, true);
-        this.reusedPageBuffer = new byte[readLength];
     }
 
     public boolean testHash(int hash) {
@@ -68,17 +65,19 @@ public class FileBasedBloomFilter implements Closeable {
         if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) {
             MemorySegment segment =
                     cacheManager.getPage(
-                            cacheKey, key -> readPage(), new BloomFilterCallBack(filter));
+                            cacheKey, this::readBytes, new BloomFilterCallBack(filter));
             filter.setMemorySegment(segment, 0);
             accessCount = 0;
         }
         return filter.testHash(hash);
     }
 
-    private byte[] readPage() throws IOException {
-        input.seek(readOffset);
-        IOUtils.readFully(input, reusedPageBuffer);
-        return reusedPageBuffer;
+    private byte[] readBytes(CacheKey k) throws IOException {
+        PositionCacheKey key = (PositionCacheKey) k;
+        input.seek(key.position());
+        byte[] bytes = new byte[key.length()];
+        IOUtils.readFully(input, bytes);
+        return bytes;
     }
 
     @VisibleForTesting
diff --git a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
deleted file mode 100644
index 42666a8781..0000000000
--- a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.Lz4BlockCompressionFactory;
-import org.apache.paimon.utils.MathUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link CompressedPageFileOutput} and {@link CompressedPageFileInput}. */
-public class CompressedInputOutputTest {
-
-    @TempDir Path tempDir;
-
-    @Test
-    public void testRandom() throws IOException {
-        for (int i = 0; i < 100; i++) {
-            innerTestRandom();
-        }
-    }
-
-    @SuppressWarnings("ResultOfMethodCallIgnored")
-    private void innerTestRandom() throws IOException {
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        byte[] bytes = new byte[rnd.nextInt(1_000) + 1];
-        rnd.nextBytes(bytes);
-        int pageSize = MathUtils.roundDownToPowerOf2(rnd.nextInt(2_000) + 2);
-
-        // prepare compressed file
-        File compressed = new File(tempDir.toFile(), "compressed");
-        compressed.delete();
-        Lz4BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
-        CompressedPageFileOutput output1 =
-                new CompressedPageFileOutput(compressed, pageSize, compressionFactory);
-        long uncompressBytes;
-        long[] pagePositions;
-        try {
-            output1.write(bytes, 0, bytes.length);
-        } finally {
-            output1.close();
-            uncompressBytes = output1.uncompressBytes();
-            pagePositions = output1.pages();
-        }
-        CompressedPageFileInput input1 =
-                new CompressedPageFileInput(
-                        new RandomAccessFile(compressed, "r"),
-                        pageSize,
-                        compressionFactory,
-                        uncompressBytes,
-                        pagePositions);
-
-        // prepare uncompressed file
-        File uncompressed = new File(tempDir.toFile(), "uncompressed");
-        uncompressed.delete();
-        try (UncompressedPageFileOutput output2 = new UncompressedPageFileOutput(uncompressed)) {
-            output2.write(bytes, 0, bytes.length);
-        }
-        UncompressedPageFileInput input2 =
-                new UncompressedPageFileInput(new RandomAccessFile(uncompressed, "r"), pageSize);
-
-        // test uncompressBytes
-        assertThat(input1.uncompressBytes()).isEqualTo(input2.uncompressBytes());
-
-        // test readPage
-        for (int i = 0; i < pagePositions.length; i++) {
-            assertThat(input1.readPage(i)).isEqualTo(input2.readPage(i));
-        }
-
-        // test readPosition
-        for (int i = 0; i < 10; i++) {
-            long position;
-            int length;
-            if (uncompressBytes == 1) {
-                position = 0;
-                length = 1;
-            } else {
-                position = rnd.nextLong(uncompressBytes - 1);
-                length = rnd.nextInt((int) (uncompressBytes - position)) + 1;
-            }
-            assertThat(input1.readPosition(position, length))
-                    .isEqualTo(input2.readPosition(position, length));
-        }
-
-        input1.close();
-        input2.close();
-    }
-}
diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
deleted file mode 100644
index 6486aead8c..0000000000
--- a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io.cache;
-
-import org.apache.paimon.io.PageFileInput;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
-import org.apache.paimon.testutils.junit.parameterized.Parameters;
-
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-/** Test for {@link FileBasedRandomInputView}. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class FileBasedRandomInputViewTest {
-
-    @TempDir Path tempDir;
-
-    private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
-    private final Cache.CacheType cacheType;
-
-    public FileBasedRandomInputViewTest(Cache.CacheType cacheType) {
-        this.cacheType = cacheType;
-    }
-
-    @Parameters(name = "{0}")
-    public static List<Cache.CacheType> getVarSeg() {
-        return Arrays.asList(Cache.CacheType.CAFFEINE, Cache.CacheType.GUAVA);
-    }
-
-    @TestTemplate
-    public void testMatched() throws IOException {
-        innerTest(1024 * 512, 5000);
-    }
-
-    @TestTemplate
-    public void testNotMatched() throws IOException {
-        innerTest(131092, 1000);
-    }
-
-    @TestTemplate
-    public void testRandom() throws IOException {
-        innerTest(rnd.nextInt(5000, 100000), 100);
-    }
-
-    private void innerTest(int len, int maxFileReadCount) throws IOException {
-        byte[] bytes = new byte[len];
-        MemorySegment segment = MemorySegment.wrap(bytes);
-        for (int i = 0; i < bytes.length; i++) {
-            bytes[i] = (byte) rnd.nextInt();
-        }
-
-        File file = writeFile(bytes);
-        CacheManager cacheManager = new CacheManager(cacheType, MemorySize.ofKibiBytes(128), 0);
-        FileBasedRandomInputView view =
-                new FileBasedRandomInputView(
-                        PageFileInput.create(file, 1024, null, 0, null), cacheManager);
-
-        // read first one
-        // this assertThatCode check the ConcurrentModificationException is not threw.
-        assertThatCode(() -> view.setReadPosition(0)).doesNotThrowAnyException();
-        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(0));
-
-        // read mid
-        int mid = bytes.length / 2;
-        assertThatCode(() -> view.setReadPosition(mid)).doesNotThrowAnyException();
-        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(mid));
-
-        // read special
-        assertThatCode(() -> view.setReadPosition(1021)).doesNotThrowAnyException();
-        assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(1021));
-
-        // read last one
-        assertThatCode(() -> view.setReadPosition(bytes.length - 1)).doesNotThrowAnyException();
-        assertThat(view.readByte()).isEqualTo(bytes[bytes.length - 1]);
-
-        // random read
-        for (int i = 0; i < 10000; i++) {
-            // hot key -> 10
-            int position = rnd.nextBoolean() ? 10 : rnd.nextInt(bytes.length - 8);
-            assertThatCode(() -> view.setReadPosition(position)).doesNotThrowAnyException();
-            assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
-        }
-
-        view.close();
-
-        // hot key in LRU, should have good cache hit rate
-        assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
-        assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
-        assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
-    }
-
-    private File writeFile(byte[] bytes) throws IOException {
-        File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
-        if (!file.createNewFile()) {
-            throw new IOException("Can not create: " + file);
-        }
-        Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
-        return file;
-    }
-}


Reply via email to