This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1971 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f678504eaea088f3827a5e593ff7e04e54a13c2e Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Dec 2 11:17:53 2021 +0800 pass tests when canBeSplitIntoFragments := 2 <= fragmentDataSetIndexToLayerPointReaders.size() --- .../db/query/dataset/udf/UDTFJoinDataSet.java | 262 ++++++++++++--------- .../db/query/udf/core/layer/LayerBuilder.java | 2 +- 2 files changed, 147 insertions(+), 117 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java index 1e7bd02..b838a0f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java @@ -19,15 +19,19 @@ package org.apache.iotdb.db.query.dataset.udf; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.BytesUtils; import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.IOException; @@ -71,7 +75,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa private void initTimeHeap() throws IOException { timeHeap = new TimeSelector(resultColumnsLength << 1, true); - for (int i = 0; i < resultColumnsLength; ++i) { + for (int i = 0, n = fragmentDataSets.length; i < n; ++i) { QueryDataSet fragmentDataSet = fragmentDataSets[i]; if (fragmentDataSet.hasNextWithoutConstraint()) { rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint(); @@ -81,8 +85,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa } @Override - public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) - throws IOException, QueryProcessException { + public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) throws IOException { TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); PublicBAOS timeBAOS = new PublicBAOS(); @@ -94,112 +97,123 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa } int[] currentBitmapList = new int[resultColumnsLength]; - // int rowCount = 0; - // while (rowCount < fetchSize - // && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit) - // && !timeHeap.isEmpty()) { - // - // long minTime = timeHeap.pollFirst(); - // if (rowOffset == 0) { - // timeBAOS.write(BytesUtils.longToBytes(minTime)); - // } - // - // for (int i = 0; i < resultColumnsLength; ++i) { - // LayerPointReader reader = transformers[i]; - // - // if (!reader.next() || reader.currentTime() != minTime) { - // if (rowOffset == 0) { - // currentBitmapList[i] = (currentBitmapList[i] << 1); - // } - // continue; - // } - // - // if (rowOffset == 0) { - // currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG; - // TSDataType type = reader.getDataType(); - // switch (type) { - // case INT32: - // int intValue = reader.currentInt(); - // ReadWriteIOUtils.write( - // encoder != null && encoder.needEncode(minTime) - // ? encoder.encodeInt(intValue, minTime) - // : intValue, - // valueBAOSList[i]); - // break; - // case INT64: - // long longValue = reader.currentLong(); - // ReadWriteIOUtils.write( - // encoder != null && encoder.needEncode(minTime) - // ? encoder.encodeLong(longValue, minTime) - // : longValue, - // valueBAOSList[i]); - // break; - // case FLOAT: - // float floatValue = reader.currentFloat(); - // ReadWriteIOUtils.write( - // encoder != null && encoder.needEncode(minTime) - // ? encoder.encodeFloat(floatValue, minTime) - // : floatValue, - // valueBAOSList[i]); - // break; - // case DOUBLE: - // double doubleValue = reader.currentDouble(); - // ReadWriteIOUtils.write( - // encoder != null && encoder.needEncode(minTime) - // ? encoder.encodeDouble(doubleValue, minTime) - // : doubleValue, - // valueBAOSList[i]); - // break; - // case BOOLEAN: - // ReadWriteIOUtils.write(reader.currentBoolean(), valueBAOSList[i]); - // break; - // case TEXT: - // ReadWriteIOUtils.write(reader.currentBinary(), valueBAOSList[i]); - // break; - // default: - // throw new UnSupportedDataTypeException( - // String.format("Data type %s is not supported.", type)); - // } - // } - // - // reader.readyForNext(); - // - // if (reader.next()) { - // timeHeap.add(reader.currentTime()); - // } - // } - // - // if (rowOffset == 0) { - // ++rowCount; - // if (rowCount % 8 == 0) { - // for (int i = 0; i < resultColumnsLength; ++i) { - // ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]); - // currentBitmapList[i] = 0; - // } - // } - // if (rowLimit > 0) { - // ++alreadyReturnedRowNum; - // } - // } else { - // --rowOffset; - // } - // - // rawQueryInputLayer.updateRowRecordListEvictionUpperBound(); - // } - // - // /* - // * feed the bitmap with remaining 0 in the right - // * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000 - // */ - // if (rowCount > 0) { - // int remaining = rowCount % 8; - // if (remaining != 0) { - // for (int i = 0; i < resultColumnsLength; ++i) { - // ReadWriteIOUtils.write( - // (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]); - // } - // } - // } + int rowCount = 0; + while (rowCount < fetchSize + && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit) + && !timeHeap.isEmpty()) { + + long minTime = timeHeap.pollFirst(); + if (rowOffset == 0) { + timeBAOS.write(BytesUtils.longToBytes(minTime)); + } + + for (int i = 0; i < resultColumnsLength; ++i) { + int[] indexes = resultColumnOutputIndexToFragmentDataSetOutputIndex[i]; + int fragmentDataSetIndex = indexes[0]; + int outputColumnIndexInFragmentDataSet = indexes[1]; + + if (rowRecordsCache[fragmentDataSetIndex] == null) { + if (rowOffset == 0) { + currentBitmapList[i] = (currentBitmapList[i] << 1); + } + continue; + } + + RowRecord fragmentRowRecord = rowRecordsCache[fragmentDataSetIndex]; + if (fragmentRowRecord.getTimestamp() != minTime) { + if (rowOffset == 0) { + currentBitmapList[i] = (currentBitmapList[i] << 1); + } + continue; + } + + if (rowOffset == 0) { + currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG; + + Field field = fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet); + if (field == null || field.getDataType() == null) { + currentBitmapList[i] = (currentBitmapList[i] << 1); + continue; + } + + TSDataType type = field.getDataType(); + switch (type) { + case INT32: + int intValue = field.getIntV(); + ReadWriteIOUtils.write( + encoder != null && encoder.needEncode(minTime) + ? encoder.encodeInt(intValue, minTime) + : intValue, + valueBAOSList[i]); + break; + case INT64: + long longValue = field.getLongV(); + ReadWriteIOUtils.write( + encoder != null && encoder.needEncode(minTime) + ? encoder.encodeLong(longValue, minTime) + : longValue, + valueBAOSList[i]); + break; + case FLOAT: + float floatValue = field.getFloatV(); + ReadWriteIOUtils.write( + encoder != null && encoder.needEncode(minTime) + ? encoder.encodeFloat(floatValue, minTime) + : floatValue, + valueBAOSList[i]); + break; + case DOUBLE: + double doubleValue = field.getDoubleV(); + ReadWriteIOUtils.write( + encoder != null && encoder.needEncode(minTime) + ? encoder.encodeDouble(doubleValue, minTime) + : doubleValue, + valueBAOSList[i]); + break; + case BOOLEAN: + ReadWriteIOUtils.write(field.getBoolV(), valueBAOSList[i]); + break; + case TEXT: + ReadWriteIOUtils.write(field.getBinaryV(), valueBAOSList[i]); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", type)); + } + } + } + + updateRowRecordsCache(minTime); + + if (rowOffset == 0) { + ++rowCount; + if (rowCount % 8 == 0) { + for (int i = 0; i < resultColumnsLength; ++i) { + ReadWriteIOUtils.write((byte) currentBitmapList[i], bitmapBAOSList[i]); + currentBitmapList[i] = 0; + } + } + if (rowLimit > 0) { + ++alreadyReturnedRowNum; + } + } else { + --rowOffset; + } + } + + /* + * feed the bitmap with remaining 0 in the right + * if current bitmap is 00011111 and remaining is 3, after feeding the bitmap is 11111000 + */ + if (rowCount > 0) { + int remaining = rowCount % 8; + if (remaining != 0) { + for (int i = 0; i < resultColumnsLength; ++i) { + ReadWriteIOUtils.write( + (byte) (currentBitmapList[i] << (8 - remaining)), bitmapBAOSList[i]); + } + } + } return QueryDataSetUtils.packBuffer( tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, resultColumnsLength); @@ -232,15 +246,31 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa } rowRecord.addField(fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet)); - rowRecordsCache[fragmentDataSetIndex] = null; + } + + updateRowRecordsCache(minTime); + + return rowRecord; + } + + private void updateRowRecordsCache(long minTime) { + for (int i = 0, n = fragmentDataSets.length; i < n; ++i) { + if (rowRecordsCache[i] == null) { + continue; + } + + RowRecord fragmentRowRecord = rowRecordsCache[i]; + if (fragmentRowRecord.getTimestamp() != minTime) { + continue; + } + + rowRecordsCache[i] = null; - if (fragmentDataSets[fragmentDataSetIndex].hasNextWithoutConstraint()) { - fragmentRowRecord = fragmentDataSets[fragmentDataSetIndex].nextWithoutConstraint(); - rowRecordsCache[fragmentDataSetIndex] = fragmentRowRecord; + if (fragmentDataSets[i].hasNextWithoutConstraint()) { + fragmentRowRecord = fragmentDataSets[i].nextWithoutConstraint(); + rowRecordsCache[i] = fragmentRowRecord; timeHeap.add(fragmentRowRecord.getTimestamp()); } } - - return rowRecord; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java index ab60387..a61d872 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java @@ -139,7 +139,7 @@ public class LayerBuilder { /** TODO: make it configurable */ public boolean canBeSplitIntoFragments() { - return 4 <= fragmentDataSetIndexToLayerPointReaders.size(); + return 2 <= fragmentDataSetIndexToLayerPointReaders.size(); } public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)
