This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1369032b0a [core] Remove useless classes and clean memory in FileBasedBloomFilter
1369032b0a is described below
commit 1369032b0abf2c50818a7c7442f206e1f0eaab10
Author: JingsongLi <[email protected]>
AuthorDate: Sat Dec 6 20:28:46 2025 +0800
[core] Remove useless classes and clean memory in FileBasedBloomFilter
---
.../org/apache/paimon/fs/local/LocalFileIO.java | 2 +
.../apache/paimon/io/CompressedPageFileInput.java | 115 ------------------
.../apache/paimon/io/CompressedPageFileOutput.java | 109 -----------------
.../java/org/apache/paimon/io/PageFileInput.java | 62 ----------
.../java/org/apache/paimon/io/PageFileOutput.java | 42 -------
.../paimon/io/UncompressedPageFileInput.java | 79 ------------
.../paimon/io/UncompressedPageFileOutput.java | 44 -------
.../java/org/apache/paimon/io/cache/CacheKey.java | 12 +-
.../org/apache/paimon/io/cache/CacheManager.java | 4 -
.../paimon/io/cache/FileBasedRandomInputView.java | 119 -------------------
.../paimon/lookup/sort/SortLookupStoreReader.java | 18 +--
.../paimon/lookup/sort/SortLookupStoreWriter.java | 17 ++-
.../java/org/apache/paimon/sst/BlockCache.java | 1 -
.../org/apache/paimon/sst/BloomFilterHandle.java | 2 -
.../java/org/apache/paimon/sst/SstFileReader.java | 24 ++--
.../java/org/apache/paimon/sst/SstFileWriter.java | 3 +-
.../apache/paimon/utils/FileBasedBloomFilter.java | 21 ++--
.../paimon/io/CompressedInputOutputTest.java | 112 -----------------
.../io/cache/FileBasedRandomInputViewTest.java | 132 ---------------------
19 files changed, 45 insertions(+), 873 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index c6b32b1ae0..143f9d8c24 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -54,6 +54,8 @@ public class LocalFileIO implements FileIO {
private static final Logger LOG =
LoggerFactory.getLogger(LocalFileIO.class);
+ public static final LocalFileIO INSTANCE = new LocalFileIO();
+
private static final long serialVersionUID = 1L;
// the lock to ensure atomic renaming
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
deleted file mode 100644
index 242e743110..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockDecompressor;
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** A class to wrap compressed {@link RandomAccessFile}. */
-public class CompressedPageFileInput implements PageFileInput {
-
- private final RandomAccessFile file;
- private final int pageSize;
- private final long uncompressedBytes;
- private final long[] pagePositions;
-
- private final BlockDecompressor decompressor;
- private final byte[] uncompressedBuffer;
- private final byte[] compressedBuffer;
-
- private final int pageSizeBits;
- private final int pageSizeMask;
-
- public CompressedPageFileInput(
- RandomAccessFile file,
- int pageSize,
- BlockCompressionFactory compressionFactory,
- long uncompressedBytes,
- long[] pagePositions) {
- this.file = file;
- this.pageSize = pageSize;
- this.uncompressedBytes = uncompressedBytes;
- this.pagePositions = pagePositions;
-
- this.uncompressedBuffer = new byte[pageSize];
- this.decompressor = compressionFactory.getDecompressor();
- this.compressedBuffer =
- new
byte[compressionFactory.getCompressor().getMaxCompressedSize(pageSize)];
-
- this.pageSizeBits = MathUtils.log2strict(pageSize);
- this.pageSizeMask = pageSize - 1;
- }
-
- @Override
- public RandomAccessFile file() {
- return file;
- }
-
- @Override
- public long uncompressBytes() {
- return uncompressedBytes;
- }
-
- @Override
- public int pageSize() {
- return pageSize;
- }
-
- @Override
- public byte[] readPage(int pageIndex) throws IOException {
- long position = pagePositions[pageIndex];
- file.seek(position);
- int compressLength = file.readInt();
- file.readFully(compressedBuffer, 0, compressLength);
-
- int uncompressedLength =
- decompressor.decompress(compressedBuffer, 0, compressLength,
uncompressedBuffer, 0);
-
- byte[] result = new byte[uncompressedLength];
- System.arraycopy(uncompressedBuffer, 0, result, 0, uncompressedLength);
- return result;
- }
-
- @Override
- public byte[] readPosition(long position, int length) throws IOException {
- int offset = (int) (position & this.pageSizeMask);
- int pageIndex = (int) (position >>> this.pageSizeBits);
-
- byte[] result = new byte[length];
- int n = 0;
- do {
- byte[] page = readPage(pageIndex);
- int currentLength = Math.min(page.length - offset, length - n);
- System.arraycopy(page, offset, result, n, currentLength);
- offset = 0;
- n += currentLength;
- pageIndex++;
- } while (n < length);
- return result;
- }
-
- @Override
- public void close() throws IOException {
- this.file.close();
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
deleted file mode 100644
index 6e15715894..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.compression.BlockCompressor;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/** A class to output bytes with compression. */
-public class CompressedPageFileOutput implements PageFileOutput {
-
- private final FileOutputStream out;
- private final byte[] page;
- private final BlockCompressor compressor;
- private final byte[] compressedPage;
- private final List<Long> pages;
-
- private long uncompressBytes;
- private long position;
- private int count;
-
- public CompressedPageFileOutput(
- File file, int pageSize, BlockCompressionFactory
compressionFactory)
- throws FileNotFoundException {
- this.out = new FileOutputStream(file);
- this.page = new byte[pageSize];
- this.compressor = compressionFactory.getCompressor();
- this.compressedPage = new
byte[compressor.getMaxCompressedSize(pageSize)];
- this.pages = new ArrayList<>();
-
- this.uncompressBytes = 0;
- this.position = 0;
- this.count = 0;
- }
-
- private void flushBuffer() throws IOException {
- if (count > 0) {
- pages.add(position);
- int len = compressor.compress(page, 0, count, compressedPage, 0);
- // write length
- out.write((len >>> 24) & 0xFF);
- out.write((len >>> 16) & 0xFF);
- out.write((len >>> 8) & 0xFF);
- out.write(len & 0xFF);
- // write page
- out.write(compressedPage, 0, len);
- count = 0;
- position += (len + 4);
- }
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- uncompressBytes += len;
-
- while (len > 0) {
- if (count >= page.length) {
- flushBuffer();
- }
- int toWrite = Math.min(len, page.length - count);
- System.arraycopy(b, off, page, count, toWrite);
- off += toWrite;
- len -= toWrite;
- count += toWrite;
- }
- }
-
- @Override
- public void close() throws IOException {
- try (OutputStream ignored = out) {
- flushBuffer();
- }
- }
-
- public long uncompressBytes() {
- return uncompressBytes;
- }
-
- public long[] pages() {
- long[] pages = new long[this.pages.size()];
- for (int i = 0; i < pages.length; i++) {
- pages[i] = this.pages.get(i);
- }
- return pages;
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
deleted file mode 100644
index 2e5d73ac37..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** An interface to read pages from file. */
-public interface PageFileInput extends Closeable {
-
- RandomAccessFile file();
-
- long uncompressBytes();
-
- int pageSize();
-
- byte[] readPage(int pageIndex) throws IOException;
-
- byte[] readPosition(long position, int length) throws IOException;
-
- static PageFileInput create(
- File file,
- int pageSize,
- @Nullable BlockCompressionFactory compressionFactory,
- long uncompressBytes,
- @Nullable long[] compressPagePositions)
- throws IOException {
- RandomAccessFile accessFile = new RandomAccessFile(file, "r");
- if (compressionFactory == null) {
- return new UncompressedPageFileInput(accessFile, pageSize);
- } else {
- return new CompressedPageFileInput(
- accessFile,
- pageSize,
- compressionFactory,
- uncompressBytes,
- compressPagePositions);
- }
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
deleted file mode 100644
index d732e21b02..0000000000
--- a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.BlockCompressionFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-
-/** An interface to write bytes with pages into file. */
-public interface PageFileOutput extends Closeable {
-
- void write(byte[] bytes, int off, int len) throws IOException;
-
- static PageFileOutput create(
- File file, int pageSize, @Nullable BlockCompressionFactory
compressionFactory)
- throws IOException {
- if (compressionFactory == null) {
- return new UncompressedPageFileOutput(file);
- }
- return new CompressedPageFileOutput(file, pageSize,
compressionFactory);
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
deleted file mode 100644
index dd79223937..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-/** A class to wrap uncompressed {@link RandomAccessFile}. */
-public class UncompressedPageFileInput implements PageFileInput {
-
- private final RandomAccessFile file;
- private final long fileLength;
- private final int pageSize;
- private final int pageSizeBits;
-
- public UncompressedPageFileInput(RandomAccessFile file, int pageSize)
throws IOException {
- this.file = file;
- this.fileLength = file.length();
- this.pageSize = pageSize;
- this.pageSizeBits = MathUtils.log2strict(pageSize);
- }
-
- @Override
- public RandomAccessFile file() {
- return file;
- }
-
- @Override
- public long uncompressBytes() {
- return fileLength;
- }
-
- @Override
- public int pageSize() {
- return pageSize;
- }
-
- @Override
- public byte[] readPage(int pageIndex) throws IOException {
- long position = (long) pageIndex << pageSizeBits;
- file.seek(position);
-
- int length = (int) Math.min(pageSize, fileLength - position);
- byte[] result = new byte[length];
- file.readFully(result);
- return result;
- }
-
- @Override
- public byte[] readPosition(long position, int length) throws IOException {
- file.seek(position);
- byte[] result = new byte[length];
- file.readFully(result);
- return result;
- }
-
- @Override
- public void close() throws IOException {
- this.file.close();
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
deleted file mode 100644
index 799395f0bd..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-/** A class to wrap uncompressed {@link FileOutputStream}. */
-public class UncompressedPageFileOutput implements PageFileOutput {
-
- private final FileOutputStream out;
-
- public UncompressedPageFileOutput(File file) throws FileNotFoundException {
- this.out = new FileOutputStream(file);
- }
-
- @Override
- public void write(byte[] bytes, int off, int len) throws IOException {
- this.out.write(bytes, off, len);
- }
-
- @Override
- public void close() throws IOException {
- this.out.close();
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
index 74bca4c593..a442854e4f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -26,11 +26,11 @@ import java.util.Objects;
/** Key for cache manager. */
public interface CacheKey {
- static CacheKey forPosition(Path filePath, long position, int length,
boolean isIndex) {
+ static PositionCacheKey forPosition(Path filePath, long position, int
length, boolean isIndex) {
return new PositionCacheKey(filePath, position, length, isIndex);
}
- static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int
pageIndex) {
+ static PageIndexCacheKey forPageIndex(RandomAccessFile file, int pageSize,
int pageIndex) {
return new PageIndexCacheKey(file, pageSize, pageIndex, false);
}
@@ -52,6 +52,14 @@ public interface CacheKey {
this.isIndex = isIndex;
}
+ public long position() {
+ return position;
+ }
+
+ public int length() {
+ return length;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 677d87d499..486b59e6c7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -113,10 +113,6 @@ public class CacheManager {
}
}
- public int fileReadCount() {
- return fileReadCount;
- }
-
/** The container for the segment. */
public static class SegmentContainer {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
deleted file mode 100644
index b27cab0b4c..0000000000
---
a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io.cache;
-
-import org.apache.paimon.data.AbstractPagedInputView;
-import org.apache.paimon.io.PageFileInput;
-import org.apache.paimon.io.SeekableDataInputView;
-import org.apache.paimon.io.cache.CacheKey.PageIndexCacheKey;
-import org.apache.paimon.io.cache.CacheManager.SegmentContainer;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.utils.MathUtils;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
-
-/**
- * A {@link SeekableDataInputView} to read bytes from {@link
RandomAccessFile}, the bytes can be
- * cached to {@link MemorySegment}s in {@link CacheManager}.
- */
-public class FileBasedRandomInputView extends AbstractPagedInputView
- implements SeekableDataInputView, Closeable {
-
- private final PageFileInput input;
- private final CacheManager cacheManager;
- private final Map<Integer, SegmentContainer> segments;
- private final int segmentSizeBits;
- private final int segmentSizeMask;
-
- private int currentSegmentIndex;
-
- public FileBasedRandomInputView(PageFileInput input, CacheManager
cacheManager) {
- this.input = input;
- this.cacheManager = cacheManager;
- this.segments = new HashMap<>();
- int segmentSize = input.pageSize();
- this.segmentSizeBits = MathUtils.log2strict(segmentSize);
- this.segmentSizeMask = segmentSize - 1;
-
- this.currentSegmentIndex = -1;
- }
-
- @Override
- public void setReadPosition(long position) {
- int offset = (int) (position & this.segmentSizeMask);
- this.currentSegmentIndex = (int) (position >>> this.segmentSizeBits);
- MemorySegment segment = getCurrentPage();
- seekInput(segment, offset, getLimitForSegment(segment));
- }
-
- private MemorySegment getCurrentPage() {
- SegmentContainer container = segments.get(currentSegmentIndex);
- if (container == null || container.getAccessCount() == REFRESH_COUNT) {
- int pageIndex = currentSegmentIndex;
- MemorySegment segment =
- cacheManager.getPage(
- CacheKey.forPageIndex(input.file(),
input.pageSize(), pageIndex),
- key -> input.readPage(pageIndex),
- this::invalidPage);
- container = new SegmentContainer(segment);
- segments.put(currentSegmentIndex, container);
- }
- return container.access();
- }
-
- @Override
- protected MemorySegment nextSegment(MemorySegment current) throws
EOFException {
- currentSegmentIndex++;
- if ((long) currentSegmentIndex << segmentSizeBits >=
input.uncompressBytes()) {
- throw new EOFException();
- }
-
- return getCurrentPage();
- }
-
- @Override
- protected int getLimitForSegment(MemorySegment segment) {
- return segment.size();
- }
-
- private void invalidPage(CacheKey key) {
- segments.remove(((PageIndexCacheKey) key).pageIndex());
- }
-
- @Override
- public void close() throws IOException {
- // copy out to avoid ConcurrentModificationException
- List<Integer> pages = new ArrayList<>(segments.keySet());
- pages.forEach(
- page ->
- cacheManager.invalidPage(
- CacheKey.forPageIndex(input.file(),
input.pageSize(), page)));
-
- input.close();
- }
-}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
index 684273ae15..ef862e6a3a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -18,7 +18,6 @@
package org.apache.paimon.lookup.sort;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -36,31 +35,26 @@ import java.util.Comparator;
/** A {@link LookupStoreReader} backed by an {@link SstFileReader}. */
public class SortLookupStoreReader implements LookupStoreReader {
- private final FileIO fileIO;
private final SeekableInputStream input;
- private final SstFileReader sstFileReader;
+ private final SstFileReader reader;
public SortLookupStoreReader(
Comparator<MemorySlice> comparator, File file, CacheManager
cacheManager)
throws IOException {
- final Path filePath = new Path(file.getAbsolutePath());
- this.fileIO = LocalFileIO.create();
- this.input = fileIO.newInputStream(filePath);
- this.sstFileReader =
- new SstFileReader(comparator, file.length(), filePath, input,
cacheManager);
+ Path filePath = new Path(file.getAbsolutePath());
+ this.input = LocalFileIO.INSTANCE.newInputStream(filePath);
+ this.reader = new SstFileReader(comparator, file.length(), filePath,
input, cacheManager);
}
@Nullable
@Override
public byte[] lookup(byte[] key) throws IOException {
- return sstFileReader.lookup(key);
+ return reader.lookup(key);
}
@Override
public void close() throws IOException {
- // be careful about the close order
- sstFileReader.close();
+ reader.close();
input.close();
- fileIO.close();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index 366e4560ac..e8ed55b96b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -19,7 +19,6 @@
package org.apache.paimon.lookup.sort;
import org.apache.paimon.compression.BlockCompressionFactory;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -34,8 +33,8 @@ import java.io.IOException;
/** A {@link LookupStoreWriter} backed by an {@link SstFileWriter}. */
public class SortLookupStoreWriter implements LookupStoreWriter {
- private final SstFileWriter sstFileWriter;
- private final FileIO fileIO;
+
+ private final SstFileWriter writer;
private final PositionOutputStream out;
public SortLookupStoreWriter(
@@ -44,21 +43,19 @@ public class SortLookupStoreWriter implements
LookupStoreWriter {
@Nullable BloomFilter.Builder bloomFilter,
BlockCompressionFactory compressionFactory)
throws IOException {
- final Path filePath = new Path(file.getAbsolutePath());
- this.fileIO = LocalFileIO.create();
- this.out = fileIO.newOutputStream(filePath, true);
- this.sstFileWriter = new SstFileWriter(out, blockSize, bloomFilter,
compressionFactory);
+ Path filePath = new Path(file.getAbsolutePath());
+ this.out = LocalFileIO.INSTANCE.newOutputStream(filePath, true);
+ this.writer = new SstFileWriter(out, blockSize, bloomFilter,
compressionFactory);
}
@Override
public void put(byte[] key, byte[] value) throws IOException {
- sstFileWriter.put(key, value);
+ writer.put(key, value);
}
@Override
public void close() throws IOException {
- sstFileWriter.close();
+ writer.close();
out.close();
- fileIO.close();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
index fbeeba2e77..d391c8de69 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BlockCache.java
@@ -58,7 +58,6 @@ public class BlockCache implements Closeable {
public MemorySegment getBlock(
long position, int length, Function<byte[], byte[]>
decompressFunc, boolean isIndex) {
-
CacheKey cacheKey = CacheKey.forPosition(filePath, position, length,
isIndex);
SegmentContainer container = blocks.get(cacheKey);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
index 7f3804dd8d..dbcb962481 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/BloomFilterHandle.java
@@ -23,8 +23,6 @@ import java.util.Objects;
/** Handle for bloom filter. */
public class BloomFilterHandle {
- public static final int MAX_ENCODED_LENGTH = 9 + 5 + 9;
-
private final long offset;
private final int size;
private final long expectedEntries;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
index cf10e13a05..9184a5d9dd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileReader.java
@@ -47,11 +47,9 @@ public class SstFileReader implements Closeable {
private final Comparator<MemorySlice> comparator;
private final Path filePath;
- private final long fileSize;
-
- private final BlockIterator indexBlockIterator;
- @Nullable private FileBasedBloomFilter bloomFilter;
private final BlockCache blockCache;
+ private final BlockIterator indexBlockIterator;
+ @Nullable private final FileBasedBloomFilter bloomFilter;
public SstFileReader(
Comparator<MemorySlice> comparator,
@@ -62,13 +60,16 @@ public class SstFileReader implements Closeable {
throws IOException {
this.comparator = comparator;
this.filePath = filePath;
- this.fileSize = fileSize;
-
this.blockCache = new BlockCache(filePath, input, cacheManager);
- Footer footer = readFooter();
+ MemorySegment footerData =
+ blockCache.getBlock(
+ fileSize - Footer.ENCODED_LENGTH,
Footer.ENCODED_LENGTH, b -> b, true);
+ Footer footer =
Footer.readFooter(MemorySlice.wrap(footerData).toInput());
this.indexBlockIterator = readBlock(footer.getIndexBlockHandle(),
true).iterator();
BloomFilterHandle handle = footer.getBloomFilterHandle();
- if (handle != null) {
+ if (handle == null) {
+ this.bloomFilter = null;
+ } else {
this.bloomFilter =
new FileBasedBloomFilter(
input,
@@ -80,13 +81,6 @@ public class SstFileReader implements Closeable {
}
}
- private Footer readFooter() throws IOException {
- MemorySegment footerData =
- blockCache.getBlock(
- fileSize - Footer.ENCODED_LENGTH,
Footer.ENCODED_LENGTH, b -> b, true);
- return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
- }
-
/**
* Lookup the specified key in the file.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 8eb234b564..603899378b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -87,8 +87,7 @@ public class SstFileWriter implements Closeable {
PositionOutputStream out,
int blockSize,
@Nullable BloomFilter.Builder bloomFilter,
- @Nullable BlockCompressionFactory compressionFactory)
- throws IOException {
+ @Nullable BlockCompressionFactory compressionFactory) {
this.out = out;
this.blockSize = blockSize;
this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index 693c3d2bc7..f05bc6bcca 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.cache.CacheCallback;
import org.apache.paimon.io.cache.CacheKey;
+import org.apache.paimon.io.cache.CacheKey.PositionCacheKey;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.MemorySegment;
@@ -38,10 +39,8 @@ public class FileBasedBloomFilter implements Closeable {
private final SeekableInputStream input;
private final CacheManager cacheManager;
private final BloomFilter filter;
- private final long readOffset;
- private final CacheKey cacheKey;
- // each bloom filter is only used by a single file reader, so we can
safely reuse here
- private final byte[] reusedPageBuffer;
+ private final PositionCacheKey cacheKey;
+
private int accessCount;
public FileBasedBloomFilter(
@@ -55,10 +54,8 @@ public class FileBasedBloomFilter implements Closeable {
this.cacheManager = cacheManager;
checkArgument(expectedEntries >= 0);
this.filter = new BloomFilter(expectedEntries, readLength);
- this.readOffset = readOffset;
this.accessCount = 0;
this.cacheKey = CacheKey.forPosition(filePath, readOffset, readLength,
true);
- this.reusedPageBuffer = new byte[readLength];
}
public boolean testHash(int hash) {
@@ -68,17 +65,19 @@ public class FileBasedBloomFilter implements Closeable {
if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null)
{
MemorySegment segment =
cacheManager.getPage(
- cacheKey, key -> readPage(), new
BloomFilterCallBack(filter));
+ cacheKey, this::readBytes, new
BloomFilterCallBack(filter));
filter.setMemorySegment(segment, 0);
accessCount = 0;
}
return filter.testHash(hash);
}
- private byte[] readPage() throws IOException {
- input.seek(readOffset);
- IOUtils.readFully(input, reusedPageBuffer);
- return reusedPageBuffer;
+ private byte[] readBytes(CacheKey k) throws IOException {
+ PositionCacheKey key = (PositionCacheKey) k;
+ input.seek(key.position());
+ byte[] bytes = new byte[key.length()];
+ IOUtils.readFully(input, bytes);
+ return bytes;
}
@VisibleForTesting
diff --git
a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
deleted file mode 100644
index 42666a8781..0000000000
---
a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.compression.Lz4BlockCompressionFactory;
-import org.apache.paimon.utils.MathUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link CompressedPageFileOutput} and {@link
CompressedPageFileInput}. */
-public class CompressedInputOutputTest {
-
- @TempDir Path tempDir;
-
- @Test
- public void testRandom() throws IOException {
- for (int i = 0; i < 100; i++) {
- innerTestRandom();
- }
- }
-
- @SuppressWarnings("ResultOfMethodCallIgnored")
- private void innerTestRandom() throws IOException {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
- byte[] bytes = new byte[rnd.nextInt(1_000) + 1];
- rnd.nextBytes(bytes);
- int pageSize = MathUtils.roundDownToPowerOf2(rnd.nextInt(2_000) + 2);
-
- // prepare compressed file
- File compressed = new File(tempDir.toFile(), "compressed");
- compressed.delete();
- Lz4BlockCompressionFactory compressionFactory = new
Lz4BlockCompressionFactory();
- CompressedPageFileOutput output1 =
- new CompressedPageFileOutput(compressed, pageSize,
compressionFactory);
- long uncompressBytes;
- long[] pagePositions;
- try {
- output1.write(bytes, 0, bytes.length);
- } finally {
- output1.close();
- uncompressBytes = output1.uncompressBytes();
- pagePositions = output1.pages();
- }
- CompressedPageFileInput input1 =
- new CompressedPageFileInput(
- new RandomAccessFile(compressed, "r"),
- pageSize,
- compressionFactory,
- uncompressBytes,
- pagePositions);
-
- // prepare uncompressed file
- File uncompressed = new File(tempDir.toFile(), "uncompressed");
- uncompressed.delete();
- try (UncompressedPageFileOutput output2 = new
UncompressedPageFileOutput(uncompressed)) {
- output2.write(bytes, 0, bytes.length);
- }
- UncompressedPageFileInput input2 =
- new UncompressedPageFileInput(new
RandomAccessFile(uncompressed, "r"), pageSize);
-
- // test uncompressBytes
-
assertThat(input1.uncompressBytes()).isEqualTo(input2.uncompressBytes());
-
- // test readPage
- for (int i = 0; i < pagePositions.length; i++) {
- assertThat(input1.readPage(i)).isEqualTo(input2.readPage(i));
- }
-
- // test readPosition
- for (int i = 0; i < 10; i++) {
- long position;
- int length;
- if (uncompressBytes == 1) {
- position = 0;
- length = 1;
- } else {
- position = rnd.nextLong(uncompressBytes - 1);
- length = rnd.nextInt((int) (uncompressBytes - position)) + 1;
- }
- assertThat(input1.readPosition(position, length))
- .isEqualTo(input2.readPosition(position, length));
- }
-
- input1.close();
- input2.close();
- }
-}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
deleted file mode 100644
index 6486aead8c..0000000000
---
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io.cache;
-
-import org.apache.paimon.io.PageFileInput;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.options.MemorySize;
-import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
-import org.apache.paimon.testutils.junit.parameterized.Parameters;
-
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-/** Test for {@link FileBasedRandomInputView}. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class FileBasedRandomInputViewTest {
-
- @TempDir Path tempDir;
-
- private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
- private final Cache.CacheType cacheType;
-
- public FileBasedRandomInputViewTest(Cache.CacheType cacheType) {
- this.cacheType = cacheType;
- }
-
- @Parameters(name = "{0}")
- public static List<Cache.CacheType> getVarSeg() {
- return Arrays.asList(Cache.CacheType.CAFFEINE, Cache.CacheType.GUAVA);
- }
-
- @TestTemplate
- public void testMatched() throws IOException {
- innerTest(1024 * 512, 5000);
- }
-
- @TestTemplate
- public void testNotMatched() throws IOException {
- innerTest(131092, 1000);
- }
-
- @TestTemplate
- public void testRandom() throws IOException {
- innerTest(rnd.nextInt(5000, 100000), 100);
- }
-
- private void innerTest(int len, int maxFileReadCount) throws IOException {
- byte[] bytes = new byte[len];
- MemorySegment segment = MemorySegment.wrap(bytes);
- for (int i = 0; i < bytes.length; i++) {
- bytes[i] = (byte) rnd.nextInt();
- }
-
- File file = writeFile(bytes);
- CacheManager cacheManager = new CacheManager(cacheType,
MemorySize.ofKibiBytes(128), 0);
- FileBasedRandomInputView view =
- new FileBasedRandomInputView(
- PageFileInput.create(file, 1024, null, 0, null),
cacheManager);
-
- // read first one
- // this assertThatCode check the ConcurrentModificationException is
not threw.
- assertThatCode(() ->
view.setReadPosition(0)).doesNotThrowAnyException();
- assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(0));
-
- // read mid
- int mid = bytes.length / 2;
- assertThatCode(() ->
view.setReadPosition(mid)).doesNotThrowAnyException();
- assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(mid));
-
- // read special
- assertThatCode(() ->
view.setReadPosition(1021)).doesNotThrowAnyException();
- assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(1021));
-
- // read last one
- assertThatCode(() -> view.setReadPosition(bytes.length -
1)).doesNotThrowAnyException();
- assertThat(view.readByte()).isEqualTo(bytes[bytes.length - 1]);
-
- // random read
- for (int i = 0; i < 10000; i++) {
- // hot key -> 10
- int position = rnd.nextBoolean() ? 10 : rnd.nextInt(bytes.length -
8);
- assertThatCode(() ->
view.setReadPosition(position)).doesNotThrowAnyException();
-
assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
- }
-
- view.close();
-
- // hot key in LRU, should have good cache hit rate
- assertThat(cacheManager.fileReadCount()).isLessThan(maxFileReadCount);
- assertThat(cacheManager.dataCache().asMap().size()).isEqualTo(0);
- assertThat(cacheManager.indexCache().asMap().size()).isEqualTo(0);
- }
-
- private File writeFile(byte[] bytes) throws IOException {
- File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
- if (!file.createNewFile()) {
- throw new IOException("Can not create: " + file);
- }
- Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
- return file;
- }
-}