This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9086826929d0455d34c1aabe82dcb3fe2c513225
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Dec 1 17:35:47 2021 +0800

    init QueryDataSet and fix bug
---
 .../db/query/dataset/udf/UDTFAlignByTimeDataSet.java  |  2 +-
 .../iotdb/db/query/dataset/udf/UDTFJoinDataSet.java   | 19 ++++++++++---------
 .../iotdb/db/query/udf/core/layer/LayerBuilder.java   |  8 ++++++--
 .../iotdb/tsfile/read/query/dataset/QueryDataSet.java | 18 ++++++++++++++++++
 4 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index b0b6a3e..7572516 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -111,7 +111,7 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   public QueryDataSet executeInFragmentsIfPossible() throws QueryProcessException, IOException {
     // TODO make the behaviour of the return value of layerBuilder.generateJoinDataSet() the same as
     // TODO the original dataset
-    return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet() : this;
+    return canBeSplitIntoFragments() ? layerBuilder.generateJoinDataSet(this) : this;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index 323e1c2..f92f600 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,17 +19,15 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
-import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
-import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
-import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
-import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 
-public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
+public class UDTFJoinDataSet extends QueryDataSet
+//    implements DirectAlignByTimeDataSet
+{
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
 
@@ -51,9 +49,12 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
   private TimeSelector timeHeap;
 
   public UDTFJoinDataSet(
+      UDTFAlignByTimeDataSet udtfAlignByTimeDataSet,
       UDTFFragmentDataSet[] fragmentDataSets,
       int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
       throws IOException {
+    super(udtfAlignByTimeDataSet);
+
     this.fragmentDataSets = fragmentDataSets;
     this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
         resultColumnOutputIndexToFragmentDataSetOutputIndex;
@@ -114,8 +115,8 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     return rowRecord;
   }
 
-  @Override
-  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
-    throw new NotImplementedException();
-  }
+  //  @Override
+  //  public TSQueryDataSet fillBuffer(int fetchSize, WatermarkEncoder encoder) {
+  //    throw new NotImplementedException();
+  //  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index a7b0582..ab60387 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
+import org.apache.iotdb.db.query.dataset.udf.UDTFAlignByTimeDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFFragmentDataSet;
 import org.apache.iotdb.db.query.dataset.udf.UDTFJoinDataSet;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -141,7 +142,8 @@ public class LayerBuilder {
     return 4 <= fragmentDataSetIndexToLayerPointReaders.size();
   }
 
-  public QueryDataSet generateJoinDataSet() throws QueryProcessException, IOException {
+  public QueryDataSet generateJoinDataSet(UDTFAlignByTimeDataSet udtfAlignByTimeDataSet)
+      throws QueryProcessException, IOException {
     int n = fragmentDataSetIndexToLayerPointReaders.size();
     UDTFFragmentDataSet[] fragmentDataSets = new UDTFFragmentDataSet[n];
     for (int i = 0; i < n; ++i) {
@@ -152,6 +154,8 @@ public class LayerBuilder {
     }
 
     return new UDTFJoinDataSet(
-        fragmentDataSets, resultColumnOutputIndexToFragmentDataSetOutputIndex);
+        udtfAlignByTimeDataSet,
+        fragmentDataSets,
+        resultColumnOutputIndexToFragmentDataSetOutputIndex);
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
index e1e2f4f..c418e51 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/dataset/QueryDataSet.java
@@ -50,6 +50,24 @@ public abstract class QueryDataSet {
 
   protected int columnNum;
 
+  public QueryDataSet(QueryDataSet that) {
+    this.paths = that.paths;
+    this.dataTypes = that.dataTypes;
+
+    this.rowLimit = that.rowLimit;
+    this.rowOffset = that.rowOffset;
+    this.alreadyReturnedRowNum = that.alreadyReturnedRowNum;
+    this.fetchSize = that.fetchSize;
+    this.ascending = that.ascending;
+
+    this.endPoint = that.endPoint;
+
+    this.withoutAnyNull = that.withoutAnyNull;
+    this.withoutAllNull = that.withoutAllNull;
+
+    this.columnNum = that.columnNum;
+  }
+
   /** For redirect query. Need keep consistent with EndPoint in rpc.thrift. */
   public static class EndPoint {
     private String ip = null;

Reply via email to