This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7a79e1eb77 fix shared buffer, tests (#12587)
7a79e1eb77 is described below
commit 7a79e1eb7797592b0d3d249824d0af122af6a2a2
Author: Christopher Peck <[email protected]>
AuthorDate: Thu Mar 7 12:22:00 2024 -0800
fix shared buffer, tests (#12587)
---
.../impl/VarByteChunkForwardIndexWriterV4.java | 2 +-
.../segment/index/creator/VarByteChunkV4Test.java | 122 +++++++++++++++------
2 files changed, 87 insertions(+), 37 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 35c61f35f5..868511a437 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -94,7 +94,7 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType
compressionType, int chunkSize)
throws IOException {
- _dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
+ _dataBuffer = new File(file.getParentFile(), file.getName() +
DATA_BUFFER_SUFFIX);
_output = new RandomAccessFile(file, "rw");
_dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
_chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType,
true);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index 88414b8b58..70313d91e7 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -21,8 +21,10 @@ package org.apache.pinot.segment.local.segment.index.creator;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -36,7 +38,6 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -46,73 +47,122 @@ import static org.testng.Assert.assertEquals;
public class VarByteChunkV4Test {
- private static final File TEST_DIR = new File(FileUtils.getTempDirectory(),
"VarByteChunkV4Test");
+ private static File[] _dirs;
- private File _file;
-
- @DataProvider
+ @DataProvider(parallel = true)
public Object[][] params() {
- return new Object[][]{
- {ChunkCompressionType.LZ4, 20, 1024},
- {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
- {ChunkCompressionType.PASS_THROUGH, 20, 1024},
- {ChunkCompressionType.SNAPPY, 20, 1024},
- {ChunkCompressionType.ZSTANDARD, 20, 1024},
- {ChunkCompressionType.LZ4, 2048, 1024},
- {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
- {ChunkCompressionType.PASS_THROUGH, 2048, 1024},
- {ChunkCompressionType.SNAPPY, 2048, 1024},
- {ChunkCompressionType.ZSTANDARD, 2048, 1024}
+ Object[][] params = new Object[][]{
+ {null, ChunkCompressionType.LZ4, 20, 1024},
+ {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
+ {null, ChunkCompressionType.PASS_THROUGH, 20, 1024},
+ {null, ChunkCompressionType.SNAPPY, 20, 1024},
+ {null, ChunkCompressionType.ZSTANDARD, 20, 1024},
+ {null, ChunkCompressionType.LZ4, 2048, 1024},
+ {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
+ {null, ChunkCompressionType.PASS_THROUGH, 2048, 1024},
+ {null, ChunkCompressionType.SNAPPY, 2048, 1024},
+ {null, ChunkCompressionType.ZSTANDARD, 2048, 1024}
};
+
+ for (int i = 0; i < _dirs.length; i++) {
+ params[i][0] = _dirs[i];
+ }
+
+ return params;
}
@BeforeClass
- public void forceMkDir()
+ public void forceMkDirs()
throws IOException {
- FileUtils.forceMkdir(TEST_DIR);
+ _dirs = new File[10];
+ for (int i = 0; i < _dirs.length; i++) {
+ _dirs[i] = new File(new File(FileUtils.getTempDirectory(),
UUID.randomUUID().toString()), "VarByteChunkV4Test");
+ FileUtils.forceMkdir(_dirs[i]);
+ }
}
@AfterClass
- public void deleteDir() {
- FileUtils.deleteQuietly(TEST_DIR);
- }
-
- @AfterMethod
- public void after() {
- if (_file != null) {
- FileUtils.deleteQuietly(_file);
+ public void deleteDirs() {
+ for (File dir : _dirs) {
+ FileUtils.deleteQuietly(dir);
}
}
@Test(dataProvider = "params")
- public void testStringSV(ChunkCompressionType compressionType, int
longestEntry, int chunkSize)
+ public void testStringSV(File file, ChunkCompressionType compressionType,
int longestEntry, int chunkSize)
throws IOException {
- _file = new File(TEST_DIR, "testStringSV");
- testSV(compressionType, longestEntry, chunkSize,
FieldSpec.DataType.STRING, x -> x,
+ File stringSVFile = new File(file, "testStringSV");
+ testWriteRead(stringSVFile, compressionType, longestEntry, chunkSize,
FieldSpec.DataType.STRING, x -> x,
VarByteChunkForwardIndexWriterV4::putString, (reader, context, docId)
-> reader.getString(docId, context));
+ FileUtils.deleteQuietly(stringSVFile);
+ }
+
+ @Test(dataProvider = "params")
+ public void testBytesSV(File file, ChunkCompressionType compressionType, int
longestEntry, int chunkSize)
+ throws IOException {
+ File bytesSVFile = new File(file, "testBytesSV");
+ testWriteRead(bytesSVFile, compressionType, longestEntry, chunkSize,
FieldSpec.DataType.BYTES,
+ x -> x.getBytes(StandardCharsets.UTF_8),
VarByteChunkForwardIndexWriterV4::putBytes,
+ (reader, context, docId) -> reader.getBytes(docId, context));
+ FileUtils.deleteQuietly(bytesSVFile);
}
@Test(dataProvider = "params")
- public void testBytesSV(ChunkCompressionType compressionType, int
longestEntry, int chunkSize)
+ public void testStringMV(File file, ChunkCompressionType compressionType,
int longestEntry, int chunkSize)
throws IOException {
- _file = new File(TEST_DIR, "testBytesSV");
- testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES,
x -> x.getBytes(StandardCharsets.UTF_8),
- VarByteChunkForwardIndexWriterV4::putBytes, (reader, context, docId)
-> reader.getBytes(docId, context));
+ File stringMVFile = new File(file, "testStringMV");
+ testWriteRead(stringMVFile, compressionType, longestEntry, chunkSize,
FieldSpec.DataType.STRING,
+ new StringSplitterMV(), VarByteChunkForwardIndexWriterV4::putStringMV,
+ (reader, context, docId) -> reader.getStringMV(docId, context));
+ FileUtils.deleteQuietly(stringMVFile);
+ }
+
+ @Test(dataProvider = "params")
+ public void testBytesMV(File file, ChunkCompressionType compressionType, int
longestEntry, int chunkSize)
+ throws IOException {
+ File bytesMVFile = new File(file, "testBytesMV");
+ testWriteRead(bytesMVFile, compressionType, longestEntry, chunkSize,
FieldSpec.DataType.BYTES, new ByteSplitterMV(),
+ VarByteChunkForwardIndexWriterV4::putBytesMV, (reader, context, docId)
-> reader.getBytesMV(docId, context));
+ FileUtils.deleteQuietly(bytesMVFile);
+ }
+
+ static class StringSplitterMV implements Function<String, String[]> {
+ @Override
+ public String[] apply(String input) {
+ List<String> res = new ArrayList<>();
+ for (int i = 0; i < input.length(); i += 3) {
+ int endIndex = Math.min(i + 3, input.length());
+ res.add(input.substring(i, endIndex));
+ }
+ return res.toArray(new String[0]);
+ }
+ }
+
+ static class ByteSplitterMV implements Function<String, byte[][]> {
+ @Override
+ public byte[][] apply(String input) {
+ List<byte[]> res = new ArrayList<>();
+ for (int i = 0; i < input.length(); i += 3) {
+ int endIndex = Math.min(i + 3, input.length());
+ res.add(input.substring(i, endIndex).getBytes());
+ }
+ return res.toArray(new byte[0][]);
+ }
}
- private <T> void testSV(ChunkCompressionType compressionType, int
longestEntry, int chunkSize,
+ private <T> void testWriteRead(File file, ChunkCompressionType
compressionType, int longestEntry, int chunkSize,
FieldSpec.DataType dataType, Function<String, T> forwardMapper,
BiConsumer<VarByteChunkForwardIndexWriterV4, T> write,
Read<T> read)
throws IOException {
List<T> values = randomStrings(1000,
longestEntry).map(forwardMapper).collect(Collectors.toList());
- try (VarByteChunkForwardIndexWriterV4 writer = new
VarByteChunkForwardIndexWriterV4(_file, compressionType,
+ try (VarByteChunkForwardIndexWriterV4 writer = new
VarByteChunkForwardIndexWriterV4(file, compressionType,
chunkSize)) {
for (T value : values) {
write.accept(writer, value);
}
}
- try (PinotDataBuffer buffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
+ try (PinotDataBuffer buffer =
PinotDataBuffer.mapReadOnlyBigEndianFile(file)) {
try (VarByteChunkForwardIndexReaderV4 reader = new
VarByteChunkForwardIndexReaderV4(buffer, dataType,
true); VarByteChunkForwardIndexReaderV4.ReaderContext context =
reader.createContext()) {
for (int i = 0; i < values.size(); i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]