This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a79e1eb77 fix shared buffer, tests (#12587)
7a79e1eb77 is described below

commit 7a79e1eb7797592b0d3d249824d0af122af6a2a2
Author: Christopher Peck <[email protected]>
AuthorDate: Thu Mar 7 12:22:00 2024 -0800

    fix shared buffer, tests (#12587)
---
 .../impl/VarByteChunkForwardIndexWriterV4.java     |   2 +-
 .../segment/index/creator/VarByteChunkV4Test.java  | 122 +++++++++++++++------
 2 files changed, 87 insertions(+), 37 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 35c61f35f5..868511a437 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -94,7 +94,7 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
 
   public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType compressionType, int chunkSize)
       throws IOException {
-    _dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
+    _dataBuffer = new File(file.getParentFile(), file.getName() + DATA_BUFFER_SUFFIX);
     _output = new RandomAccessFile(file, "rw");
     _dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, true);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index 88414b8b58..70313d91e7 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -21,8 +21,10 @@ package org.apache.pinot.segment.local.segment.index.creator;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -36,7 +38,6 @@ import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -46,73 +47,122 @@ import static org.testng.Assert.assertEquals;
 
 public class VarByteChunkV4Test {
 
-  private static final File TEST_DIR = new File(FileUtils.getTempDirectory(), "VarByteChunkV4Test");
+  private static File[] _dirs;
 
-  private File _file;
-
-  @DataProvider
+  @DataProvider(parallel = true)
   public Object[][] params() {
-    return new Object[][]{
-        {ChunkCompressionType.LZ4, 20, 1024},
-        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
-        {ChunkCompressionType.PASS_THROUGH, 20, 1024},
-        {ChunkCompressionType.SNAPPY, 20, 1024},
-        {ChunkCompressionType.ZSTANDARD, 20, 1024},
-        {ChunkCompressionType.LZ4, 2048, 1024},
-        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
-        {ChunkCompressionType.PASS_THROUGH, 2048, 1024},
-        {ChunkCompressionType.SNAPPY, 2048, 1024},
-        {ChunkCompressionType.ZSTANDARD, 2048, 1024}
+    Object[][] params = new Object[][]{
+        {null, ChunkCompressionType.LZ4, 20, 1024},
+        {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
+        {null, ChunkCompressionType.PASS_THROUGH, 20, 1024},
+        {null, ChunkCompressionType.SNAPPY, 20, 1024},
+        {null, ChunkCompressionType.ZSTANDARD, 20, 1024},
+        {null, ChunkCompressionType.LZ4, 2048, 1024},
+        {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
+        {null, ChunkCompressionType.PASS_THROUGH, 2048, 1024},
+        {null, ChunkCompressionType.SNAPPY, 2048, 1024},
+        {null, ChunkCompressionType.ZSTANDARD, 2048, 1024}
     };
+
+    for (int i = 0; i < _dirs.length; i++) {
+      params[i][0] = _dirs[i];
+    }
+
+    return params;
   }
 
   @BeforeClass
-  public void forceMkDir()
+  public void forceMkDirs()
       throws IOException {
-    FileUtils.forceMkdir(TEST_DIR);
+    _dirs = new File[10];
+    for (int i = 0; i < _dirs.length; i++) {
+      _dirs[i] = new File(new File(FileUtils.getTempDirectory(), UUID.randomUUID().toString()), "VarByteChunkV4Test");
+      FileUtils.forceMkdir(_dirs[i]);
+    }
   }
 
   @AfterClass
-  public void deleteDir() {
-    FileUtils.deleteQuietly(TEST_DIR);
-  }
-
-  @AfterMethod
-  public void after() {
-    if (_file != null) {
-      FileUtils.deleteQuietly(_file);
+  public void deleteDirs() {
+    for (File dir : _dirs) {
+      FileUtils.deleteQuietly(dir);
     }
   }
 
   @Test(dataProvider = "params")
-  public void testStringSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+  public void testStringSV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize)
       throws IOException {
-    _file = new File(TEST_DIR, "testStringSV");
-    testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, x -> x,
+    File stringSVFile = new File(file, "testStringSV");
+    testWriteRead(stringSVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, x -> x,
         VarByteChunkForwardIndexWriterV4::putString, (reader, context, docId) -> reader.getString(docId, context));
+    FileUtils.deleteQuietly(stringSVFile);
+  }
+
+  @Test(dataProvider = "params")
+  public void testBytesSV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+      throws IOException {
+    File bytesSVFile = new File(file, "testBytesSV");
+    testWriteRead(bytesSVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES,
+        x -> x.getBytes(StandardCharsets.UTF_8), VarByteChunkForwardIndexWriterV4::putBytes,
+        (reader, context, docId) -> reader.getBytes(docId, context));
+    FileUtils.deleteQuietly(bytesSVFile);
   }
 
   @Test(dataProvider = "params")
-  public void testBytesSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+  public void testStringMV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize)
       throws IOException {
-    _file = new File(TEST_DIR, "testBytesSV");
-    testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, x -> x.getBytes(StandardCharsets.UTF_8),
-        VarByteChunkForwardIndexWriterV4::putBytes, (reader, context, docId) -> reader.getBytes(docId, context));
+    File stringMVFile = new File(file, "testStringMV");
+    testWriteRead(stringMVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING,
+        new StringSplitterMV(), VarByteChunkForwardIndexWriterV4::putStringMV,
+        (reader, context, docId) -> reader.getStringMV(docId, context));
+    FileUtils.deleteQuietly(stringMVFile);
+  }
+
+  @Test(dataProvider = "params")
+  public void testBytesMV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+      throws IOException {
+    File bytesMVFile = new File(file, "testBytesMV");
+    testWriteRead(bytesMVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, new ByteSplitterMV(),
+        VarByteChunkForwardIndexWriterV4::putBytesMV, (reader, context, docId) -> reader.getBytesMV(docId, context));
+    FileUtils.deleteQuietly(bytesMVFile);
+  }
+
+  static class StringSplitterMV implements Function<String, String[]> {
+    @Override
+    public String[] apply(String input) {
+      List<String> res = new ArrayList<>();
+      for (int i = 0; i < input.length(); i += 3) {
+        int endIndex = Math.min(i + 3, input.length());
+        res.add(input.substring(i, endIndex));
+      }
+      return res.toArray(new String[0]);
+    }
+  }
+
+  static class ByteSplitterMV implements Function<String, byte[][]> {
+    @Override
+    public byte[][] apply(String input) {
+      List<byte[]> res = new ArrayList<>();
+      for (int i = 0; i < input.length(); i += 3) {
+        int endIndex = Math.min(i + 3, input.length());
+        res.add(input.substring(i, endIndex).getBytes());
+      }
+      return res.toArray(new byte[0][]);
+    }
   }
 
-  private <T> void testSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize,
+  private <T> void testWriteRead(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize,
       FieldSpec.DataType dataType, Function<String, T> forwardMapper,
       BiConsumer<VarByteChunkForwardIndexWriterV4, T> write,
       Read<T> read)
       throws IOException {
     List<T> values = randomStrings(1000, longestEntry).map(forwardMapper).collect(Collectors.toList());
-    try (VarByteChunkForwardIndexWriterV4 writer = new VarByteChunkForwardIndexWriterV4(_file, compressionType,
+    try (VarByteChunkForwardIndexWriterV4 writer = new VarByteChunkForwardIndexWriterV4(file, compressionType,
         chunkSize)) {
       for (T value : values) {
         write.accept(writer, value);
       }
     }
-    try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(file)) {
       try (VarByteChunkForwardIndexReaderV4 reader = new VarByteChunkForwardIndexReaderV4(buffer, dataType,
           true); VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
         for (int i = 0; i < values.size(); i++) {
         for (int i = 0; i < values.size(); i++) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to