This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 506892d953 Make thread local variable static to avoid the memory leaking issue (#12242)
506892d953 is described below

commit 506892d953a2d5fce754412d1d1167c93f75081f
Author: Xuanyi Li <xuany...@uber.com>
AuthorDate: Thu Feb 8 08:43:18 2024 -0800

    Make thread local variable static to avoid the memory leaking issue (#12242)
---
 .../readers/forward/VarByteChunkSVForwardIndexReader.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index 9c9f9ce5f1..585d48d7ea 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -41,7 +41,7 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkForwardInde
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
-  private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
+  private static ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[0]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType, true);
@@ -84,7 +84,7 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkForwardInde
     int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
     int length = valueEndOffset - valueStartOffset;
-    byte[] bytes = _reusableBytes.get();
+    byte[] bytes = getOrExpandByteArray();
     chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
     return new String(bytes, 0, length, UTF_8);
@@ -103,11 +103,20 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkForwardInde
     long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
 
     int length = (int) (valueEndOffset - valueStartOffset);
-    byte[] bytes = _reusableBytes.get();
+    byte[] bytes = getOrExpandByteArray();
     _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return new String(bytes, 0, length, UTF_8);
   }
 
+  private byte[] getOrExpandByteArray() {
+    byte[] bytes = _reusableBytes.get();
+    if (bytes.length < _lengthOfLongestEntry) {
+      _reusableBytes.set(new byte[_lengthOfLongestEntry]);
+      bytes = _reusableBytes.get();
+    }
+    return bytes;
+  }
+
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
     if (_isCompressed) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org