This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch memtable_sort_in_query in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 56bc73cae3e4b0160c8ae3ed54cc2b0af7b4f3c6 Author: 151250176 <[email protected]> AuthorDate: Tue Nov 17 16:36:40 2020 +0800 improve tv list --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 2 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 55 +++++---- .../db/engine/memtable/IWritableMemChunk.java | 26 +++- .../iotdb/db/engine/memtable/WritableMemChunk.java | 32 ++++- .../db/engine/querycontext/ReadOnlyMemChunk.java | 21 +++- .../iotdb/db/utils/datastructure/TVList.java | 132 ++++++++++++--------- .../db/engine/memtable/PrimitiveMemTableTest.java | 2 +- 7 files changed, 171 insertions(+), 99 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index e9f4c92..98a79ce 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -89,7 +89,7 @@ public class MemTableFlushTask { long startTime = System.currentTimeMillis(); IWritableMemChunk series = memTable.getMemTableMap().get(deviceId).get(measurementId); MeasurementSchema desc = series.getSchema(); - TVList tvList = series.getSortedTVList(); + TVList tvList = series.getSortedTVListForFlush(); sortTime += System.currentTimeMillis() - startTime; encodingTaskQueue.add(new Pair<>(tvList, desc)); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 5960d43..026b343 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -46,31 +46,23 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public abstract class AbstractMemTable implements IMemTable { private final Map<String, Map<String, IWritableMemChunk>> 
memTableMap; - + /** + * The initial value is true because we want calculate the text data size when recover memTable!! + */ + protected boolean disableMemControl = true; private long version = Long.MAX_VALUE; - private List<Modification> modifications = new ArrayList<>(); - private int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig() .getAvgSeriesPointNumberThreshold(); - /** * memory size of data points, including TEXT values */ private long memSize = 0; - /** * memory usage of all TVLists memory usage regardless of whether these TVLists are full, * including TEXT values */ private long tvListRamCost = 0; - - /** - * The initial value is true because we want calculate the text data size when recover - * memTable!! - */ - protected boolean disableMemControl = true; - private int seriesNumber = 0; private long totalPointsNum = 0; @@ -129,14 +121,16 @@ public abstract class AbstractMemTable implements IMemTable { } Object value = insertRowPlan.getValues()[i]; - memSize += MemUtils.getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value, - disableMemControl); + memSize += MemUtils + .getRecordSize(insertRowPlan.getMeasurementMNodes()[i].getSchema().getType(), value, + disableMemControl); write(insertRowPlan.getDeviceId().getFullPath(), insertRowPlan.getMeasurements()[i], insertRowPlan.getMeasurementMNodes()[i].getSchema(), insertRowPlan.getTime(), value); } - totalPointsNum += insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); + totalPointsNum += + insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); } @Override @@ -146,8 +140,9 @@ public abstract class AbstractMemTable implements IMemTable { try { write(insertTabletPlan, start, end); memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl); - totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber()) - * (end - start); + 
totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan + .getFailedMeasurementNumber()) + * (end - start); } catch (RuntimeException e) { throw new WriteProcessException(e); } @@ -168,8 +163,10 @@ public abstract class AbstractMemTable implements IMemTable { if (insertTabletPlan.getColumns()[i] == null) { continue; } - IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId().getFullPath(), - insertTabletPlan.getMeasurements()[i], insertTabletPlan.getMeasurementMNodes()[i].getSchema()); + IWritableMemChunk memSeries = createIfNotExistAndGet( + insertTabletPlan.getDeviceId().getFullPath(), + insertTabletPlan.getMeasurements()[i], + insertTabletPlan.getMeasurementMNodes()[i].getSchema()); memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i], insertTabletPlan.getDataTypes()[i], start, end); } @@ -248,11 +245,20 @@ public abstract class AbstractMemTable implements IMemTable { return null; } List<TimeRange> deletionList = constructDeletionList(deviceId, measurement, timeLowerBound); - IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement); - TVList chunkCopy = memChunk.getTVList().clone(); - chunkCopy.setDeletionList(deletionList); - return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion()); + TVList chunkCopy = null; + int curSize = 0; + // synchronize memtable map to get and sort + // when next query come, it will find the data has been sorted and get reference of the data + synchronized (memTableMap) { + IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement); + chunkCopy = memChunk.getSortedTVListForQuery(); + chunkCopy.increaseReferenceCount(); + curSize = chunkCopy.size(); + } + + return new ReadOnlyMemChunk(measurement, dataType, encoding, chunkCopy, props, getVersion(), + curSize, deletionList); } private List<TimeRange> constructDeletionList(String deviceId, String measurement, @@ -273,7 +279,8 @@ public abstract class 
AbstractMemTable implements IMemTable { } @Override - public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) { + public void delete(PartialPath originalPath, PartialPath devicePath, long startTimestamp, + long endTimestamp) { Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath()); if (deviceMap == null) { return; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index 9dc19fd..f733864 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -63,12 +63,30 @@ public interface IWritableMemChunk { /** * served for query requests. + * <p> + * if tv list has been sorted, just return reference of it + * <p> + * if tv list hasn't been sorted and has no reference, sort and return reference of it + * <p> + * if tv list hasn't been sorted and has reference we should copy and sort it, then return this + * list + * <p> + * the mechanism is just like copy on write * - * @return + * @return sorted tv list */ - default TVList getSortedTVList() { - return null; - } + TVList getSortedTVListForQuery(); + + /** + * served for flush requests. + * <p> + * if tv list has reference, copy it.
Then sort it + * <p> + * the mechanism is just like copy on write + * + * @return sorted tv list + */ + TVList getSortedTVListForFlush(); default TVList getTVList() { return null; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 3d981d0..7bc76ca 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -153,8 +153,30 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public synchronized TVList getSortedTVList() { - list.sort(); + public synchronized TVList getSortedTVListForQuery() { + // check reference count + if (list.getReferenceCount() > 0 && !list.isSorted()) { + list = list.clone(); + } + + if (!list.isSorted()) { + list.sort(); + } + + return list; + } + + @Override + public TVList getSortedTVListForFlush() { + // check reference count + if (list.getReferenceCount() > 0) { + list = list.clone(); + } + + if (!list.isSorted()) { + list.sort(); + } + return list; } @@ -185,13 +207,13 @@ public class WritableMemChunk implements IWritableMemChunk { @Override public String toString() { - int size = getSortedTVList().size(); + int size = getSortedTVListForQuery().size(); StringBuilder out = new StringBuilder("MemChunk Size: " + size + System.lineSeparator()); if (size != 0) { out.append("Data type:").append(schema.getType()).append(System.lineSeparator()); - out.append("First point:").append(getSortedTVList().getTimeValuePair(0)) + out.append("First point:").append(getSortedTVListForQuery().getTimeValuePair(0)) .append(System.lineSeparator()); - out.append("Last point:").append(getSortedTVList().getTimeValuePair(size - 1)) + out.append("Last point:").append(getSortedTVListForQuery().getTimeValuePair(size - 1)) .append(System.lineSeparator()); ; } diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java index 5be7104..d3f2c20 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.querycontext; import java.io.IOException; +import java.util.List; import java.util.Map; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader; @@ -30,10 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.reader.IPointReader; public class ReadOnlyMemChunk { + // deletion list for this chunk + private final List<TimeRange> deletionList; + private String measurementUid; private TSDataType dataType; private TSEncoding encoding; @@ -48,8 +53,11 @@ public class ReadOnlyMemChunk { private IPointReader chunkPointReader; + private int chunkDataSize; + public ReadOnlyMemChunk(String measurementUid, TSDataType dataType, TSEncoding encoding, - TVList tvList, Map<String, String> props, long version) + TVList tvList, Map<String, String> props, long version, int size, + List<TimeRange> deletionList) throws IOException, QueryProcessException { this.measurementUid = measurementUid; this.dataType = dataType; @@ -58,9 +66,12 @@ public class ReadOnlyMemChunk { if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) { this.floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER)); } - tvList.sort(); + this.chunkData = tvList; - this.chunkPointReader = 
tvList.getIterator(floatPrecision, encoding); + this.chunkDataSize = size; + this.deletionList = deletionList; + + this.chunkPointReader = tvList.getIterator(floatPrecision, encoding, chunkDataSize, deletionList); initChunkMeta(); } @@ -68,7 +79,7 @@ public class ReadOnlyMemChunk { Statistics statsByType = Statistics.getStatsByType(dataType); ChunkMetadata metaData = new ChunkMetadata(measurementUid, dataType, 0, statsByType); if (!isEmpty()) { - IPointReader iterator = chunkData.getIterator(floatPrecision, encoding); + IPointReader iterator = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList); while (iterator.hasNextTimeValuePair()) { TimeValuePair timeValuePair = iterator.nextTimeValuePair(); switch (dataType) { @@ -114,7 +125,7 @@ public class ReadOnlyMemChunk { } public IPointReader getPointReader() { - chunkPointReader = chunkData.getIterator(floatPrecision, encoding); + chunkPointReader = chunkData.getIterator(floatPrecision, encoding, chunkDataSize, deletionList); return chunkPointReader; } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 13b4766..5d98727 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iotdb.db.rescon.PrimitiveArrayManager; +import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -35,31 +37,69 @@ import org.apache.iotdb.tsfile.utils.Binary; public abstract class TVList { - private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent"; - 
protected static final int SMALL_ARRAY_LENGTH = 32; - + private static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent"; protected List<long[]> timestamps; protected int size; protected long[][] sortedTimestamps; protected boolean sorted = true; - - /** - * this field is effective only in the Tvlist in a RealOnlyMemChunk. - */ - private List<TimeRange> deletionList; - private long version; - + // record reference count of this tv list + // currently this reference will only be increased because we can't know when to decrease it + protected AtomicInteger referenceCount; protected long pivotTime; - protected long minTime; + private long version; public TVList() { timestamps = new ArrayList<>(); size = 0; minTime = Long.MAX_VALUE; + referenceCount = new AtomicInteger(); + } + + public static TVList newList(TSDataType dataType) { + switch (dataType) { + case TEXT: + return new BinaryTVList(); + case FLOAT: + return new FloatTVList(); + case INT32: + return new IntTVList(); + case INT64: + return new LongTVList(); + case DOUBLE: + return new DoubleTVList(); + case BOOLEAN: + return new BooleanTVList(); + default: + break; + } + return null; + } + + public static long tvListArrayMemSize(TSDataType type) { + long size = 0; + // time size + size += + PrimitiveArrayManager.ARRAY_SIZE * 8; + // value size + size += + PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize(); + return size; + } + + public boolean isSorted() { + return sorted; + } + + public void increaseReferenceCount() { + referenceCount.incrementAndGet(); + } + + public int getReferenceCount() { + return referenceCount.get(); } public int size() { @@ -222,9 +262,6 @@ public abstract class TVList { clearValue(); clearSortedValue(); - if (deletionList != null) { - deletionList.clear(); - } } protected void clearTime() { @@ -245,8 +282,8 @@ public abstract class TVList { abstract void clearValue(); /** - * The arrays for sorting are not including in write memory now, - * the memory usage is
considered as temporary memory. + * The arrays for sorting are not including in write memory now, the memory usage is considered as + * temporary memory. */ abstract void clearSortedValue(); @@ -271,6 +308,7 @@ public abstract class TVList { if (sorted) { return; } + if (lo == hi) { return; } @@ -307,33 +345,6 @@ public abstract class TVList { return runHi - lo; } - public static TVList newList(TSDataType dataType) { - switch (dataType) { - case TEXT: - return new BinaryTVList(); - case FLOAT: - return new FloatTVList(); - case INT32: - return new IntTVList(); - case INT64: - return new LongTVList(); - case DOUBLE: - return new DoubleTVList(); - case BOOLEAN: - return new BooleanTVList(); - default: - break; - } - return null; - } - - /** - * this field is effective only in the Tvlist in a RealOnlyMemChunk. - */ - public void setDeletionList(List<TimeRange> list) { - this.deletionList = list; - } - protected int compare(int idx1, int idx2) { long t1 = getTime(idx1); long t2 = getTime(idx2); @@ -469,23 +480,14 @@ public abstract class TVList { protected abstract TimeValuePair getTimeValuePair(int index, long time, Integer floatPrecision, TSEncoding encoding); + @TestOnly public IPointReader getIterator() { return new Ite(); } - public IPointReader getIterator(int floatPrecision, TSEncoding encoding) { - return new Ite(floatPrecision, encoding); - } - - public static long tvListArrayMemSize(TSDataType type) { - long size = 0; - // time size - size += - PrimitiveArrayManager.ARRAY_SIZE * 8; - // value size - size += - PrimitiveArrayManager.ARRAY_SIZE * type.getDataTypeSize(); - return size; + public IPointReader getIterator(int floatPrecision, TSEncoding encoding, int size, + List<TimeRange> deletionList) { + return new Ite(floatPrecision, encoding, size, deletionList); } private class Ite implements IPointReader { @@ -496,13 +498,24 @@ public abstract class TVList { private Integer floatPrecision; private TSEncoding encoding; private int deleteCursor = 0; + /** + * 
because a TV list may be shared by different queries, each iterator has to record its own size + */ + private int iteSize = 0; + /** + * this field is effective only in the TVList in a ReadOnlyMemChunk. + */ + private List<TimeRange> deletionList; public Ite() { + this.iteSize = TVList.this.size; } - public Ite(int floatPrecision, TSEncoding encoding) { + public Ite(int floatPrecision, TSEncoding encoding, int size, List<TimeRange> deletionList) { this.floatPrecision = floatPrecision; this.encoding = encoding; + this.iteSize = size; + this.deletionList = deletionList; } @Override @@ -511,7 +524,7 @@ public abstract class TVList { return true; } - while (cur < size) { + while (cur < iteSize) { long time = getTime(cur); if (isPointDeleted(time) || (cur + 1 < size() && (time == getTime(cur + 1)))) { cur++; @@ -522,7 +535,8 @@ public abstract class TVList { cur++; return true; } - return hasCachedPair; + + return false; } private boolean isPointDeleted(long timestamp) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java index 94c6378..b967fe1 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java @@ -58,7 +58,7 @@ public class PrimitiveMemTableTest { for (int i = 0; i < count; i++) { series.write(i, i); } - IPointReader it = series.getSortedTVList().getIterator(); + IPointReader it = series.getSortedTVListForQuery().getIterator(); int i = 0; while (it.hasNextTimeValuePair()) { Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());
