This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new dd66c7a5fc8 finish first IT & support flush
dd66c7a5fc8 is described below

commit dd66c7a5fc81c5702d2b0166bf14dce8cd6bcc0f
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Jul 15 12:10:45 2024 +0800

    finish first IT & support flush
---
 .../iotdb/session/it/IoTDBSessionRelationalIT.java | 27 +++++++++++++++----
 .../operator/process/CollectOperator.java          |  3 +++
 .../iotdb/db/queryengine/plan/Coordinator.java     |  4 ++-
 .../queryengine/plan/analyze/SelectIntoUtils.java  |  5 ++--
 .../execution/config/TableConfigTaskVisitor.java   |  9 +++++++
 .../plan/planner/TableOperatorGenerator.java       |  1 +
 .../relational/analyzer/StatementAnalyzer.java     |  1 +
 .../plan/relational/planner/RelationPlanner.java   |  1 +
 .../distribute/DistributedPlanGenerator.java       |  2 ++
 .../plan/relational/planner/node/CollectNode.java  | 30 +++++++++++++++++++---
 .../plan/relational/sql/ast/AstVisitor.java        |  4 +++
 .../queryengine/plan/relational/sql/ast/Flush.java | 17 ++++++++++++
 .../plan/relational/sql/parser/AstBuilder.java     | 28 +++++++++++++++++++-
 .../apache/iotdb/commons/path/AlignedFullPath.java | 19 ++++++++++++++
 14 files changed, 139 insertions(+), 12 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
index 84d5211f024..a53489923dd 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionRelationalIT.java
@@ -94,10 +94,9 @@ public class IoTDBSessionRelationalIT {
       final List<ColumnType> columnTypes =
           Arrays.asList(ColumnType.ID, ColumnType.ATTRIBUTE, 
ColumnType.MEASUREMENT);
 
+      long timestamp = 0;
       Tablet tablet = new Tablet("table1", schemaList, columnTypes, 15);
 
-      long timestamp = System.currentTimeMillis();
-
       for (long row = 0; row < 15; row++) {
         int rowIndex = tablet.rowSize++;
         tablet.addTimestamp(rowIndex, timestamp + row);
@@ -108,7 +107,6 @@ public class IoTDBSessionRelationalIT {
           session.insertRelationalTablet(tablet, true);
           tablet.reset();
         }
-        timestamp++;
       }
 
       if (tablet.rowSize != 0) {
@@ -116,10 +114,29 @@ public class IoTDBSessionRelationalIT {
         tablet.reset();
       }
 
-      SessionDataSet dataSet = session.executeQueryStatement("select * from 
table1");
+      session.executeNonQueryStatement("FLush");
+
+      for (long row = 15; row < 30; row++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, timestamp + row);
+        tablet.addValue("id1", rowIndex, "id:" + row);
+        tablet.addValue("attr1", rowIndex, "attr:" + row);
+        tablet.addValue("m1", rowIndex, row * 1.0);
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          session.insertRelationalTablet(tablet, true);
+          tablet.reset();
+        }
+      }
+
+      if (tablet.rowSize != 0) {
+        session.insertRelationalTablet(tablet);
+        tablet.reset();
+      }
+
+      SessionDataSet dataSet = session.executeQueryStatement("select * from 
table1 order by time");
       while (dataSet.hasNext()) {
         RowRecord rowRecord = dataSet.next();
-        assertEquals(15L, rowRecord.getFields().get(0).getLongV());
+//        assertEquals(0L, rowRecord.getFields().get(0).getLongV());
         System.out.println(rowRecord);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 2f55743854e..4b0ecf27c37 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -68,6 +68,9 @@ public class CollectOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    if (currentIndex >= children.size()) {
+      return NOT_BLOCKED;
+    }
     return children.get(currentIndex).isBlocked();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 170eedb0e70..ef515cff01b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
@@ -322,7 +323,8 @@ public class Coordinator {
         || statement instanceof CreateTable
         || statement instanceof DescribeTable
         || statement instanceof ShowTables
-        || statement instanceof DropTable) {
+        || statement instanceof DropTable
+        || statement instanceof Flush) {
       return new ConfigExecution(
           queryContext,
           null,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
index d0c3617d530..9d2e1f7fb82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/SelectIntoUtils.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.util.AstUtil;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -41,6 +41,7 @@ import java.util.regex.Matcher;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DOUBLE_COLONS;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.LEVELED_PATH_TEMPLATE_PATTERN;
+import static 
org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor.parseNodeString;
 
 public class SelectIntoUtils {
 
@@ -108,7 +109,7 @@ public class SelectIntoUtils {
       resNode = matcher.replaceFirst(sourceNodes[index]);
       matcher = LEVELED_PATH_TEMPLATE_PATTERN.matcher(resNode);
     }
-    return ASTVisitor.parseNodeString(resNode);
+    return parseNodeString(resNode);
   }
 
   public static boolean checkIsAllRawSeriesQuery(List<Expression> expressions) 
{
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index e05a2d740ba..612e26d92ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowDBTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask;
+import org.apache.iotdb.db.queryengine.plan.execution.config.sys.FlushTask;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableHeaderSchemaValidator;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
@@ -44,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropDB;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property;
@@ -52,6 +54,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowTables;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
 import 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeNotFoundException;
 
+import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
 import org.apache.tsfile.enums.TSDataType;
 
 import java.util.HashMap;
@@ -186,4 +189,10 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
   protected IConfigTask visitCurrentDatabase(CurrentDatabase node, 
MPPQueryContext context) {
     return super.visitCurrentDatabase(node, context);
   }
+
+  @Override
+  protected IConfigTask visitFlush(Flush node, MPPQueryContext context) {
+    context.setQueryType(QueryType.WRITE);
+    return new FlushTask(((FlushStatement) node.getInnerTreeStatement()));
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index b1ba6ded7f0..e32085bbc23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner;
 
+import java.util.stream.Stream;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 4e22b1016d4..44a45b43c39 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -56,6 +56,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FetchDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FieldReference;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingElement;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 5065f822eeb..9fab1165a9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 40166d9413d..5337c68bcb8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -83,6 +83,7 @@ public class DistributedPlanGenerator
       return res;
     } else if (res.size() > 1) {
       CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+      collectNode.setOutputSymbols(res.get(0).getOutputSymbols());
       res.forEach(collectNode::addChild);
       return Collections.singletonList(collectNode);
     } else {
@@ -327,6 +328,7 @@ public class DistributedPlanGenerator
 
     // children has no sort property, use CollectNode to merge children
     CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+    collectNode.setOutputSymbols(firstChild.getOutputSymbols());
     childrenNodes.forEach(collectNode::addChild);
     return collectNode;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
index 7920216428d..9b2a4cd2812 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
+import java.util.ArrayList;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -32,6 +33,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 /**
  * CollectNode output the content of children. Normally it will output the 
child one by one, but in
@@ -39,6 +41,8 @@ import java.util.List;
  */
 public class CollectNode extends MultiChildProcessNode {
 
+  private List<Symbol> outputSymbols;
+
   public CollectNode(PlanNodeId id) {
     super(id);
   }
@@ -50,12 +54,19 @@ public class CollectNode extends MultiChildProcessNode {
 
   @Override
   public PlanNode clone() {
-    return new CollectNode(id);
+    CollectNode collectNode = new CollectNode(id);
+    collectNode.outputSymbols = outputSymbols;
+    return collectNode;
+  }
+
+  public void setOutputSymbols(
+      List<Symbol> outputSymbols) {
+    this.outputSymbols = outputSymbols;
   }
 
   @Override
   public List<Symbol> getOutputSymbols() {
-    return children.get(0).getOutputSymbols();
+    return outputSymbols;
   }
 
   @Override
@@ -66,16 +77,29 @@ public class CollectNode extends MultiChildProcessNode {
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.TABLE_COLLECT_NODE.serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer);
+    outputSymbols.forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
   }
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     PlanNodeType.TABLE_COLLECT_NODE.serialize(stream);
+    ReadWriteIOUtils.write(outputSymbols.size(), stream);
+    for (Symbol symbol : outputSymbols) {
+      Symbol.serialize(symbol, stream);
+    }
   }
 
   public static CollectNode deserialize(ByteBuffer byteBuffer) {
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new CollectNode(planNodeId);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> outputs = new ArrayList<>(size);
+    while (size-- > 0) {
+      outputs.add(Symbol.deserialize(byteBuffer));
+    }
+    CollectNode collectNode = new CollectNode(planNodeId);
+    collectNode.setOutputSymbols(outputs);
+    return collectNode;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 55c074545bb..1d72a8c7b65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -375,6 +375,10 @@ public abstract class AstVisitor<R, C> {
     return visitStatement(node, context);
   }
 
+  protected R visitFlush(Flush node, C context) {
+    return visitStatement(node, context);
+  }
+
   protected R visitInsertRow(InsertRow node, C context) {
     return visitStatement(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
new file mode 100644
index 00000000000..11016572eb3
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Flush.java
@@ -0,0 +1,17 @@
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+public class Flush extends WrappedStatement{
+
+  public Flush(Statement innerTreeStatement,
+      MPPQueryContext context) {
+    super(innerTreeStatement, context);
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitFlush(this, context);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index ef2a3dfe5e8..53aa6d22f41 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -19,9 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.sql.parser;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.qp.sql.IoTDBSqlParser;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AllColumns;
@@ -55,6 +59,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericDataType;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupBy;
@@ -124,9 +129,12 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WhenClause;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
 import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlBaseVisitor;
 import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlLexer;
 import org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser;
+import 
org.apache.iotdb.db.relational.grammar.sql.RelationalSqlParser.IdentifierContext;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -468,7 +476,25 @@ public class AstBuilder extends 
RelationalSqlBaseVisitor<Node> {
 
   @Override
   public Node visitFlushStatement(RelationalSqlParser.FlushStatementContext 
ctx) {
-    return super.visitFlushStatement(ctx);
+    FlushStatement flushStatement = new FlushStatement(StatementType.FLUSH);
+    List<PartialPath> storageGroups = null;
+    if (ctx.booleanValue() != null) {
+      
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
+    }
+    flushStatement.setOnCluster(ctx.localOrClusterMode() == null || 
ctx.localOrClusterMode().LOCAL() == null);
+    if (ctx.identifier() != null) {
+      storageGroups = new ArrayList<>();
+      List<Identifier> identifiers = getIdentifiers(ctx.identifier());
+      for (Identifier identifier : identifiers) {
+        try {
+          storageGroups.add(new 
PartialPath(PathUtils.qualifyDatabaseName(identifier.getValue())));
+        } catch (IllegalPathException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    flushStatement.setStorageGroups(storageGroups);
+    return new Flush(flushStatement, null);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
index 4f94d699388..fefa80f95b2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.path;
 
+import java.util.Objects;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.RamUsageEstimator;
@@ -73,4 +74,22 @@ public class AlignedFullPath implements IFullPath {
         + deviceID.ramBytesUsed()
         + measurementList.stream().mapToLong(RamUsageEstimator::sizeOf).sum() 
* 2;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    AlignedFullPath that = (AlignedFullPath) o;
+    return Objects.equals(deviceID, that.deviceID) && Objects.equals(
+        measurementList, that.measurementList);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(deviceID, measurementList);
+  }
 }

Reply via email to