This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty-mpp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8227c55084efc2c1a3a130591c3020082b0f2a1e
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Apr 12 16:26:59 2022 +0800

    [IOTDB-2889] Add method converting TsBlock to TSQueryDataSet in QueryDataSetUtils
---
 .../iotdb/db/mpp/execution/QueryExecution.java     |  18 ++-
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   | 177 ++++++++++++++++++++-
 2 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
index 78458122d7..e70733b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryExecution.java
@@ -170,6 +170,22 @@ public class QueryExecution implements IQueryExecution {
     }
   }
 
+  /** @return true if there are more TsBlocks to fetch, i.e. the result handle is not finished */
+  public boolean hasNextResult() {
+    try {
+      initialResultHandle();
+      return !resultHandle.isFinished();
+    } catch (IOException e) {
+      throwIfUnchecked(e.getCause());
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  /** @return the number of value columns in the result, excluding the time column; always 1 */
+  public int getOutputValueColumnCount() {
+    return 1;
+  }
+
   /**
    * This method is a synchronized method. For READ, it will block until all the FragmentInstances
    * have been submitted. For WRITE, it will block until all the FragmentInstances have finished.
@@ -204,7 +220,7 @@ public class QueryExecution implements IQueryExecution { } } - private synchronized void initialResultHandle() throws IOException { + private void initialResultHandle() throws IOException { if (this.resultHandle == null) { this.resultHandle = DataBlockService.getInstance() diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index 075e5a2ba7..c65e57f208 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -18,12 +18,15 @@ */ package org.apache.iotdb.db.utils; +import org.apache.iotdb.db.mpp.execution.QueryExecution; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; @@ -40,7 +43,7 @@ import java.util.List; /** TimeValuePairUtils to convert between thrift format and TsFile format. 
*/
 public class QueryDataSetUtils {
 
-  private static final int flag = 0x01;
+  private static final int FLAG = 0x01;
 
   private QueryDataSetUtils() {}
 
@@ -88,7 +91,7 @@ public class QueryDataSetUtils {
           if (field == null || field.getDataType() == null) {
             bitmap[k] = (bitmap[k] << 1);
           } else {
-            bitmap[k] = (bitmap[k] << 1) | flag;
+            bitmap[k] = (bitmap[k] << 1) | FLAG;
             TSDataType type = field.getDataType();
             switch (type) {
               case INT32:
@@ -173,6 +176,176 @@ public class QueryDataSetUtils {
     return tsQueryDataSet;
   }
 
+  /**
+   * Converts the TsBlocks produced by {@code queryExecution} into a thrift {@link TSQueryDataSet}.
+   *
+   * <p>The result holds one time buffer plus, for every value column, a value buffer and a bitmap
+   * buffer. A bitmap carries one bit per row (1 = value present, 0 = null), most significant bit
+   * first; the bits run continuously across TsBlock boundaries and only the last byte is padded.
+   *
+   * @param queryExecution the execution to pull TsBlocks from
+   * @param fetchSize row count after which no further TsBlock is fetched; TsBlocks are never
+   *     split, so the result may hold more rows than this
+   * @return the data set built from the fetched TsBlocks
+   * @throws IOException if an I/O error occurs while serializing the TsBlocks
+   */
+  public static TSQueryDataSet convertTsBlockByFetchSize(
+      QueryExecution queryExecution, int fetchSize) throws IOException {
+    int columnNum = queryExecution.getOutputValueColumnCount();
+    TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
+    // one time column and each value column has an actual value buffer and a bitmap value to
+    // indicate whether it is a null
+    int columnNumWithTime = columnNum * 2 + 1;
+    DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime];
+    ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime];
+    for (int i = 0; i < columnNumWithTime; i++) {
+      byteArrayOutputStreams[i] = new ByteArrayOutputStream();
+      dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]);
+    }
+
+    int rowCount = 0;
+    // bitmap bits of each value column that have not filled a whole byte yet; they are carried
+    // over to the next TsBlock so that the bitmap stays continuous across TsBlocks
+    int[] pendingBitmaps = new int[columnNum];
+    while (rowCount < fetchSize && queryExecution.hasNextResult()) {
+      TsBlock tsBlock = queryExecution.getBatchResult();
+      int currentCount = tsBlock.getPositionCount();
+      for (int i = 0; i < currentCount; i++) {
+        // the time column always goes to the first output stream
+        dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i));
+      }
+      for (int k = 0; k < columnNum; k++) {
+        DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1];
+        DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)];
+        // used to record a bitmap for every 8 row record; a byte is written once the 8th row of
+        // a group (counted over all TsBlocks, not per TsBlock) has been added
+        int bitmap = pendingBitmaps[k];
+        Column column = tsBlock.getColumn(k);
+        TSDataType type = column.getDataType();
+        switch (type) {
+          case INT32:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeInt(column.getInt(i));
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case INT64:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeLong(column.getLong(i));
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeFloat(column.getFloat(i));
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeDouble(column.getDouble(i));
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case BOOLEAN:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                dataOutputStream.writeBoolean(column.getBoolean(i));
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          case TEXT:
+            for (int i = 0; i < currentCount; i++) {
+              if (column.isNull(i)) {
+                bitmap = bitmap << 1;
+              } else {
+                bitmap = (bitmap << 1) | FLAG;
+                Binary binary = column.getBinary(i);
+                dataOutputStream.writeInt(binary.getLength());
+                dataOutputStream.write(binary.getValues());
+              }
+              if ((rowCount + i) % 8 == 7) {
+                dataBitmapOutputStream.writeByte(bitmap);
+                // we should clear the bitmap every 8 row record
+                bitmap = 0;
+              }
+            }
+            break;
+          default:
+            throw new UnSupportedDataTypeException(
+                String.format("Data type %s is not supported.", type));
+        }
+        pendingBitmaps[k] = bitmap;
+      }
+      rowCount += currentCount;
+    }
+
+    // feed the remaining bitmap: left-align the bits of the last, incomplete group of rows
+    int remaining = rowCount % 8;
+    if (remaining != 0) {
+      for (int k = 0; k < columnNum; k++) {
+        dataOutputStreams[2 * (k + 1)].writeByte(pendingBitmaps[k] << (8 - remaining));
+      }
+    }
+
+    // each stream holds exactly the bytes of its buffer, so wrap its content directly
+    tsQueryDataSet.setTime(ByteBuffer.wrap(byteArrayOutputStreams[0].toByteArray()));
+
+    List<ByteBuffer> bitmapList = new LinkedList<>();
+    List<ByteBuffer> valueList = new LinkedList<>();
+    for (int i = 1; i < byteArrayOutputStreams.length; i += 2) {
+      valueList.add(ByteBuffer.wrap(byteArrayOutputStreams[i].toByteArray()));
+      bitmapList.add(ByteBuffer.wrap(byteArrayOutputStreams[i + 1].toByteArray()));
+    }
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    return tsQueryDataSet;
+  }
+
   public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
     long[] times = new long[size];
     for (int i = 0; i < size; i++) {
