[HOTFIX] Fix NPE in LRU cache when an entry from the same table is evicted to load another entry from the same table
Problem: When the driver LRU cache size is configured to a small value, running concurrent queries can sometimes cause one of the dataMap entries of a table to be evicted for lack of space while another block dataMap of the same table is being loaded into the LRU cache. As a result, an NPE is thrown in the flow after the dataMap cache is loaded. This happens because, when a cacheable entry is removed from the LRU cache, invalidate is called on that entry to clear the unsafe memory it uses. The invalidate method sets the references to null and clears the unsafe memory, which leads to an NPE when the entry is accessed again. Solution: Currently the dataMap cache uses unsafe off-heap memory for dataMap caching. To avoid this problem, the code is modified to use unsafe with on-heap memory so that the JVM itself takes care of clearing the memory when required. The references no longer need to be explicitly set to null. This closes #2759 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2a4f5300 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2a4f5300 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2a4f5300 Branch: refs/heads/branch-1.5 Commit: 2a4f53001058346843e0248c60fee2943087efc9 Parents: 5c0da31 Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Tue Sep 25 19:21:08 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Sep 27 12:08:28 2018 +0530 ---------------------------------------------------------------------- .../indexstore/BlockletDataMapIndexWrapper.java | 1 - .../core/indexstore/SafeMemoryDMStore.java | 1 - .../core/indexstore/UnsafeMemoryDMStore.java | 16 +++++++++------- .../indexstore/blockletindex/BlockDataMap.java | 2 -- .../core/memory/HeapMemoryAllocator.java | 8 ++++++-- .../core/memory/UnsafeMemoryManager.java | 19 +++++++++++++++---- .../util/AbstractDataFileFooterConverter.java | 4 ++-- 7 files changed, 32 insertions(+), 19 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java index 7b8a13b..33d69aa 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexWrapper.java @@ -70,7 +70,6 @@ public class BlockletDataMapIndexWrapper implements Cacheable, Serializable { for (DataMap dataMap : dataMaps) { dataMap.clear(); } - dataMaps = null; } public List<BlockDataMap> getDataMaps() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java index 0b3d4d8..042790f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SafeMemoryDMStore.java @@ -62,7 +62,6 @@ public class SafeMemoryDMStore extends AbstractMemoryDMStore { if (!isMemoryFreed) { if (null != dataMapRows) { dataMapRows.clear(); - dataMapRows = null; } isMemoryFreed = true; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 3e8ce12..196559a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; +import org.apache.carbondata.core.memory.MemoryAllocator; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; @@ -49,7 +50,8 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public UnsafeMemoryDMStore() throws MemoryException { this.allocatedSize = capacity; - this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize); + this.memoryBlock = + UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize); this.pointers = new int[1000]; } @@ -71,11 +73,11 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { } private void increaseMemory(int requiredMemory) throws MemoryException { - MemoryBlock newMemoryBlock = - UnsafeMemoryManager.allocateMemoryWithRetry(taskId, allocatedSize + requiredMemory); + MemoryBlock newMemoryBlock = UnsafeMemoryManager + .allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize + requiredMemory); getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(), newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, 
taskId, this.memoryBlock); allocatedSize = allocatedSize + requiredMemory; this.memoryBlock = newMemoryBlock; } @@ -188,10 +190,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public void finishWriting() throws MemoryException { if (runningLength < allocatedSize) { MemoryBlock allocate = - UnsafeMemoryManager.allocateMemoryWithRetry(taskId, runningLength); + UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, runningLength); getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock); memoryBlock = allocate; } // Compact pointers. @@ -204,7 +206,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public void freeMemory() { if (!isMemoryFreed) { - UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock); isMemoryFreed = true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 0cf9914..d7b7977 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -943,12 +943,10 @@ public class BlockDataMap extends CoarseGrainDataMap @Override public void clear() { if (memoryDMStore != null) { memoryDMStore.freeMemory(); - memoryDMStore = null; } // clear task min/max unsafe memory if 
(null != taskSummaryDMStore) { taskSummaryDMStore.freeMemory(); - taskSummaryDMStore = null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java index d898dc9..d08f803 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.Map; import javax.annotation.concurrent.GuardedBy; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; /** @@ -39,8 +40,11 @@ public class HeapMemoryAllocator implements MemoryAllocator { public HeapMemoryAllocator() { poolingThresholdBytes = CarbonProperties.getInstance().getHeapMemoryPoolingThresholdBytes(); - // if set 'poolingThresholdBytes' to -1, it should not go through the pooling mechanism. - if (poolingThresholdBytes == -1) { + boolean isDriver = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "false")); + // if set 'poolingThresholdBytes' to -1 or the object creation call is in driver, + // it should not go through the pooling mechanism. 
+ if (poolingThresholdBytes == -1 || isDriver) { shouldPooling = false; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index e3593c5..703d57a 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -107,9 +107,10 @@ public class UnsafeMemoryManager { .info("Working Memory manager is created with size " + totalMemory + " with " + allocator); } - private synchronized MemoryBlock allocateMemory(long taskId, long memoryRequested) { + private synchronized MemoryBlock allocateMemory(MemoryAllocator memoryAllocator, long taskId, + long memoryRequested) { if (memoryUsed + memoryRequested <= totalMemory) { - MemoryBlock allocate = allocator.allocate(memoryRequested); + MemoryBlock allocate = memoryAllocator.allocate(memoryRequested); memoryUsed += allocate.size(); Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); if (null == listOfMemoryBlock) { @@ -128,11 +129,16 @@ public class UnsafeMemoryManager { } public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) { + freeMemory(allocator, taskId, memoryBlock); + } + + public synchronized void freeMemory(MemoryAllocator memoryAllocator, long taskId, + MemoryBlock memoryBlock) { if (taskIdToMemoryBlockMap.containsKey(taskId)) { taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); } if (!memoryBlock.isFreedStatus()) { - allocator.free(memoryBlock); + memoryAllocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; if (LOGGER.isDebugEnabled()) { @@ -182,10 +188,15 @@ public class UnsafeMemoryManager { */ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException { + return allocateMemoryWithRetry(INSTANCE.allocator, taskId, size); + } + + public static MemoryBlock allocateMemoryWithRetry(MemoryAllocator memoryAllocator, long taskId, + long size) throws MemoryException { MemoryBlock baseBlock = null; int tries = 0; while (tries < 300) { - baseBlock = INSTANCE.allocateMemory(taskId, size); + baseBlock = INSTANCE.allocateMemory(memoryAllocator, taskId, size); if (baseBlock == null) { try { LOGGER.info("Memory is not available, retry after 500 millis"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/2a4f5300/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java index b1dd580..601ce50 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java +++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java @@ -295,8 +295,8 @@ public abstract class AbstractDataFileFooterConverter { blockletMinMaxFlag = blockletIndexList.get(i).getMinMaxIndex().getIsMinMaxSet(); for (int j = 0; j < maxValue.length; j++) { // can be null for stores < 1.5.0 version - if (null != blockletMinMaxFlag && !blockletMinMaxFlag[i]) { - blockMinMaxFlag[i] = blockletMinMaxFlag[i]; + if (null != blockletMinMaxFlag && !blockletMinMaxFlag[j]) { + blockMinMaxFlag[j] = blockletMinMaxFlag[j]; currentMaxValue[j] = new byte[0]; currentMinValue[j] = new byte[0]; continue;