This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch vectorMemTable in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb42aede25ab35a0a43d9e9ad3d14aa7a124765f Author: HTHou <[email protected]> AuthorDate: Thu Mar 11 13:55:58 2021 +0800 memtable --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 6 + .../iotdb/db/rescon/PrimitiveArrayManager.java | 9 +- .../iotdb/db/utils/datastructure/TVList.java | 14 ++ .../iotdb/db/utils/datastructure/VectorTVList.java | 230 +++++++++++++++++++++ .../db/utils/datastructure/VectorTVListTest.java | 67 ++++++ .../tsfile/file/metadata/enums/TSDataType.java | 6 +- .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 62 ++++++ 7 files changed, 392 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index 9397af1..dcd82ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -190,6 +190,12 @@ public class MemTableFlushTask { case TEXT: seriesWriterImpl.write(time, tvPairs.getBinary(i)); break; + case VECTOR: + // TODO: +// for ( : tvPairs.getVector(i)) { +// seriesWriterImpl.write(time, tvPairs.getVector(i)[], get); +// } + break; default: LOGGER.error( "Storage group {} does not support data type: {}", storageGroup, dataType); diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java index aa6c264..b471332 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/PrimitiveArrayManager.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; - +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +71,7 @@ public class PrimitiveArrayManager { bufferedArraysMap.put(TSDataType.FLOAT, new ArrayDeque<>()); bufferedArraysMap.put(TSDataType.DOUBLE, new ArrayDeque<>()); bufferedArraysMap.put(TSDataType.TEXT, new ArrayDeque<>()); + bufferedArraysMap.put(TSDataType.VECTOR, new ArrayDeque<>()); } private PrimitiveArrayManager() { @@ -127,6 +128,9 @@ public class PrimitiveArrayManager { case TEXT: dataArray = new Binary[ARRAY_SIZE]; break; + case VECTOR: + dataArray = new byte[ARRAY_SIZE][]; + break; default: throw new UnSupportedDataTypeException(dataType.toString()); } @@ -205,6 +209,9 @@ public class PrimitiveArrayManager { } else if (dataArray instanceof Binary[]) { Arrays.fill((Binary[]) dataArray, null); dataType = TSDataType.TEXT; + } else if (dataArray instanceof TsPrimitiveType[][]) { + Arrays.fill((TsPrimitiveType[][]) dataArray, null); + dataType = TSDataType.VECTOR; } else { throw new UnSupportedDataTypeException("Unknown data array type"); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index d24beae..3e0ef74 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -73,6 +73,8 @@ public abstract class TVList { return new DoubleTVList(); case BOOLEAN: return new BooleanTVList(); + case VECTOR: + return new VectorTVList(); default: break; } @@ -137,6 +139,10 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } + public void putVector(long time, byte[] value) { + throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); + } + public void putLongs(long[] time, long[] value, int start, int end) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -161,6 +167,10 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } + public void putVectors(long[] time, byte[][] value, int start, int end) { + throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); + } + public long getLong(int index) { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } @@ -185,6 +195,10 @@ public abstract class TVList { throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } + public byte[] getVector(int index) { + throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); + } + public abstract void sort(); public long getMinTime() { diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java new file mode 100644 index 0000000..e0a652a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/VectorTVList.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.utils.datastructure; + +import org.apache.iotdb.db.rescon.PrimitiveArrayManager; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE; + +public class VectorTVList extends TVList { + + private List<Object[]> values; + + private byte[][][] sortedValues; + + private byte[] pivotValue; + + VectorTVList() { + super(); + values = new ArrayList<>(); + } + + @Override + public void putVector(long timestamp, byte[] value) { + checkExpansion(); + int arrayIndex = size / ARRAY_SIZE; + int elementIndex = size % ARRAY_SIZE; + minTime = Math.min(minTime, timestamp); + timestamps.get(arrayIndex)[elementIndex] = timestamp; + values.get(arrayIndex)[elementIndex] = value; + size++; + if (sorted && size > 1 && timestamp < getTime(size - 2)) { + sorted = false; + } + } + + @Override + public byte[] getVector(int index) { + if (index >= size) { + throw new ArrayIndexOutOfBoundsException(index); + } + int arrayIndex = index / ARRAY_SIZE; + int elementIndex = index % ARRAY_SIZE; + return (byte[]) values.get(arrayIndex)[elementIndex]; + } + + protected void set(int index, long timestamp, byte[] value) { + if (index >= size) { + throw new ArrayIndexOutOfBoundsException(index); + } + int arrayIndex = index / ARRAY_SIZE; + int elementIndex = index % ARRAY_SIZE; + timestamps.get(arrayIndex)[elementIndex] = timestamp; + values.get(arrayIndex)[elementIndex] = value; + } + + @Override + public VectorTVList clone() { + VectorTVList cloneList = new VectorTVList(); + cloneAs(cloneList); + for (Object[] valueArray : values) { + cloneList.values.add(cloneValue(valueArray)); + } + return cloneList; + } + + private TsPrimitiveType[][] cloneValue(Object[] valueArray) { + TsPrimitiveType[][] cloneArray = (TsPrimitiveType[][])new Object[valueArray.length]; + System.arraycopy(valueArray, 0, cloneArray, 0, valueArray.length); + return cloneArray; + } + + @Override + public void sort() { + if (sortedTimestamps == null || sortedTimestamps.length < size) { + sortedTimestamps = + (long[][]) PrimitiveArrayManager.createDataListsByType(TSDataType.INT64, size); + } + if (sortedValues == null || sortedValues.length < size) { + sortedValues = + (byte[][][]) + PrimitiveArrayManager.createDataListsByType(TSDataType.VECTOR, size); + } + sort(0, size); + clearSortedValue(); + clearSortedTime(); + sorted = true; + } + + @Override + void clearValue() { + if (values != null) { + for (Object[] dataArray : values) { + PrimitiveArrayManager.release(dataArray); + } + values.clear(); + } + } + + @Override + void clearSortedValue() { + if (sortedValues != null) { + sortedValues = null; + } + } + + @Override + protected void setFromSorted(int src, int dest) { + set( + dest, + sortedTimestamps[src / ARRAY_SIZE][src % ARRAY_SIZE], + sortedValues[src / ARRAY_SIZE][src % ARRAY_SIZE]); + } + + @Override + protected void set(int src, int dest) { + long srcT = getTime(src); + byte[] srcV = getVector(src); + set(dest, srcT, srcV); + } + + @Override + protected void setToSorted(int src, int dest) { + sortedTimestamps[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getTime(src); + sortedValues[dest / ARRAY_SIZE][dest % ARRAY_SIZE] = getVector(src); + } + + @Override + protected void reverseRange(int lo, int hi) { + hi--; + while (lo < hi) { + long loT = getTime(lo); + byte[] loV = getVector(lo); + long hiT = getTime(hi); + byte[] hiV = getVector(hi); + set(lo++, hiT, hiV); + set(hi--, loT, loV); + } + } + + @Override + protected void expandValues() { + values.add((Object[]) getPrimitiveArraysByType(TSDataType.VECTOR)); + } + + @Override + protected void saveAsPivot(int pos) { + pivotTime = getTime(pos); + pivotValue = getVector(pos); + } + + @Override + protected void setPivotTo(int pos) { + set(pos, pivotTime, pivotValue); + } + + @Override + public TimeValuePair getTimeValuePair(int index) { + return new TimeValuePair( + getTime(index), TsPrimitiveType.getByType(TSDataType.VECTOR, getVector(index))); + } + + @Override + protected TimeValuePair getTimeValuePair( + int index, long time, Integer floatPrecision, TSEncoding encoding) { + return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.VECTOR, getBinary(index))); + } + + @Override + protected void releaseLastValueArray() { + PrimitiveArrayManager.release(values.remove(values.size() - 1)); + } + + @Override + public void putVectors(long[] time, byte[][] value, int start, int end) { + checkExpansion(); + int idx = start; + + updateMinTimeAndSorted(time, start, end); + + while (idx < end) { + int inputRemaining = end - idx; + int arrayIdx = size / ARRAY_SIZE; + int elementIdx = size % ARRAY_SIZE; + int internalRemaining = ARRAY_SIZE - elementIdx; + if (internalRemaining >= inputRemaining) { + // the remaining inputs can fit the last array, copy all remaining inputs into last array + System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, inputRemaining); + System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining); + size += inputRemaining; + break; + } else { + // the remaining inputs cannot fit the last array, fill the last array and create a new + // one and enter the next loop + System.arraycopy(time, idx, timestamps.get(arrayIdx), elementIdx, internalRemaining); + System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining); + idx += internalRemaining; + size += internalRemaining; + checkExpansion(); + } + } + } + + @Override + public TSDataType getDataType() { + return TSDataType.VECTOR; + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java new file mode 100644 index 0000000..7095612 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.utils.datastructure; + +import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class VectorTVListTest { + + @Test + public void testVectorTVList() { + VectorTVList tvList = new VectorTVList(); + for (int i = 0; i < 1000; i++) { + byte[] value = new byte[4 * 5]; + byte[] bytes = new byte[4]; + for (int j = 0; j < 20; j++) { + if (j % 4 == 0) { + bytes = BytesUtils.intToBytes(i); + } + value[j] = bytes[j % 4]; + } + tvList.putVector(i, value); + } + for (int i = 0; i < tvList.size; i++) { + Assert.assertEquals(String.valueOf(i), tvList.getVector(i).toString()); + Assert.assertEquals(i, tvList.getTime(i)); + } + } + + @Test + public void testVectorTVLists() { + VectorTVList tvList = new VectorTVList(); + byte[][] vectorList = new byte[1001][4 * 5]; + List<Long> timeList = new ArrayList<>(); + for (int i = 1000; i >= 0; i--) { + timeList.add((long) i); + for (int j = 0; j < 5; j++) { + vectorList[i][j] = 0; + } + } + tvList.putVectors(ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), vectorList, 0, 1000); + for (long i = 0; i < tvList.size; i++) { + Assert.assertEquals(tvList.size - i, tvList.getTime((int) i)); + } + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java index 0edda45..02a2086 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/TSDataType.java @@ -41,7 +41,10 @@ public enum TSDataType { DOUBLE((byte) 4), /** TEXT */ - TEXT((byte) 5); + TEXT((byte) 5), + + /** VECTOR */ + VECTOR((byte) 6); private final byte type; @@ -96,6 +99,7 @@ public enum TSDataType { case TEXT: case INT64: case DOUBLE: + case VECTOR: return 8; default: throw new UnSupportedDataTypeException(this.toString()); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java index 73b01d2..32c621f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java @@ -45,11 +45,23 @@ public abstract class TsPrimitiveType implements Serializable { return new TsPrimitiveType.TsDouble((double) v); case TEXT: return new TsPrimitiveType.TsBinary((Binary) v); + case VECTOR: + return new TsPrimitiveType.TsVector((TsPrimitiveType[]) v); default: throw new UnSupportedDataTypeException("Unsupported data type:" + dataType); } } + public void setVector(TsPrimitiveType[] val) { + // TODO Auto-generated method stub + + } + + public TsPrimitiveType[] getVector() { + // TODO Auto-generated method stub + return null; + } + public boolean getBoolean() { throw new UnsupportedOperationException("getBoolean() is not supported for current sub-class"); } @@ -462,4 +474,54 @@ public abstract class TsPrimitiveType implements Serializable { return false; } } + + public static class TsVector extends TsPrimitiveType { + + private TsPrimitiveType[] value; + + public TsVector(TsPrimitiveType[] value) { + this.value = value; + } + + @Override + public TsPrimitiveType[] getVector() { + return value; + } + + @Override + public void setVector(TsPrimitiveType[] val) { + this.value = val; + } + + @Override + public int getSize() { + int size = 0; + for (TsPrimitiveType type : value) { + size += type.getSize(); + } + // object header + array object header + return 4 + 4 + size; + } + + @Override + public Object getValue() { + return getVector(); + } + + @Override + public String getStringValue() { + StringBuilder builder = new StringBuilder("["); + builder.append(value[0].getStringValue()); + for (TsPrimitiveType type : value) { + builder.append(", ").append(type.getStringValue()); + } + builder.append("]"); + return builder.toString(); + } + + @Override + public TSDataType getDataType() { + return TSDataType.VECTOR; + } + } }
