This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 6a6c060fae074d6e7efc981f8c1dc1a46a1775ac Author: Jingsong <[email protected]> AuthorDate: Tue Jan 30 16:02:51 2024 +0800 [core] Extract PageFileInput and PageFileOutput from Hash lookup --- .../compression/BlockCompressionFactory.java | 9 +- .../apache/paimon/io/CompressedPageFileInput.java | 115 +++++++++++++++++++++ .../apache/paimon/io/CompressedPageFileOutput.java | 109 +++++++++++++++++++ .../java/org/apache/paimon/io/PageFileInput.java | 43 ++++++++ .../java/org/apache/paimon/io/PageFileOutput.java | 33 ++++++ .../paimon/io/UncompressedPageFileInput.java | 79 ++++++++++++++ .../paimon/io/UncompressedPageFileOutput.java | 44 ++++++++ .../org/apache/paimon/io/cache/CacheCallback.java | 25 +++++ .../java/org/apache/paimon/io/cache/CacheKey.java | 104 +++++++++++++++++++ .../org/apache/paimon/io/cache/CacheManager.java | 65 ++---------- .../org/apache/paimon/io/cache/CacheReader.java | 27 +++++ .../paimon/io/cache/FileBasedRandomInputView.java | 60 ++++------- .../paimon/lookup/hash/HashLookupStoreReader.java | 8 +- .../paimon/lookup/hash/HashLookupStoreWriter.java | 21 ++-- .../apache/paimon/utils/FileBasedBloomFilter.java | 17 ++- .../paimon/io/CompressedInputOutputTest.java | 112 ++++++++++++++++++++ .../io/cache/FileBasedRandomInputViewTest.java | 4 +- .../paimon/utils/FileBasedBloomFilterTest.java | 4 +- 18 files changed, 760 insertions(+), 119 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java index 40bb10fc4..f90d4e039 100644 --- a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java @@ -23,6 +23,8 @@ import io.airlift.compress.lzo.LzoDecompressor; import io.airlift.compress.zstd.ZstdCompressor; import io.airlift.compress.zstd.ZstdDecompressor; +import javax.annotation.Nullable; + /** * Each compression codec has an implementation of {@link BlockCompressionFactory} to create * compressors and decompressors. @@ -34,7 +36,12 @@ public interface BlockCompressionFactory { BlockDecompressor getDecompressor(); /** Creates {@link BlockCompressionFactory} according to the configuration. */ - static BlockCompressionFactory create(String compression) { + @Nullable + static BlockCompressionFactory create(@Nullable String compression) { + if (compression == null) { + return null; + } + switch (compression.toUpperCase()) { case "LZ4": return new Lz4BlockCompressionFactory(); diff --git a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java new file mode 100644 index 000000000..3d1a23892 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.BlockDecompressor; +import org.apache.paimon.utils.MathUtils; + +import java.io.IOException; +import java.io.RandomAccessFile; + +/** A class to wrap compressed {@link RandomAccessFile}. */ +public class CompressedPageFileInput implements PageFileInput { + + private final RandomAccessFile file; + private final int pageSize; + private final long uncompressBytes; + private final long[] pagePositions; + + private final BlockDecompressor decompressor; + private final byte[] uncompressedBuffer; + private final byte[] compressedBuffer; + + private final int pageSizeBits; + private final int pageSizeMask; + + public CompressedPageFileInput( + RandomAccessFile file, + int pageSize, + BlockCompressionFactory compressionFactory, + long uncompressBytes, + long[] pagePositions) { + this.file = file; + this.pageSize = pageSize; + this.uncompressBytes = uncompressBytes; + this.pagePositions = pagePositions; + + this.uncompressedBuffer = new byte[pageSize]; + this.decompressor = compressionFactory.getDecompressor(); + this.compressedBuffer = + new byte[compressionFactory.getCompressor().getMaxCompressedSize(pageSize)]; + + this.pageSizeBits = MathUtils.log2strict(pageSize); + this.pageSizeMask = pageSize - 1; + } + + @Override + public RandomAccessFile file() { + return file; + } + + @Override + public long uncompressBytes() { + return uncompressBytes; + } + + @Override + public int pageSize() { + return pageSize; + } + + @Override + public byte[] readPage(int pageIndex) throws IOException { + long position = pagePositions[pageIndex]; + file.seek(position); + int compressLength = file.readInt(); + file.readFully(compressedBuffer, 0, compressLength); + + int uncompressedLength = + decompressor.decompress(compressedBuffer, 0, compressLength, uncompressedBuffer, 0); + + byte[] result = new byte[uncompressedLength]; + System.arraycopy(uncompressedBuffer, 0, result, 0, uncompressedLength); + return result; + } + + @Override + public byte[] readPosition(long position, int length) throws IOException { + int offset = (int) (position & this.pageSizeMask); + int pageIndex = (int) (position >>> this.pageSizeBits); + + byte[] result = new byte[length]; + int n = 0; + do { + byte[] page = readPage(pageIndex); + int currentLength = Math.min(page.length - offset, length - n); + System.arraycopy(page, offset, result, n, currentLength); + offset = 0; + n += currentLength; + pageIndex++; + } while (n < length); + return result; + } + + @Override + public void close() throws IOException { + this.file.close(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java new file mode 100644 index 000000000..6e1571589 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.compression.BlockCompressionFactory; +import org.apache.paimon.compression.BlockCompressor; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +/** A class to output bytes with compression. */ +public class CompressedPageFileOutput implements PageFileOutput { + + private final FileOutputStream out; + private final byte[] page; + private final BlockCompressor compressor; + private final byte[] compressedPage; + private final List<Long> pages; + + private long uncompressBytes; + private long position; + private int count; + + public CompressedPageFileOutput( + File file, int pageSize, BlockCompressionFactory compressionFactory) + throws FileNotFoundException { + this.out = new FileOutputStream(file); + this.page = new byte[pageSize]; + this.compressor = compressionFactory.getCompressor(); + this.compressedPage = new byte[compressor.getMaxCompressedSize(pageSize)]; + this.pages = new ArrayList<>(); + + this.uncompressBytes = 0; + this.position = 0; + this.count = 0; + } + + private void flushBuffer() throws IOException { + if (count > 0) { + pages.add(position); + int len = compressor.compress(page, 0, count, compressedPage, 0); + // write length + out.write((len >>> 24) & 0xFF); + out.write((len >>> 16) & 0xFF); + out.write((len >>> 8) & 0xFF); + out.write(len & 0xFF); + // write page + out.write(compressedPage, 0, len); + count = 0; + position += (len + 4); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + uncompressBytes += len; + + while (len > 0) { + if (count >= page.length) { + flushBuffer(); + } + int toWrite = Math.min(len, page.length - count); + System.arraycopy(b, off, page, count, toWrite); + off += toWrite; + len -= toWrite; + count += toWrite; + } + } + + @Override + public void close() throws IOException { + try (OutputStream ignored = out) { + flushBuffer(); + } + } + + public long uncompressBytes() { + return uncompressBytes; + } + + public long[] pages() { + long[] pages = new long[this.pages.size()]; + for (int i = 0; i < pages.length; i++) { + pages[i] = this.pages.get(i); + } + return pages; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java new file mode 100644 index 000000000..da699304f --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** An interface to read pages from file. */ +public interface PageFileInput extends Closeable { + + RandomAccessFile file(); + + long uncompressBytes(); + + int pageSize(); + + byte[] readPage(int pageIndex) throws IOException; + + byte[] readPosition(long position, int length) throws IOException; + + static PageFileInput create(File file, int pageSize) throws IOException { + RandomAccessFile accessFile = new RandomAccessFile(file, "r"); + return new UncompressedPageFileInput(accessFile, pageSize); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java new file mode 100644 index 000000000..1a623ad90 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +/** An interface to write bytes with pages into file. */ +public interface PageFileOutput extends Closeable { + + void write(byte[] bytes, int off, int len) throws IOException; + + static PageFileOutput create(File file) throws IOException { + return new UncompressedPageFileOutput(file); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java new file mode 100644 index 000000000..dd7922393 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.utils.MathUtils; + +import java.io.IOException; +import java.io.RandomAccessFile; + +/** A class to wrap uncompressed {@link RandomAccessFile}. */ +public class UncompressedPageFileInput implements PageFileInput { + + private final RandomAccessFile file; + private final long fileLength; + private final int pageSize; + private final int pageSizeBits; + + public UncompressedPageFileInput(RandomAccessFile file, int pageSize) throws IOException { + this.file = file; + this.fileLength = file.length(); + this.pageSize = pageSize; + this.pageSizeBits = MathUtils.log2strict(pageSize); + } + + @Override + public RandomAccessFile file() { + return file; + } + + @Override + public long uncompressBytes() { + return fileLength; + } + + @Override + public int pageSize() { + return pageSize; + } + + @Override + public byte[] readPage(int pageIndex) throws IOException { + long position = (long) pageIndex << pageSizeBits; + file.seek(position); + + int length = (int) Math.min(pageSize, fileLength - position); + byte[] result = new byte[length]; + file.readFully(result); + return result; + } + + @Override + public byte[] readPosition(long position, int length) throws IOException { + file.seek(position); + byte[] result = new byte[length]; + file.readFully(result); + return result; + } + + @Override + public void close() throws IOException { + this.file.close(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java new file mode 100644 index 000000000..799395f0b --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; + +/** A class to wrap uncompressed {@link FileOutputStream}. */ +public class UncompressedPageFileOutput implements PageFileOutput { + + private final FileOutputStream out; + + public UncompressedPageFileOutput(File file) throws FileNotFoundException { + this.out = new FileOutputStream(file); + } + + @Override + public void write(byte[] bytes, int off, int len) throws IOException { + this.out.write(bytes, off, len); + } + + @Override + public void close() throws IOException { + this.out.close(); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java new file mode 100644 index 000000000..523e34b43 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io.cache; + +/** Callback for cache removal. */ +public interface CacheCallback { + + void onRemoval(CacheKey key); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java new file mode 100644 index 000000000..b313018d3 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io.cache; + +import java.io.RandomAccessFile; +import java.util.Objects; + +/** Key for cache manager. */ +public interface CacheKey { + + static CacheKey forPosition(RandomAccessFile file, long position, int length) { + return new PositionCacheKey(file, position, length); + } + + static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int pageIndex) { + return new PageIndexCacheKey(file, pageSize, pageIndex); + } + + /** Key for file position and length. */ + class PositionCacheKey implements CacheKey { + + private final RandomAccessFile file; + private final long position; + private final int length; + + private PositionCacheKey(RandomAccessFile file, long position, int length) { + this.file = file; + this.position = position; + this.length = length; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PositionCacheKey that = (PositionCacheKey) o; + return position == that.position + && length == that.length + && Objects.equals(file, that.file); + } + + @Override + public int hashCode() { + return Objects.hash(file, position, length); + } + } + + /** Key for file page index. */ + class PageIndexCacheKey implements CacheKey { + + private final RandomAccessFile file; + private final int pageSize; + private final int pageIndex; + + private PageIndexCacheKey(RandomAccessFile file, int pageSize, int pageIndex) { + this.file = file; + this.pageSize = pageSize; + this.pageIndex = pageIndex; + } + + public int pageIndex() { + return pageIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PageIndexCacheKey that = (PageIndexCacheKey) o; + return pageSize == that.pageSize + && pageIndex == that.pageIndex + && Objects.equals(file, that.file); + } + + @Override + public int hashCode() { + return Objects.hash(file, pageSize, pageIndex); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java index 0634e1c07..3f09fa5f4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java @@ -28,9 +28,6 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Remo import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.Objects; -import java.util.function.BiConsumer; /** Cache manager to cache bytes to paged {@link MemorySegment}s. */ public class CacheManager { @@ -61,16 +58,12 @@ public class CacheManager { return cache; } - public MemorySegment getPage( - RandomAccessFile file, - long readOffset, - int readLength, - BiConsumer<Long, Integer> cleanCallback) { - CacheKey key = new CacheKey(file, readOffset, readLength); + public MemorySegment getPage(CacheKey key, CacheReader reader, CacheCallback callback) { CacheValue value = cache.getIfPresent(key); while (value == null || value.isClosed) { try { - value = new CacheValue(key.read(), cleanCallback); + this.fileReadCount++; + value = new CacheValue(MemorySegment.wrap(reader.read(key)), callback); } catch (IOException e) { throw new RuntimeException(e); } @@ -79,8 +72,8 @@ public class CacheManager { return value.segment; } - public void invalidPage(RandomAccessFile file, long readOffset, int readLength) { - cache.invalidate(new CacheKey(file, readOffset, readLength)); + public void invalidPage(CacheKey key) { + cache.invalidate(key); } private int weigh(CacheKey cacheKey, CacheValue cacheValue) { @@ -89,63 +82,23 @@ public class CacheManager { private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) { value.isClosed = true; - value.cleanCallback.accept(key.offset, key.length); + value.callback.onRemoval(key); } public int fileReadCount() { return fileReadCount; } - private class CacheKey { - - private final RandomAccessFile file; - private final long offset; - private final int length; - - private CacheKey(RandomAccessFile file, long offset, int length) { - this.file = file; - this.offset = offset; - this.length = length; - } - - private MemorySegment read() throws IOException { - byte[] bytes = new byte[length]; - file.seek(offset); - file.readFully(bytes); - fileReadCount++; - return MemorySegment.wrap(bytes); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CacheKey cacheKey = (CacheKey) o; - return Objects.equals(file, cacheKey.file) - && offset == cacheKey.offset - && length == cacheKey.length; - } - - @Override - public int hashCode() { - return Objects.hash(file, offset, length); - } - } - private static class CacheValue { private final MemorySegment segment; - private final BiConsumer<Long, Integer> cleanCallback; + private final CacheCallback callback; private boolean isClosed = false; - private CacheValue(MemorySegment segment, BiConsumer<Long, Integer> cleanCallback) { + private CacheValue(MemorySegment segment, CacheCallback callback) { this.segment = segment; - this.cleanCallback = cleanCallback; + this.callback = callback; } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java new file mode 100644 index 000000000..2067cafce --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io.cache; + +import java.io.IOException; + +/** Reader to read byte[]. */ +public interface CacheReader { + + byte[] read(CacheKey key) throws IOException; +} diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java index ef7594516..b6fa559d9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java @@ -19,14 +19,14 @@ package org.apache.paimon.io.cache; import org.apache.paimon.data.AbstractPagedInputView; +import org.apache.paimon.io.PageFileInput; import org.apache.paimon.io.SeekableDataInputView; +import org.apache.paimon.io.cache.CacheKey.PageIndexCacheKey; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.utils.MathUtils; import java.io.Closeable; import java.io.EOFException; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.util.ArrayList; @@ -43,23 +43,19 @@ import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT; public class FileBasedRandomInputView extends AbstractPagedInputView implements SeekableDataInputView, Closeable { - private final RandomAccessFile file; - private final long fileLength; + private final PageFileInput input; private final CacheManager cacheManager; private final Map<Integer, SegmentContainer> segments; - private final int segmentSize; private final int segmentSizeBits; private final int segmentSizeMask; private int currentSegmentIndex; - public FileBasedRandomInputView(File file, CacheManager cacheManager, int segmentSize) - throws FileNotFoundException { - this.file = new RandomAccessFile(file, "r"); - this.fileLength = file.length(); + public FileBasedRandomInputView(PageFileInput input, CacheManager cacheManager) { + this.input = input; this.cacheManager = cacheManager; this.segments = new HashMap<>(); - this.segmentSize = segmentSize; + int segmentSize = input.pageSize(); this.segmentSizeBits = MathUtils.log2strict(segmentSize); this.segmentSizeMask = segmentSize - 1; @@ -68,8 +64,8 @@ public class FileBasedRandomInputView extends AbstractPagedInputView @Override public void setReadPosition(long position) { - final int offset = (int) (position & this.segmentSizeMask); - this.currentSegmentIndex = positionToSegmentIndex(position); + int offset = (int) (position & this.segmentSizeMask); + this.currentSegmentIndex = (int) (position >>> this.segmentSizeBits); MemorySegment segment = getCurrentPage(); seekInput(segment, offset, getLimitForSegment(segment)); } @@ -77,11 +73,13 @@ public class FileBasedRandomInputView extends AbstractPagedInputView private MemorySegment getCurrentPage() { SegmentContainer container = segments.get(currentSegmentIndex); if (container == null || container.accessCount == REFRESH_COUNT) { - long offset = segmentIndexToPosition(currentSegmentIndex); - int length = (int) Math.min(segmentSize, fileLength - offset); - container = - new SegmentContainer( - cacheManager.getPage(file, offset, length, this::invalidPage)); + int pageIndex = currentSegmentIndex; + MemorySegment segment = + cacheManager.getPage( + CacheKey.forPageIndex(input.file(), input.pageSize(), pageIndex), + key -> input.readPage(pageIndex), + this::invalidPage); + container = new SegmentContainer(segment); segments.put(currentSegmentIndex, container); } return container.access(); @@ -90,7 +88,7 @@ public class FileBasedRandomInputView extends AbstractPagedInputView @Override protected MemorySegment nextSegment(MemorySegment current) throws EOFException { currentSegmentIndex++; - if (segmentIndexToPosition(currentSegmentIndex) >= fileLength) { + if ((long) currentSegmentIndex << segmentSizeBits >= input.uncompressBytes()) { throw new EOFException(); } @@ -102,8 +100,8 @@ public class FileBasedRandomInputView extends AbstractPagedInputView return segment.size(); } - private void invalidPage(long position, int length) { - segments.remove(positionToSegmentIndex(position)); + private void invalidPage(CacheKey key) { + segments.remove(((PageIndexCacheKey) key).pageIndex()); } @Override @@ -111,25 +109,11 @@ public class FileBasedRandomInputView extends AbstractPagedInputView // copy out to avoid ConcurrentModificationException List<Integer> pages = new ArrayList<>(segments.keySet()); pages.forEach( - page -> { - long offset = segmentIndexToPosition(page); - int length = (int) Math.min(segmentSize, fileLength - offset); - cacheManager.invalidPage(file, offset, length); - }); + page -> + cacheManager.invalidPage( + CacheKey.forPageIndex(input.file(), input.pageSize(), page))); - file.close(); - } - - private int positionToSegmentIndex(long position) { - return (int) (position >>> this.segmentSizeBits); - } - - private long segmentIndexToPosition(int segmentIndex) { - return (long) segmentIndex << segmentSizeBits; - } - - public RandomAccessFile file() { - return file; + input.close(); } private static class SegmentContainer { diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java index 19849953a..853cd4049 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java @@ -18,6 +18,7 @@ package org.apache.paimon.lookup.hash; +import org.apache.paimon.io.PageFileInput; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.io.cache.FileBasedRandomInputView; import org.apache.paimon.lookup.LookupStoreReader; @@ -81,13 +82,14 @@ public class HashLookupStoreReader dataOffsets = context.dataOffsets; LOG.info("Opening file {}", file.getName()); - // Create Mapped file in read-only mode - inputView = new FileBasedRandomInputView(file, cacheManager, cachePageSize); + + PageFileInput fileInput = PageFileInput.create(file, cachePageSize); + inputView = new FileBasedRandomInputView(fileInput, cacheManager); if (context.bloomFilterEnabled) { bloomFilter = new FileBasedBloomFilter( - inputView.file(), + fileInput, cacheManager, context.bloomFilterExpectedEntries, 0, diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java index d0d00ba0e..8853ab827 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java @@ -18,6 +18,7 @@ package org.apache.paimon.lookup.hash; +import org.apache.paimon.io.PageFileOutput; import org.apache.paimon.lookup.LookupStoreFactory.Context; import org.apache.paimon.lookup.LookupStoreWriter; import org.apache.paimon.utils.BloomFilter; @@ -37,7 +38,6 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -57,7 +57,7 @@ public class HashLookupStoreWriter implements LookupStoreWriter { private final double loadFactor; // Output private final File tempFolder; - private final OutputStream outputStream; + private final File outputFile; // Index stream private File[] indexFiles; private DataOutputStream[] indexStreams; @@ -84,6 +84,7 @@ public class HashLookupStoreWriter implements LookupStoreWriter { HashLookupStoreWriter(double loadFactor, File file, @Nullable BloomFilter.Builder bloomFilter) throws IOException { this.loadFactor = loadFactor; + this.outputFile = file; if (loadFactor <= 0.0 || loadFactor >= 1.0) { throw new IllegalArgumentException( "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0."); @@ -93,7 +94,6 @@ public class HashLookupStoreWriter implements LookupStoreWriter { if (!tempFolder.mkdir()) { throw new IOException("Can not create temp folder: " + tempFolder); } - this.outputStream = new BufferedOutputStream(new FileOutputStream(file)); this.indexStreams = new DataOutputStream[0]; this.dataStreams = new DataOutputStream[0]; this.indexFiles = new File[0]; @@ -223,7 +223,7 @@ public class HashLookupStoreWriter implements LookupStoreWriter { context.dataOffsets[i] = indexesLength + context.dataOffsets[i]; } - try { + try (PageFileOutput output = PageFileOutput.create(outputFile)) { // Write bloom filter file if (bloomFilter != null) { File bloomFilterFile = new File(tempFolder, "bloomfilter.dat"); @@ -254,12 +254,15 @@ public class HashLookupStoreWriter implements LookupStoreWriter { // Merge and write to output checkFreeDiskSpace(filesToMerge); - mergeFiles(filesToMerge, outputStream); - return context; + mergeFiles(filesToMerge, output); } finally { - outputStream.close(); cleanup(filesToMerge); } + + LOG.info( + "Compressed Total store size: {} Mb", + new DecimalFormat("#,##0.0").format(outputFile.length() / (1024 * 1024))); + return context; } private File buildIndex(int keyLength) throws IOException { @@ -377,7 +380,7 @@ public class HashLookupStoreWriter implements LookupStoreWriter { } // Merge files to the provided fileChannel - private void mergeFiles(List<File> inputFiles, OutputStream outputStream) throws IOException { + private void mergeFiles(List<File> inputFiles, PageFileOutput output) throws IOException { long startTime = System.nanoTime(); // Merge files @@ -391,7 +394,7 @@ public class HashLookupStoreWriter implements LookupStoreWriter { byte[] buffer = new byte[8192]; int length; while ((length = bufferedInputStream.read(buffer)) > 0) { - outputStream.write(buffer, 0, length); + output.write(buffer, 0, length); } } finally { bufferedInputStream.close(); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java index d6ce54bf3..e2abc361d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java @@ -21,18 +21,18 @@ package org.apache.paimon.utils; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.io.PageFileInput; +import org.apache.paimon.io.cache.CacheKey; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.MemorySegment; -import java.io.RandomAccessFile; - import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Util to apply a built bloom filter . */ public class FileBasedBloomFilter { - private final RandomAccessFile file; + private final PageFileInput input; private final CacheManager cacheManager; private final BloomFilter filter; private final long readOffset; @@ -41,12 +41,12 @@ public class FileBasedBloomFilter { private int accessCount; public FileBasedBloomFilter( - RandomAccessFile file, + PageFileInput input, CacheManager cacheManager, long expectedEntries, long readOffset, int readLength) { - this.file = file; + this.input = input; this.cacheManager = cacheManager; checkArgument(expectedEntries >= 0); this.filter = new BloomFilter(expectedEntries, readLength); @@ -62,10 +62,9 @@ public class FileBasedBloomFilter { if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) { MemorySegment segment = cacheManager.getPage( - file, - readOffset, - readLength, - (position, length) -> filter.unsetMemorySegment()); + CacheKey.forPosition(input.file(), readOffset, readLength), + key -> input.readPosition(readOffset, readLength), + key -> filter.unsetMemorySegment()); filter.setMemorySegment(segment, 0); accessCount = 0; } diff --git a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java new file mode 100644 index 000000000..42666a878 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.compression.Lz4BlockCompressionFactory; +import org.apache.paimon.utils.MathUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link CompressedPageFileOutput} and {@link CompressedPageFileInput}. */ +public class CompressedInputOutputTest { + + @TempDir Path tempDir; + + @Test + public void testRandom() throws IOException { + for (int i = 0; i < 100; i++) { + innerTestRandom(); + } + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + private void innerTestRandom() throws IOException { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + byte[] bytes = new byte[rnd.nextInt(1_000) + 1]; + rnd.nextBytes(bytes); + int pageSize = MathUtils.roundDownToPowerOf2(rnd.nextInt(2_000) + 2); + + // prepare compressed file + File compressed = new File(tempDir.toFile(), "compressed"); + compressed.delete(); + Lz4BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory(); + CompressedPageFileOutput output1 = + new CompressedPageFileOutput(compressed, pageSize, compressionFactory); + long uncompressBytes; + long[] pagePositions; + try { + output1.write(bytes, 0, bytes.length); + } finally { + output1.close(); + uncompressBytes = output1.uncompressBytes(); + pagePositions = output1.pages(); + } + CompressedPageFileInput input1 = + new CompressedPageFileInput( + new RandomAccessFile(compressed, "r"), + pageSize, + compressionFactory, + uncompressBytes, + pagePositions); + + // prepare uncompressed file + File uncompressed = new File(tempDir.toFile(), "uncompressed"); + uncompressed.delete(); + try (UncompressedPageFileOutput output2 = new UncompressedPageFileOutput(uncompressed)) { + output2.write(bytes, 0, bytes.length); + } + UncompressedPageFileInput input2 = + new UncompressedPageFileInput(new RandomAccessFile(uncompressed, "r"), pageSize); + + // test uncompressBytes + assertThat(input1.uncompressBytes()).isEqualTo(input2.uncompressBytes()); + + // test readPage + for (int i = 0; i < pagePositions.length; i++) { + assertThat(input1.readPage(i)).isEqualTo(input2.readPage(i)); + } + + // test readPosition + for (int i = 0; i < 10; i++) { + long position; + int length; + if (uncompressBytes == 1) { + position = 0; + length = 1; + } else { + position = rnd.nextLong(uncompressBytes - 1); + length = rnd.nextInt((int) (uncompressBytes - position)) + 1; + } + assertThat(input1.readPosition(position, length)) + .isEqualTo(input2.readPosition(position, length)); + } + + input1.close(); + input2.close(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java index e1bfe16cf..a0be24d8f 100644 --- a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.io.cache; +import org.apache.paimon.io.PageFileInput; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.options.MemorySize; @@ -66,7 +67,8 @@ public class FileBasedRandomInputViewTest { File file = writeFile(bytes); CacheManager cacheManager = new CacheManager(MemorySize.ofKibiBytes(128)); - FileBasedRandomInputView view = new FileBasedRandomInputView(file, cacheManager, 1024); + FileBasedRandomInputView view = + new FileBasedRandomInputView(PageFileInput.create(file, 1024), cacheManager); // read first one // this assertThatCode check the ConcurrentModificationException is not threw. diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java index 9a71544c0..bf43680c7 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java @@ -20,6 +20,7 @@ package org.apache.paimon.utils; +import org.apache.paimon.io.PageFileInput; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.options.MemorySize; @@ -30,7 +31,6 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -53,7 +53,7 @@ public class FileBasedBloomFilterTest { CacheManager cacheManager = new CacheManager(MemorySize.ofMebiBytes(1)); FileBasedBloomFilter filter = new FileBasedBloomFilter( - new RandomAccessFile(file, "r"), cacheManager, 100, 0, 1000); + PageFileInput.create(file, 1000), cacheManager, 100, 0, 1000); Arrays.stream(inputs) .forEach(i -> Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());
