This is an automated email from the ASF dual-hosted git repository.

lqc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e645a50210 Improve the unit test dataset used by CLPMutableForwardIndexV2Test and CLPForwardIndexCreatorV2Test (#14632)
e645a50210 is described below

commit e645a50210bf7b951177f9e553eb9ffabdf524ca
Author: Jack Luo <[email protected]>
AuthorDate: Sat Dec 14 04:48:22 2024 +0800

    Improve the unit test dataset used by CLPMutableForwardIndexV2Test and CLPForwardIndexCreatorV2Test (#14632)
    
    * Improve the unit test dataset used by CLPMutableForwardIndexV2Test and CLPForwardIndexCreatorV2Test
    
    * Add compressed log data.
    
    * Fix linting issue.
    
    * Improved unit test code quality.
---
 .../creator/CLPForwardIndexCreatorV2Test.java      | 127 ++++++++++++++++-----
 .../mutable/CLPMutableForwardIndexV2Test.java      |  43 +++----
 .../src/test/resources/data/log.jsonl.gz           | Bin 0 -> 6148486 bytes
 3 files changed, 124 insertions(+), 46 deletions(-)

diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
index c66ea2f3ae..32732e4cad 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java
@@ -18,75 +18,150 @@
  */
 package org.apache.pinot.segment.local.segment.index.creator;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV2;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import 
org.apache.pinot.segment.local.segment.index.forward.mutable.VarByteSVMutableForwardIndexTest;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV2;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class CLPForwardIndexCreatorV2Test {
+  private static final String COLUMN_NAME = "column1";
   private static final File TEMP_DIR =
       new File(FileUtils.getTempDirectory(), 
CLPForwardIndexCreatorV2Test.class.getSimpleName());
   private PinotDataBufferMemoryManager _memoryManager;
+  private List<String> _logMessages = new ArrayList<>();
 
   @BeforeClass
   public void setUp()
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
     _memoryManager = new 
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    try (GzipCompressorInputStream gzipInputStream = new 
GzipCompressorInputStream(
+        getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
+        BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(gzipInputStream))) {
+      String line;
+      while ((line = bufferedReader.readLine()) != null) {
+        JsonNode jsonNode = objectMapper.readTree(line);
+        _logMessages.add(jsonNode.get("message").asText());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
   }
 
   @Test
   public void testCLPWriter()
       throws IOException {
-    List<String> logLines = new ArrayList<>();
-    logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
-        + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property 
LiveInstance took 5 ms. Selective: true");
-    logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
-        + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property 
LiveInstance took 4 ms. Selective: true");
-    logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] 
Handled request from 0.0"
-        + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
-    logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] 
Handled request from 0.0"
-        + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
-        + "application/json status code 200 OK");
-    logLines.add("null");
-
     // Create and ingest into a clp mutable forward indexes
-    CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new 
CLPMutableForwardIndexV2("column1", _memoryManager);
-    for (int i = 0; i < logLines.size(); i++) {
-      clpMutableForwardIndexV2.setString(i, logLines.get(i));
+    CLPMutableForwardIndexV2 clpMutableForwardIndexV2 = new 
CLPMutableForwardIndexV2(COLUMN_NAME, _memoryManager);
+    int rawSizeBytes = 0;
+    int maxLength = 0;
+    for (int i = 0; i < _logMessages.size(); i++) {
+      String logMessage = _logMessages.get(i);
+      clpMutableForwardIndexV2.setString(i, logMessage);
+      rawSizeBytes += logMessage.length();
+      maxLength = Math.max(maxLength, logMessage.length());
     }
 
-    // Create a immutable forward index from mutable forward index
-    CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
-        new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, 
ChunkCompressionType.ZSTANDARD);
-    for (int i = 0; i < logLines.size(); i++) {
-      
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
+    // LZ4 compression type
+    long rawStringFwdIndexSizeLZ4 = 
createStringRawForwardIndex(ChunkCompressionType.LZ4, maxLength);
+    long clpFwdIndexSizeLZ4 =
+        createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, 
ChunkCompressionType.LZ4);
+    // For LZ4 compression:
+    // 1. CLP raw forward index should achieve at least 40x compression
+    // 2. at least 25% smaller file size compared to standard raw forward 
index with LZ4 compression
+    Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeLZ4 >= 40);
+    Assert.assertTrue((float) rawStringFwdIndexSizeLZ4 / clpFwdIndexSizeLZ4 >= 
0.25);
+
+    // ZSTD compression type
+    long rawStringFwdIndexSizeZSTD = 
createStringRawForwardIndex(ChunkCompressionType.ZSTANDARD, maxLength);
+    long clpFwdIndexSizeZSTD =
+        createAndValidateClpImmutableForwardIndex(clpMutableForwardIndexV2, 
ChunkCompressionType.ZSTANDARD);
+    // For ZSTD compression
+    // 1. CLP raw forward index should achieve at least 66x compression
+    // 2. at least 19% smaller file size compared to standard raw forward 
index with ZSTD compression
+    Assert.assertTrue((float) rawSizeBytes / clpFwdIndexSizeZSTD >= 66);
+    Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD 
>= 0.19);
+  }
+
+  private long createStringRawForwardIndex(ChunkCompressionType 
compressionType, int maxLength)
+      throws IOException {
+    // Create a raw string immutable forward index
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+    SingleValueVarByteRawIndexCreator index =
+        new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, 
COLUMN_NAME, _logMessages.size(),
+            FieldSpec.DataType.STRING, maxLength);
+    for (String logMessage : _logMessages) {
+      index.putString(logMessage);
     }
-    clpForwardIndexCreatorV2.seal();
-    clpForwardIndexCreatorV2.close();
+    index.seal();
+    index.close();
+
+    File indexFile = new File(TEMP_DIR, COLUMN_NAME + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    return indexFile.length();
+  }
+
+  private long 
createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 
clpMutableForwardIndexV2,
+      ChunkCompressionType compressionType)
+      throws IOException {
+    long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, 
compressionType);
 
     // Read from immutable forward index and validate the content
-    File indexFile = new File(TEMP_DIR, "column1" + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    File indexFile = new File(TEMP_DIR, COLUMN_NAME + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     PinotDataBuffer pinotDataBuffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(indexFile);
-    CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new 
CLPForwardIndexReaderV2(pinotDataBuffer, logLines.size());
+    CLPForwardIndexReaderV2 clpForwardIndexReaderV2 = new 
CLPForwardIndexReaderV2(pinotDataBuffer, _logMessages.size());
     CLPForwardIndexReaderV2.CLPReaderContext clpForwardIndexReaderV2Context = 
clpForwardIndexReaderV2.createContext();
-    for (int i = 0; i < logLines.size(); i++) {
-      Assert.assertEquals(clpForwardIndexReaderV2.getString(i, 
clpForwardIndexReaderV2Context), logLines.get(i));
+    for (int i = 0; i < _logMessages.size(); i++) {
+      Assert.assertEquals(clpForwardIndexReaderV2.getString(i, 
clpForwardIndexReaderV2Context), _logMessages.get(i));
     }
+
+    return indexSize;
+  }
+
+  private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 
clpMutableForwardIndexV2,
+      ChunkCompressionType compressionType)
+      throws IOException {
+    // Create a CLP immutable forward index from mutable forward index
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+    CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
+        new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, 
compressionType);
+    for (int i = 0; i < _logMessages.size(); i++) {
+      
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
+    }
+    clpForwardIndexCreatorV2.seal();
+    clpForwardIndexCreatorV2.close();
+
+    File indexFile = new File(TEMP_DIR, COLUMN_NAME + 
V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    return indexFile.length();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
index b1824570dc..6179a4b8bf 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/CLPMutableForwardIndexV2Test.java
@@ -18,10 +18,15 @@
  */
 package org.apache.pinot.segment.local.segment.index.forward.mutable;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import 
org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndexV2;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
@@ -33,10 +38,24 @@ import org.testng.annotations.Test;
 
 public class CLPMutableForwardIndexV2Test {
   private PinotDataBufferMemoryManager _memoryManager;
+  private List<String> _logMessages = new ArrayList<>();
 
   @BeforeClass
   public void setUp() {
     _memoryManager = new 
DirectMemoryManager(VarByteSVMutableForwardIndexTest.class.getName());
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    try (GzipCompressorInputStream gzipInputStream = new 
GzipCompressorInputStream(
+        getClass().getClassLoader().getResourceAsStream("data/log.jsonl.gz"));
+        BufferedReader bufferedReader = new BufferedReader(new 
InputStreamReader(gzipInputStream))) {
+      String line;
+      while ((line = bufferedReader.readLine()) != null) {
+        JsonNode jsonNode = objectMapper.readTree(line);
+        _logMessages.add(jsonNode.get("message").asText());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @AfterClass
@@ -52,33 +71,17 @@ public class CLPMutableForwardIndexV2Test {
   public void testReadWriteOnLogMessages()
       throws IOException {
     try (CLPMutableForwardIndexV2 readerWriter = new 
CLPMutableForwardIndexV2("col1", _memoryManager)) {
-      List<String> logLines = new ArrayList<>();
-      for (int i = 0; i < 10000; i++) {
-        logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32c_DEFAULT)] "
-            + "Event pinot::DEFAULT::4a02a32c_DEFAULT : Refreshed 35 property 
LiveInstance took 5 ms. Selective:"
-            + " true");
-        logLines.add("INFO [PropertyCache] 
[HelixController-pipeline-default-pinot-(4a02a32d_DEFAULT)] "
-            + "Event pinot::DEFAULT::4a02a32d_DEFAULT : Refreshed 81 property 
LiveInstance took 4 ms. Selective:"
-            + " true");
-        logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-2] 
Handled request from 0.0"
-            + ".0.0 GET https://0.0.0.0:8443/health?checkType=liveness, 
content-type null status code 200 OK");
-        logLines.add("INFO [ControllerResponseFilter] [grizzly-http-server-6] 
Handled request from 0.0"
-            + ".0.0 GET 
https://pinot-pinot-broker-headless.managed.svc.cluster.local:8093/tables, 
content-type "
-            + "application/json status code 200 OK");
-        logLines.add("null");
-      }
-
       // Typically, log messages should be clp encoded due to low logtype and 
dictionary variable cardinality
       Assert.assertTrue(readerWriter.isClpEncoded());
 
       // Write
-      for (int i = 0; i < logLines.size(); i++) {
-        readerWriter.setString(i, logLines.get(i));
+      for (int i = 0; i < _logMessages.size(); i++) {
+        readerWriter.setString(i, _logMessages.get(i));
       }
 
       // Read
-      for (int i = 0; i < logLines.size(); i++) {
-        Assert.assertEquals(readerWriter.getString(i), logLines.get(i));
+      for (int i = 0; i < _logMessages.size(); i++) {
+        Assert.assertEquals(readerWriter.getString(i), _logMessages.get(i));
       }
     }
   }
diff --git a/pinot-segment-local/src/test/resources/data/log.jsonl.gz 
b/pinot-segment-local/src/test/resources/data/log.jsonl.gz
new file mode 100644
index 0000000000..01f483fb08
Binary files /dev/null and 
b/pinot-segment-local/src/test/resources/data/log.jsonl.gz differ


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to