This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 506892d953 Make thread local variable static to avoid the memory 
leaking issue (#12242)
506892d953 is described below

commit 506892d953a2d5fce754412d1d1167c93f75081f
Author: Xuanyi Li <xuany...@uber.com>
AuthorDate: Thu Feb 8 08:43:18 2024 -0800

    Make thread local variable static to avoid the memory leaking issue (#12242)
---
 .../readers/forward/VarByteChunkSVForwardIndexReader.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 9c9f9ce5f1..585d48d7ea 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -41,7 +41,7 @@ public final class VarByteChunkSVForwardIndexReader extends 
BaseChunkForwardInde
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
-  private final ThreadLocal<byte[]> _reusableBytes = 
ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
+  private static ThreadLocal<byte[]> _reusableBytes = 
ThreadLocal.withInitial(() -> new byte[0]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType 
valueType) {
     super(dataBuffer, valueType, true);
@@ -84,7 +84,7 @@ public final class VarByteChunkSVForwardIndexReader extends 
BaseChunkForwardInde
     int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
     int length = valueEndOffset - valueStartOffset;
-    byte[] bytes = _reusableBytes.get();
+    byte[] bytes = getOrExpandByteArray();
     chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
     return new String(bytes, 0, length, UTF_8);
@@ -103,11 +103,20 @@ public final class VarByteChunkSVForwardIndexReader 
extends BaseChunkForwardInde
     long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, 
chunkStartOffset);
 
     int length = (int) (valueEndOffset - valueStartOffset);
-    byte[] bytes = _reusableBytes.get();
+    byte[] bytes = getOrExpandByteArray();
     _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return new String(bytes, 0, length, UTF_8);
   }
 
+  private byte[] getOrExpandByteArray() {
+    byte[] bytes = _reusableBytes.get();
+    if (bytes.length < _lengthOfLongestEntry) {
+      _reusableBytes.set(new byte[_lengthOfLongestEntry]);
+      bytes = _reusableBytes.get();
+    }
+    return bytes;
+  }
+
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
     if (_isCompressed) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to