[CARBONDATA-2990] Fixed JVM crash when rebuilding bloom datamap Problem: While rebuilding the datamap, the task accesses the datamap store, so it builds the datamap and stores it in unsafe on-heap storage. When the reader is closed, it frees all memory acquired during that task. Because the acquired memory is on-heap but is released with the off-heap allocator, the JVM crashes.
Solution: Maintain the type of memory acquired in the memory block itself and get the allocator as per the memory type and release it. This closes #2793 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fbd4a5f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fbd4a5f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fbd4a5f Branch: refs/heads/branch-1.5 Commit: 8fbd4a5f53070b3755f1f573b09e0066fa93a6ea Parents: c3a8704 Author: ravipesala <ravi.pes...@gmail.com> Authored: Sun Sep 30 11:27:57 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Thu Oct 4 14:39:25 2018 +0530 ---------------------------------------------------------------------- .../core/indexstore/UnsafeMemoryDMStore.java | 14 +++---- .../core/memory/HeapMemoryAllocator.java | 5 ++- .../carbondata/core/memory/MemoryBlock.java | 14 ++++++- .../carbondata/core/memory/MemoryType.java | 23 ++++++++++ .../core/memory/UnsafeMemoryAllocator.java | 2 +- .../core/memory/UnsafeMemoryManager.java | 44 +++++++++++--------- 6 files changed, 70 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 196559a..0db1b0a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -19,9 +19,9 @@ package org.apache.carbondata.core.indexstore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import 
org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema; -import org.apache.carbondata.core.memory.MemoryAllocator; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.memory.MemoryType; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -51,7 +51,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public UnsafeMemoryDMStore() throws MemoryException { this.allocatedSize = capacity; this.memoryBlock = - UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize); + UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize); this.pointers = new int[1000]; } @@ -74,10 +74,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { private void increaseMemory(int requiredMemory) throws MemoryException { MemoryBlock newMemoryBlock = UnsafeMemoryManager - .allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, allocatedSize + requiredMemory); + .allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, allocatedSize + requiredMemory); getUnsafe().copyMemory(this.memoryBlock.getBaseObject(), this.memoryBlock.getBaseOffset(), newMemoryBlock.getBaseObject(), newMemoryBlock.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, this.memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, this.memoryBlock); allocatedSize = allocatedSize + requiredMemory; this.memoryBlock = newMemoryBlock; } @@ -190,10 +190,10 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public void finishWriting() throws MemoryException { if (runningLength < allocatedSize) { MemoryBlock allocate = - 
UnsafeMemoryManager.allocateMemoryWithRetry(MemoryAllocator.HEAP, taskId, runningLength); + UnsafeMemoryManager.allocateMemoryWithRetry(MemoryType.ONHEAP, taskId, runningLength); getUnsafe().copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); memoryBlock = allocate; } // Compact pointers. @@ -206,7 +206,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { public void freeMemory() { if (!isMemoryFreed) { - UnsafeMemoryManager.INSTANCE.freeMemory(MemoryAllocator.HEAP, taskId, memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(taskId, memoryBlock); isMemoryFreed = true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java index d08f803..58162da 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/HeapMemoryAllocator.java @@ -71,7 +71,8 @@ public class HeapMemoryAllocator implements MemoryAllocator { final long[] array = arrayReference.get(); if (array != null) { assert (array.length * 8L >= size); - MemoryBlock memory = new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size); + MemoryBlock memory = + new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP); // reuse this MemoryBlock memory.setFreedStatus(false); return memory; @@ -82,7 +83,7 @@ public class HeapMemoryAllocator implements MemoryAllocator { } } long[] array = new long[numWords]; - return new 
MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size); + return new MemoryBlock(array, CarbonUnsafe.LONG_ARRAY_OFFSET, size, MemoryType.ONHEAP); } @Override public void free(MemoryBlock memory) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java index 418ef89..87ae982 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryBlock.java @@ -31,12 +31,18 @@ public class MemoryBlock extends MemoryLocation { /** * whether freed or not */ - private boolean isFreed = false; + private boolean isFreed; - public MemoryBlock(@Nullable Object obj, long offset, long length) { + /** + * Whether it is offheap or onheap memory type + */ + private MemoryType memoryType; + + public MemoryBlock(@Nullable Object obj, long offset, long length, MemoryType memoryType) { super(obj, offset); this.length = length; this.isFreed = false; + this.memoryType = memoryType; } /** @@ -53,4 +59,8 @@ public class MemoryBlock extends MemoryLocation { public void setFreedStatus(boolean freedStatus) { this.isFreed = freedStatus; } + + public MemoryType getMemoryType() { + return memoryType; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java b/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java new file mode 100644 index 0000000..63e20d6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/memory/MemoryType.java @@ -0,0 +1,23 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.memory; + +public enum MemoryType { + + OFFHEAP, ONHEAP; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java index 67412ac..e596895 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryAllocator.java @@ -28,7 +28,7 @@ public class UnsafeMemoryAllocator implements MemoryAllocator { long address = CarbonUnsafe.getUnsafe().allocateMemory(size); // initializing memory with zero CarbonUnsafe.getUnsafe().setMemory(null, address, size, (byte) 0); - return new MemoryBlock(null, address, size); + return new MemoryBlock(null, address, size, MemoryType.OFFHEAP); } @Override 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fbd4a5f/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 703d57a..4efea1a 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -68,9 +68,9 @@ public class UnsafeMemoryManager { LOGGER.info("Invalid memory size value: " + defaultWorkingMemorySize); } long takenSize = size; - MemoryAllocator allocator; + MemoryType memoryType; if (offHeap) { - allocator = MemoryAllocator.UNSAFE; + memoryType = MemoryType.OFFHEAP; long defaultSize = Long.parseLong(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); if (takenSize < defaultSize) { takenSize = defaultSize; @@ -86,9 +86,9 @@ public class UnsafeMemoryManager { takenSize = maxMemory; } } - allocator = MemoryAllocator.HEAP; + memoryType = MemoryType.ONHEAP; } - INSTANCE = new UnsafeMemoryManager(takenSize, allocator); + INSTANCE = new UnsafeMemoryManager(takenSize, memoryType); taskIdToMemoryBlockMap = new HashMap<>(); } @@ -98,19 +98,19 @@ public class UnsafeMemoryManager { private long memoryUsed; - private MemoryAllocator allocator; + private MemoryType memoryType; - private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) { + private UnsafeMemoryManager(long totalMemory, MemoryType memoryType) { this.totalMemory = totalMemory; - this.allocator = allocator; + this.memoryType = memoryType; LOGGER - .info("Working Memory manager is created with size " + totalMemory + " with " + allocator); + .info("Working Memory manager is created with size " + totalMemory + " with " + memoryType); } - private synchronized MemoryBlock allocateMemory(MemoryAllocator 
memoryAllocator, long taskId, + private synchronized MemoryBlock allocateMemory(MemoryType memoryType, long taskId, long memoryRequested) { if (memoryUsed + memoryRequested <= totalMemory) { - MemoryBlock allocate = memoryAllocator.allocate(memoryRequested); + MemoryBlock allocate = getMemoryAllocator(memoryType).allocate(memoryRequested); memoryUsed += allocate.size(); Set<MemoryBlock> listOfMemoryBlock = taskIdToMemoryBlockMap.get(taskId); if (null == listOfMemoryBlock) { @@ -129,16 +129,11 @@ public class UnsafeMemoryManager { } public synchronized void freeMemory(long taskId, MemoryBlock memoryBlock) { - freeMemory(allocator, taskId, memoryBlock); - } - - public synchronized void freeMemory(MemoryAllocator memoryAllocator, long taskId, - MemoryBlock memoryBlock) { if (taskIdToMemoryBlockMap.containsKey(taskId)) { taskIdToMemoryBlockMap.get(taskId).remove(memoryBlock); } if (!memoryBlock.isFreedStatus()) { - memoryAllocator.free(memoryBlock); + getMemoryAllocator(memoryBlock.getMemoryType()).free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 
0 : memoryUsed; if (LOGGER.isDebugEnabled()) { @@ -160,7 +155,7 @@ public class UnsafeMemoryManager { memoryBlock = iterator.next(); if (!memoryBlock.isFreedStatus()) { occuppiedMemory += memoryBlock.size(); - allocator.free(memoryBlock); + getMemoryAllocator(memoryBlock.getMemoryType()).free(memoryBlock); } } } @@ -188,15 +183,15 @@ public class UnsafeMemoryManager { */ public static MemoryBlock allocateMemoryWithRetry(long taskId, long size) throws MemoryException { - return allocateMemoryWithRetry(INSTANCE.allocator, taskId, size); + return allocateMemoryWithRetry(INSTANCE.memoryType, taskId, size); } - public static MemoryBlock allocateMemoryWithRetry(MemoryAllocator memoryAllocator, long taskId, + public static MemoryBlock allocateMemoryWithRetry(MemoryType memoryType, long taskId, long size) throws MemoryException { MemoryBlock baseBlock = null; int tries = 0; while (tries < 300) { - baseBlock = INSTANCE.allocateMemory(memoryAllocator, taskId, size); + baseBlock = INSTANCE.allocateMemory(memoryType, taskId, size); if (baseBlock == null) { try { LOGGER.info("Memory is not available, retry after 500 millis"); @@ -217,6 +212,15 @@ public class UnsafeMemoryManager { return baseBlock; } + private MemoryAllocator getMemoryAllocator(MemoryType memoryType) { + switch (memoryType) { + case ONHEAP: + return MemoryAllocator.HEAP; + default: + return MemoryAllocator.UNSAFE; + } + } + public static boolean isOffHeap() { return offHeap; }