This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_mem_control in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ea10e9e1092f67d17f1319047b5541d860013191 Author: 江天 <[email protected]> AuthorDate: Fri Apr 12 14:54:34 2019 +0800 add java doc enhance concurrency refine some naming add more specific logs --- .../engine/bufferwrite/BufferWriteProcessor.java | 9 +++- .../db/engine/memcontrol/BasicMemController.java | 24 ++++++++- .../db/engine/memcontrol/JVMMemController.java | 4 +- .../db/engine/memcontrol/RecordMemController.java | 58 ++++++++++------------ .../db/engine/overflow/io/OverflowProcessor.java | 49 +++++++++++++----- .../db/engine/memcontrol/MemControllerTest.java | 18 +++---- 6 files changed, 102 insertions(+), 60 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java index e09feec..d819c06 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/bufferwrite/BufferWriteProcessor.java @@ -170,7 +170,7 @@ public class BufferWriteProcessor extends Processor { public boolean write(TSRecord tsRecord) throws BufferWriteProcessorException { long memUsage = MemUtils.getRecordSize(tsRecord); BasicMemController.UsageLevel level = BasicMemController.getInstance() - .reportUse(this, memUsage); + .acquireUsage(this, memUsage); for (DataPoint dataPoint : tsRecord.dataPointList) { workMemTable.write(tsRecord.deviceId, dataPoint.getMeasurementId(), dataPoint.getType(), tsRecord.time, @@ -355,7 +355,7 @@ public class BufferWriteProcessor extends Processor { valueCount = 0; switchWorkToFlush(); long version = versionController.nextVersion(); - BasicMemController.getInstance().reportFree(this, memSize.get()); + BasicMemController.getInstance().releaseUsage(this, memSize.get()); memSize.set(0); // switch flushFuture = FlushManager.getInstance().submit(() -> flushTask("asynchronously", @@ -559,4 +559,9 @@ public class BufferWriteProcessor extends Processor { public int hashCode() { return Objects.hash(super.hashCode(), baseDir, fileName); } + + @Override + public String toString() { + return "BufferWriteProcessor in " + insertFilePath; + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java index fe6753a..503afa3 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/BasicMemController.java @@ -150,9 +150,29 @@ public abstract class BasicMemController implements IService { logger.info("MemController exited"); } - public abstract UsageLevel reportUse(Object user, long usage); + /** + * Any object (like OverflowProcessor or BufferWriteProcessor) that wants to hold some fixed size + * of memory should call this method to check the returned memory usage level to decide any + * further actions. + * @param user an object that wants some memory as a buffer or anything. + * @param usage how many bytes does the object want. + * @return one of the three UsageLevels: + * safe - there are still sufficient memories left, the user may go on freely and this + * usage is recorded. + * warning - there is only a small amount of memories available, the user would better + * try to reduce memory usage but can still proceed and this usage is recorded. + * dangerous - there is almost no memories unused, the user cannot proceed before enough + * memory usages are released and this usage is NOT recorded. + */ + public abstract UsageLevel acquireUsage(Object user, long usage); - public abstract void reportFree(Object user, long freeSize); + /** + * When the memories held by one object (like OverflowProcessor or BufferWriteProcessor) is no + * more useful, this object should call this method to release the memories. + * @param user an object that holds some memory as a buffer or anything. + * @param freeSize how many bytes does the object want to release. + */ + public abstract void releaseUsage(Object user, long freeSize); public enum ControllerType { RECORD, JVM diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java index 34a92a4..31a3ef8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/JVMMemController.java @@ -61,7 +61,7 @@ public class JVMMemController extends BasicMemController { } @Override - public UsageLevel reportUse(Object user, long usage) { + public UsageLevel acquireUsage(Object user, long usage) { long memUsage = getTotalUsage() + usage; if (memUsage < warningThreshold) { return UsageLevel.SAFE; @@ -82,7 +82,7 @@ public class JVMMemController extends BasicMemController { } @Override - public void reportFree(Object user, long freeSize) { + public void releaseUsage(Object user, long freeSize) { if (LOGGER.isInfoEnabled()) { LOGGER.info("{} freed from {}, total usage {}", MemUtils.bytesCntToStr(freeSize), user.getClass(), MemUtils.bytesCntToStr(getTotalUsage())); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java index 2ffcc97..e96e280 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memcontrol/RecordMemController.java @@ -18,10 +18,9 @@ */ package org.apache.iotdb.db.engine.memcontrol; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; - import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.utils.MemUtils; @@ -33,15 +32,15 @@ import org.slf4j.LoggerFactory; */ public class RecordMemController extends BasicMemController { - private static Logger LOGGER = LoggerFactory.getLogger(RecordMemController.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RecordMemController.class); // the key is the reference of the memory user, while the value is its memory usage in byte - private Map<Object, Long> memMap; + private Map<Object, AtomicLong> memMap; private AtomicLong totalMemUsed; private RecordMemController(IoTDBConfig config) { super(config); - memMap = new HashMap<>(); + memMap = new ConcurrentHashMap<>(); totalMemUsed = new AtomicLong(0); } @@ -60,11 +59,6 @@ public class RecordMemController extends BasicMemController { totalMemUsed.set(0); } - @Override - public void close() { - super.close(); - } - /** * get the current memory usage level. */ @@ -84,11 +78,10 @@ public class RecordMemController extends BasicMemController { * report the increased memory usage of the object user. */ @Override - public UsageLevel reportUse(Object user, long usage) { - Long oldUsage = memMap.get(user); - if (oldUsage == null) { - oldUsage = 0L; - } + public UsageLevel acquireUsage(Object user, long usage) { + AtomicLong userUsage = memMap.computeIfAbsent(user, k -> new AtomicLong(0)); + long oldUsage = userUsage.get(); + long newTotUsage = totalMemUsed.get() + usage; // check if the new usage will reach dangerous threshold if (newTotUsage > dangerouseThreshold) { @@ -101,12 +94,12 @@ public class RecordMemController extends BasicMemController { // double check if updating will reach dangerous threshold if (newTotUsage < warningThreshold) { // still safe, action taken - memMap.put(user, oldUsage + usage); + userUsage.addAndGet(usage); logSafe(newTotUsage, user, usage, oldUsage); return UsageLevel.SAFE; } else if (newTotUsage < dangerouseThreshold) { // become warning because competition with other threads, still take the action - memMap.put(user, oldUsage + usage); + userUsage.addAndGet(usage); logWarn(newTotUsage, user, usage, oldUsage); return UsageLevel.WARNING; } else { @@ -121,7 +114,8 @@ public class RecordMemController extends BasicMemController { private void logDangerous(long newTotUsage, Object user) { if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Memory request from {} is denied, memory usage : {}", user.getClass(), + LOGGER.warn("Memory request from {} is denied, memory usage : {}", + user, MemUtils.bytesCntToStr(newTotUsage)); } } @@ -129,7 +123,7 @@ public class RecordMemController extends BasicMemController { private void logSafe(long newTotUsage, Object user, long usage, long oldUsage) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Safe Threshold : {} allocated to {}, it is using {}, total usage {}", - MemUtils.bytesCntToStr(usage), user.getClass(), + MemUtils.bytesCntToStr(usage), user, MemUtils.bytesCntToStr(oldUsage + usage), MemUtils.bytesCntToStr(newTotUsage)); } @@ -138,7 +132,7 @@ public class RecordMemController extends BasicMemController { private void logWarn(long newTotUsage, Object user, long usage, long oldUsage) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Warning Threshold : {} allocated to {}, it is using {}, total usage {}", - MemUtils.bytesCntToStr(usage), user.getClass(), + MemUtils.bytesCntToStr(usage), user, MemUtils.bytesCntToStr(oldUsage + usage), MemUtils.bytesCntToStr(newTotUsage)); } @@ -148,26 +142,24 @@ public class RecordMemController extends BasicMemController { * report the decreased memory usage of the object user. */ @Override - public void reportFree(Object user, long freeSize) { - Long usage = memMap.get(user); + public void releaseUsage(Object user, long freeSize) { + AtomicLong usage = memMap.get(user); + long usageLong = 0; if (usage == null) { - LOGGER.error("Unregistered memory usage from {}", user.getClass()); - } else if (freeSize > usage) { + LOGGER.error("Unregistered memory usage from {}", user); + } else if (freeSize > usageLong) { + usageLong = usage.get(); LOGGER - .error("Request to free {} bytes while it only registered {} bytes", freeSize, usage); - totalMemUsed.addAndGet(-usage); - memMap.remove(user); + .error("{} requests to free {} bytes while it only registered {} bytes", user, + freeSize, usage); + totalMemUsed.addAndGet(-usageLong); } else { long newTotalMemUsage = totalMemUsed.addAndGet(-freeSize); - if (usage - freeSize > 0) { - memMap.put(user, usage - freeSize); - } else { - memMap.remove(user); - } + usage.addAndGet(-freeSize); if (LOGGER.isInfoEnabled()) { LOGGER.info("{} freed from {}, it is using {}, total usage {}", MemUtils.bytesCntToStr(freeSize), - user.getClass(), MemUtils.bytesCntToStr(usage - freeSize), + user, MemUtils.bytesCntToStr(usage.get()), MemUtils.bytesCntToStr(newTotalMemUsage)); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java index 0debfe4..6a1cfe8 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.engine.bufferwrite.Action; import org.apache.iotdb.db.engine.bufferwrite.FileNodeConstants; import org.apache.iotdb.db.engine.filenode.FileNodeManager; import org.apache.iotdb.db.engine.memcontrol.BasicMemController; +import org.apache.iotdb.db.engine.memcontrol.BasicMemController.UsageLevel; import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.pool.FlushManager; @@ -174,18 +175,37 @@ public class OverflowProcessor extends Processor { public void insert(TSRecord tsRecord) throws IOException { // memory control long memUage = MemUtils.getRecordSize(tsRecord); - BasicMemController.getInstance().reportUse(this, memUage); - // write data - workSupport.insert(tsRecord); - valueCount++; - // check flush - memUage = memSize.addAndGet(memUage); - if (memUage > memThreshold) { - LOGGER.warn("The usage of memory {} in overflow processor {} reaches the threshold {}", - MemUtils.bytesCntToStr(memUage), getProcessorName(), - MemUtils.bytesCntToStr(memThreshold)); - flush(); + UsageLevel usageLevel = BasicMemController.getInstance().acquireUsage(this, memUage); + switch (usageLevel) { + case SAFE: + // write data + workSupport.insert(tsRecord); + valueCount++; + // check flush + memUage = memSize.addAndGet(memUage); + if (memUage > memThreshold) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("The usage of memory {} in overflow processor {} reaches the threshold {}", + MemUtils.bytesCntToStr(memUage), getProcessorName(), + MemUtils.bytesCntToStr(memThreshold)); + } + flush(); + } + break; + case WARNING: + // write data + workSupport.insert(tsRecord); + valueCount++; + // flush + memSize.addAndGet(memUage); + flush(); + break; + case DANGEROUS: + throw new IOException("The insertion is rejected because dangerous memory level hit"); } + + + } /** @@ -514,7 +534,7 @@ public class OverflowProcessor extends Processor { getProcessorName(), e); } } - BasicMemController.getInstance().reportFree(this, memSize.get()); + BasicMemController.getInstance().releaseUsage(this, memSize.get()); memSize.set(0); valueCount = 0; // switch from work to flush @@ -684,4 +704,9 @@ public class OverflowProcessor extends Processor { public long getLastFlushTime() { return lastFlushTime; } + + @Override + public String toString() { + return "OverflowProcessor in " + parentPath; + } } \ No newline at end of file diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java index f9ac4ce..a9d48fa 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/memcontrol/MemControllerTest.java @@ -52,41 +52,41 @@ public class MemControllerTest { // every one request 1 GB, should get 7 safes, 8 warning and 5 dangerous for (int i = 0; i < 7; i++) { - BasicMemController.UsageLevel level = memController.reportUse(dummyUser[i], 1 * GB); + BasicMemController.UsageLevel level = memController.acquireUsage(dummyUser[i], 1 * GB); assertEquals(BasicMemController.UsageLevel.SAFE, level); } for (int i = 7; i < 15; i++) { - BasicMemController.UsageLevel level = memController.reportUse(dummyUser[i], 1 * GB); + BasicMemController.UsageLevel level = memController.acquireUsage(dummyUser[i], 1 * GB); assertEquals(BasicMemController.UsageLevel.WARNING, level); } for (int i = 15; i < 20; i++) { - BasicMemController.UsageLevel level = memController.reportUse(dummyUser[i], 1 * GB); + BasicMemController.UsageLevel level = memController.acquireUsage(dummyUser[i], 1 * GB); assertEquals(BasicMemController.UsageLevel.DANGEROUS, level); } assertEquals(15 * GB, memController.getTotalUsage()); // every one free its mem for (int i = 0; i < 7; i++) { - memController.reportFree(dummyUser[i], 1 * GB); + memController.releaseUsage(dummyUser[i], 1 * GB); assertEquals((14 - i) * GB, memController.getTotalUsage()); } for (int i = 7; i < 15; i++) { - memController.reportFree(dummyUser[i], 2 * GB); + memController.releaseUsage(dummyUser[i], 2 * GB); assertEquals((14 - i) * GB, memController.getTotalUsage()); } // ask for a too big mem - BasicMemController.UsageLevel level = memController.reportUse(dummyUser[0], 100 * GB); + BasicMemController.UsageLevel level = memController.acquireUsage(dummyUser[0], 100 * GB); assertEquals(BasicMemController.UsageLevel.DANGEROUS, level); // single user ask continuously for (int i = 0; i < 8 * 1024 - 1; i++) { - level = memController.reportUse(dummyUser[0], 1 * MB); + level = memController.acquireUsage(dummyUser[0], 1 * MB); assertEquals(BasicMemController.UsageLevel.SAFE, level); } for (int i = 8 * 1024 - 1; i < 16 * 1024 - 1; i++) { - level = memController.reportUse(dummyUser[0], 1 * MB); + level = memController.acquireUsage(dummyUser[0], 1 * MB); assertEquals(BasicMemController.UsageLevel.WARNING, level); } for (int i = 16 * 1024 - 1; i < 17 * 1024; i++) { - level = memController.reportUse(dummyUser[0], 1 * MB); + level = memController.acquireUsage(dummyUser[0], 1 * MB); System.out.println( memController.getTotalUsage() / GB + " " + memController.getTotalUsage() / MB % 1024); assertEquals(BasicMemController.UsageLevel.DANGEROUS, level);
