This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f678504eaea088f3827a5e593ff7e04e54a13c2e
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Dec 2 11:17:53 2021 +0800

    pass tests when canBeSplitIntoFragments := 2 <= fragmentDataSetIndexToLayerPointReaders.size()
---
 .../db/query/dataset/udf/UDTFJoinDataSet.java      | 262 ++++++++++++---------
 .../db/query/udf/core/layer/LayerBuilder.java      |   2 +-
 2 files changed, 147 insertions(+), 117 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 1e7bd02..b838a0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,15 +19,19 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.IOException;
 
@@ -71,7 +75,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   private void initTimeHeap() throws IOException {
     timeHeap = new TimeSelector(resultColumnsLength << 1, true);
 
-    for (int i = 0; i < resultColumnsLength; ++i) {
+    for (int i = 0, n = fragmentDataSets.length; i < n; ++i) {
       QueryDataSet fragmentDataSet = fragmentDataSets[i];
       if (fragmentDataSet.hasNextWithoutConstraint()) {
         rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint();
@@ -81,8 +85,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   }
 
   @Override
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder)
-      throws IOException, QueryProcessException {
+  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) 
throws IOException {
     TSQueryDataSet tsQueryDataSet = new TSQueryDataSet();
 
     PublicBAOS timeBAOS = new PublicBAOS();
@@ -94,112 +97,123 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     }
     int[] currentBitmapList = new int[resultColumnsLength];
 
-    //    int rowCount = 0;
-    //    while (rowCount < fetchSize
-    //        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
-    //        && !timeHeap.isEmpty()) {
-    //
-    //      long minTime = timeHeap.pollFirst();
-    //      if (rowOffset == 0) {
-    //        timeBAOS.write(BytesUtils.longToBytes(minTime));
-    //      }
-    //
-    //      for (int i = 0; i < resultColumnsLength; ++i) {
-    //        LayerPointReader reader = transformers[i];
-    //
-    //        if (!reader.next() || reader.currentTime() != minTime) {
-    //          if (rowOffset == 0) {
-    //            currentBitmapList[i] = (currentBitmapList[i] << 1);
-    //          }
-    //          continue;
-    //        }
-    //
-    //        if (rowOffset == 0) {
-    //          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
-    //          TSDataType type = reader.getDataType();
-    //          switch (type) {
-    //            case INT32:
-    //              int intValue = reader.currentInt();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeInt(intValue, minTime)
-    //                      : intValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case INT64:
-    //              long longValue = reader.currentLong();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeLong(longValue, minTime)
-    //                      : longValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case FLOAT:
-    //              float floatValue = reader.currentFloat();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeFloat(floatValue, minTime)
-    //                      : floatValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case DOUBLE:
-    //              double doubleValue = reader.currentDouble();
-    //              ReadWriteIOUtils.write(
-    //                  encoder != null && encoder.needEncode(minTime)
-    //                      ? encoder.encodeDouble(doubleValue, minTime)
-    //                      : doubleValue,
-    //                  valueBAOSList[i]);
-    //              break;
-    //            case BOOLEAN:
-    //              ReadWriteIOUtils.write(reader.currentBoolean(), 
valueBAOSList[i]);
-    //              break;
-    //            case TEXT:
-    //              ReadWriteIOUtils.write(reader.currentBinary(), 
valueBAOSList[i]);
-    //              break;
-    //            default:
-    //              throw new UnSupportedDataTypeException(
-    //                  String.format("Data type %s is not supported.", type));
-    //          }
-    //        }
-    //
-    //        reader.readyForNext();
-    //
-    //        if (reader.next()) {
-    //          timeHeap.add(reader.currentTime());
-    //        }
-    //      }
-    //
-    //      if (rowOffset == 0) {
-    //        ++rowCount;
-    //        if (rowCount % 8 == 0) {
-    //          for (int i = 0; i < resultColumnsLength; ++i) {
-    //            ReadWriteIOUtils.write((byte) currentBitmapList[i], 
bitmapBAOSList[i]);
-    //            currentBitmapList[i] = 0;
-    //          }
-    //        }
-    //        if (rowLimit > 0) {
-    //          ++alreadyReturnedRowNum;
-    //        }
-    //      } else {
-    //        --rowOffset;
-    //      }
-    //
-    //      rawQueryInputLayer.updateRowRecordListEvictionUpperBound();
-    //    }
-    //
-    //    /*
-    //     * feed the bitmap with remaining 0 in the right
-    //     * if current bitmap is 00011111 and remaining is 3, after feeding 
the bitmap is 11111000
-    //     */
-    //    if (rowCount > 0) {
-    //      int remaining = rowCount % 8;
-    //      if (remaining != 0) {
-    //        for (int i = 0; i < resultColumnsLength; ++i) {
-    //          ReadWriteIOUtils.write(
-    //              (byte) (currentBitmapList[i] << (8 - remaining)), 
bitmapBAOSList[i]);
-    //        }
-    //      }
-    //    }
+    int rowCount = 0;
+    while (rowCount < fetchSize
+        && (rowLimit <= 0 || alreadyReturnedRowNum < rowLimit)
+        && !timeHeap.isEmpty()) {
+
+      long minTime = timeHeap.pollFirst();
+      if (rowOffset == 0) {
+        timeBAOS.write(BytesUtils.longToBytes(minTime));
+      }
+
+      for (int i = 0; i < resultColumnsLength; ++i) {
+        int[] indexes = resultColumnOutputIndexToFragmentDataSetOutputIndex[i];
+        int fragmentDataSetIndex = indexes[0];
+        int outputColumnIndexInFragmentDataSet = indexes[1];
+
+        if (rowRecordsCache[fragmentDataSetIndex] == null) {
+          if (rowOffset == 0) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+          }
+          continue;
+        }
+
+        RowRecord fragmentRowRecord = rowRecordsCache[fragmentDataSetIndex];
+        if (fragmentRowRecord.getTimestamp() != minTime) {
+          if (rowOffset == 0) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+          }
+          continue;
+        }
+
+        if (rowOffset == 0) {
+          currentBitmapList[i] = (currentBitmapList[i] << 1) | FLAG;
+
+          Field field = 
fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet);
+          if (field == null || field.getDataType() == null) {
+            currentBitmapList[i] = (currentBitmapList[i] << 1);
+            continue;
+          }
+
+          TSDataType type = field.getDataType();
+          switch (type) {
+            case INT32:
+              int intValue = field.getIntV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeInt(intValue, minTime)
+                      : intValue,
+                  valueBAOSList[i]);
+              break;
+            case INT64:
+              long longValue = field.getLongV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeLong(longValue, minTime)
+                      : longValue,
+                  valueBAOSList[i]);
+              break;
+            case FLOAT:
+              float floatValue = field.getFloatV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeFloat(floatValue, minTime)
+                      : floatValue,
+                  valueBAOSList[i]);
+              break;
+            case DOUBLE:
+              double doubleValue = field.getDoubleV();
+              ReadWriteIOUtils.write(
+                  encoder != null && encoder.needEncode(minTime)
+                      ? encoder.encodeDouble(doubleValue, minTime)
+                      : doubleValue,
+                  valueBAOSList[i]);
+              break;
+            case BOOLEAN:
+              ReadWriteIOUtils.write(field.getBoolV(), valueBAOSList[i]);
+              break;
+            case TEXT:
+              ReadWriteIOUtils.write(field.getBinaryV(), valueBAOSList[i]);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", type));
+          }
+        }
+      }
+
+      updateRowRecordsCache(minTime);
+
+      if (rowOffset == 0) {
+        ++rowCount;
+        if (rowCount % 8 == 0) {
+          for (int i = 0; i < resultColumnsLength; ++i) {
+            ReadWriteIOUtils.write((byte) currentBitmapList[i], 
bitmapBAOSList[i]);
+            currentBitmapList[i] = 0;
+          }
+        }
+        if (rowLimit > 0) {
+          ++alreadyReturnedRowNum;
+        }
+      } else {
+        --rowOffset;
+      }
+    }
+
+    /*
+     * feed the bitmap with remaining 0 in the right
+     * if current bitmap is 00011111 and remaining is 3, after feeding the 
bitmap is 11111000
+     */
+    if (rowCount > 0) {
+      int remaining = rowCount % 8;
+      if (remaining != 0) {
+        for (int i = 0; i < resultColumnsLength; ++i) {
+          ReadWriteIOUtils.write(
+              (byte) (currentBitmapList[i] << (8 - remaining)), 
bitmapBAOSList[i]);
+        }
+      }
+    }
 
     return QueryDataSetUtils.packBuffer(
         tsQueryDataSet, timeBAOS, valueBAOSList, bitmapBAOSList, 
resultColumnsLength);
@@ -232,15 +246,31 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
       }
 
       
rowRecord.addField(fragmentRowRecord.getFields().get(outputColumnIndexInFragmentDataSet));
-      rowRecordsCache[fragmentDataSetIndex] = null;
+    }
+
+    updateRowRecordsCache(minTime);
+
+    return rowRecord;
+  }
+
+  private void updateRowRecordsCache(long minTime) {
+    for (int i = 0, n = fragmentDataSets.length; i < n; ++i) {
+      if (rowRecordsCache[i] == null) {
+        continue;
+      }
+
+      RowRecord fragmentRowRecord = rowRecordsCache[i];
+      if (fragmentRowRecord.getTimestamp() != minTime) {
+        continue;
+      }
+
+      rowRecordsCache[i] = null;
 
-      if (fragmentDataSets[fragmentDataSetIndex].hasNextWithoutConstraint()) {
-        fragmentRowRecord = 
fragmentDataSets[fragmentDataSetIndex].nextWithoutConstraint();
-        rowRecordsCache[fragmentDataSetIndex] = fragmentRowRecord;
+      if (fragmentDataSets[i].hasNextWithoutConstraint()) {
+        fragmentRowRecord = fragmentDataSets[i].nextWithoutConstraint();
+        rowRecordsCache[i] = fragmentRowRecord;
         timeHeap.add(fragmentRowRecord.getTimestamp());
       }
     }
-
-    return rowRecord;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index ab60387..a61d872 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -139,7 +139,7 @@ public class LayerBuilder {
 
   /** TODO: make it configurable */
   public boolean canBeSplitIntoFragments() {
-    return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
+    return 2 <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
   public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet 
udtfAlignByTimeDataSet)

Reply via email to