This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch vectorMemTable in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27883e0c9485f503fc721037440b3c9ce1b6e41d Author: HTHou <[email protected]> AuthorDate: Thu Mar 11 17:07:09 2021 +0800 Memtable --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 3 +- .../iotdb/db/engine/flush/NotifyFlushMemTable.java | 4 +- .../iotdb/db/engine/memtable/AbstractMemTable.java | 48 +++++++++++++++++++--- .../apache/iotdb/db/engine/memtable/IMemTable.java | 7 ++++ .../db/engine/memtable/IWritableMemChunk.java | 6 ++- .../db/engine/memtable/PrimitiveMemTable.java | 3 +- .../iotdb/db/engine/memtable/WritableMemChunk.java | 15 +++++-- .../db/engine/querycontext/ReadOnlyMemChunk.java | 3 ++ .../tsfile/write/schema/MeasurementSchema.java | 34 ++++++++++++++- 9 files changed, 108 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index c1bc5a9..4134d1c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -30,6 +30,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -110,7 +111,7 @@ public class MemTableFlushTask { for (Map.Entry<String, IWritableMemChunk> iWritableMemChunkEntry : value.entrySet()) { long startTime = System.currentTimeMillis(); IWritableMemChunk series = iWritableMemChunkEntry.getValue(); - MeasurementSchema desc = series.getSchema(); + IMeasurementSchema desc = series.getSchema(); TVList tvList = series.getSortedTVListForFlush(); sortTime += System.currentTimeMillis() - startTime; encodingTaskQueue.put(new Pair<>(tvList, desc)); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java index 5ba50d0..4aeb66c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.flush; import org.apache.iotdb.db.engine.memtable.AbstractMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.IWritableMemChunk; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; /** * Only used in sync flush and async close to start a flush task This memtable is not managed by @@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class NotifyFlushMemTable extends AbstractMemTable { @Override - protected IWritableMemChunk genMemSeries(MeasurementSchema schema) { + protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { return null; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 21c9bf2..dd18de4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.IOException; @@ -93,12 +94,12 @@ public abstract class AbstractMemTable implements IMemTable { } private IWritableMemChunk createIfNotExistAndGet( - String deviceId, String measurement, MeasurementSchema schema) { + String deviceId, IMeasurementSchema schema) { Map<String, IWritableMemChunk> memSeries = memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>()); return memSeries.computeIfAbsent( - measurement, + schema.getMeasurementId(), k -> { seriesNumber++; totalPointsNumThreshold += avgSeriesPointNumThreshold; @@ -106,7 +107,7 @@ public abstract class AbstractMemTable implements IMemTable { }); } - protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema); + protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema); @Override public void insert(InsertRowPlan insertRowPlan) { @@ -138,6 +139,34 @@ public abstract class AbstractMemTable implements IMemTable { } @Override + public void insert(InsertVectorPlan insertVectorPlan) { + updatePlanIndexes(insertVectorPlan.getIndex()); + Object[] values = insertVectorPlan.getValues(); + + MeasurementMNode[] measurementMNodes = insertVectorPlan.getMeasurementMNodes(); + String[] measurements = insertVectorPlan.getMeasurements(); + IMeasurementSchema vmSchema = (IMeasurementSchema) measurementMNodes[0].getSchema(); + for (int i = 0; i < values.length; i++) { + Object value = values[i]; + if (value == null) { + continue; + } + + memSize += + MemUtils.getRecordSize( + vmSchema.getValueTSDataTypeList().get(i), value, disableMemControl); + } + write( + insertVectorPlan.getDeviceId().getFullPath(), + vmSchema, + insertVectorPlan.getTime(), + values); + + totalPointsNum += + insertVectorPlan.getMeasurements().length - insertVectorPlan.getFailedMeasurementNumber(); + } + + @Override public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end) throws WriteProcessException { updatePlanIndexes(insertTabletPlan.getIndex()); @@ -160,7 +189,17 @@ public abstract class AbstractMemTable implements IMemTable { MeasurementSchema schema, long insertTime, Object objectValue) { - IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema); + IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema); + memSeries.write(insertTime, objectValue); + } + + @Override + public void write( + String deviceId, + IMeasurementSchema schema, + long insertTime, + Object objectValue) { + IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, schema); memSeries.write(insertTime, objectValue); } @@ -174,7 +213,6 @@ public abstract class AbstractMemTable implements IMemTable { IWritableMemChunk memSeries = createIfNotExistAndGet( insertTabletPlan.getDeviceId().getFullPath(), - insertTabletPlan.getMeasurements()[i], insertTabletPlan.getMeasurementMNodes()[i].getSchema()); memSeries.write( insertTabletPlan.getTimes(), diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index ce412a2..3a46e03 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.io.IOException; @@ -53,6 +54,12 @@ public interface IMemTable { long insertTime, Object objectValue); + void write( + String deviceId, + IMeasurementSchema schema, + long insertTime, + Object objectValue); + void write(InsertTabletPlan insertTabletPlan, int start, int end); /** @return the number of points */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java index bdb4bbc..10298e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.memtable; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; public interface IWritableMemChunk { @@ -36,6 +36,8 @@ public interface IWritableMemChunk { void putBinary(long t, Binary v); void putBoolean(long t, boolean v); + + void putVector(long t, byte[] v); void putLongs(long[] t, long[] v, int start, int end); @@ -56,7 +58,7 @@ public interface IWritableMemChunk { long count(); - MeasurementSchema getSchema(); + IMeasurementSchema getSchema(); /** * served for query requests. diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java index 254d722..0336c61 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.memtable; import org.apache.iotdb.db.rescon.TVListAllocator; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import java.util.HashMap; @@ -38,7 +39,7 @@ public class PrimitiveMemTable extends AbstractMemTable { } @Override - protected IWritableMemChunk genMemSeries(MeasurementSchema schema) { + protected IWritableMemChunk genMemSeries(IMeasurementSchema schema) { return new WritableMemChunk(schema, TVListAllocator.getInstance().allocate(schema.getType())); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java index 4536896..8cf3422 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java @@ -22,14 +22,15 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; public class WritableMemChunk implements IWritableMemChunk { - private MeasurementSchema schema; + private IMeasurementSchema schema; private TVList list; - public WritableMemChunk(MeasurementSchema schema, TVList list) { + public WritableMemChunk(IMeasurementSchema schema, TVList list) { this.schema = schema; this.list = list; } @@ -55,6 +56,9 @@ public class WritableMemChunk implements IWritableMemChunk { case TEXT: putBinary(insertTime, (Binary) objectValue); break; + case VECTOR: + putVector(insertTime, (byte[]) objectValue); + break; default: throw new UnSupportedDataTypeException("Unsupported data type:" + schema.getType()); } @@ -123,6 +127,11 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override + public void putVector(long t, byte[] v) { + list.putVector(t, v); + } + + @Override public void putLongs(long[] t, long[] v, int start, int end) { list.putLongs(t, v, start, end); } @@ -188,7 +197,7 @@ public class WritableMemChunk implements IWritableMemChunk { } @Override - public MeasurementSchema getSchema() { + public IMeasurementSchema getSchema() { return schema; } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java index 8236e15..791af95 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java @@ -124,6 +124,9 @@ public class ReadOnlyMemChunk { case DOUBLE: statsByType.update(timeValuePair.getTimestamp(), timeValuePair.getValue().getDouble()); break; + case VECTOR: + statsByType.update(timeValuePair.getTimestamp()); + break; default: throw new QueryProcessException("Unsupported data type:" + dataType); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java index 34ee496..3b2368d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/MeasurementSchema.java @@ -33,6 +33,7 @@ import java.io.OutputStream; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,7 +43,8 @@ import java.util.Objects; * MeasurementSchema maintains respective TSEncodingBuilder; For TSDataType, only ENUM has * TSDataTypeConverter up to now. */ -public class MeasurementSchema implements Comparable<MeasurementSchema>, Serializable { +public class MeasurementSchema implements Comparable<MeasurementSchema>, + Serializable, IMeasurementSchema { public static final MeasurementSchema TIME_SCHEMA = new MeasurementSchema( @@ -323,4 +325,34 @@ public class MeasurementSchema implements Comparable<MeasurementSchema>, Seriali public void setType(TSDataType type) { this.type = type.serialize(); } + + @Override + public TSEncoding getTimeTSEncoding() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<String> getValueMeasurementIdList() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<TSDataType> getValueTSDataTypeList() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<TSEncoding> getValueTSEncodingList() { + // TODO Auto-generated method stub + return null; + } + + @Override + public List<Encoder> getValueEncoderList() { + // TODO Auto-generated method stub + return null; + } }
