This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fbddcfd62a2 Pipe Meta: Completed the logical view related logics & 
some bug fixes in IT and dirs (#12159)
fbddcfd62a2 is described below

commit fbddcfd62a26b7af5ea116b6394d0bd630cac442
Author: Caideyipi <[email protected]>
AuthorDate: Tue Mar 12 23:27:23 2024 +0800

    Pipe Meta: Completed the logical view related logics & some bug fixes in IT 
and dirs (#12159)
---
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   |   5 +
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   5 +-
 .../iotdb/confignode/conf/ConfigNodeConstant.java  |  13 -
 .../PipeConfigNodeSnapshotResourceManager.java     |   4 +-
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     |  11 +
 .../iotdb/confignode/service/ConfigNode.java       |   4 +-
 .../pipe/receiver/PipePlanToStatementVisitor.java  |  16 +-
 .../receiver/PipeStatementTSStatusVisitor.java     |  10 +
 .../receiver/thrift/IoTDBDataNodeReceiver.java     |   3 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 261 +++++++++++----------
 .../metadata/view/CreateLogicalViewStatement.java  |  12 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   1 +
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |   9 +-
 13 files changed, 189 insertions(+), 165 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index a3d0e0383a1..c94650817a9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -411,6 +411,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
       Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("extractor.pattern", null);
+      extractorAttributes.put("extractor.inclusion", "data");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
@@ -482,6 +483,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
       Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("extractor.pattern", "root.db1");
+      extractorAttributes.put("extractor.inclusion", "data");
 
       connectorAttributes.put("connector", "iotdb-thrift-connector");
       connectorAttributes.put("connector.batch.enable", "false");
@@ -579,6 +581,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
 
+      extractorAttributes.put("extractor.inclusion", "data");
       extractorAttributes.put("extractor.pattern", "root.db.d2");
       extractorAttributes.put("extractor.history.enable", "false");
       extractorAttributes.put("extractor.realtime.enable", "true");
@@ -664,6 +667,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
       Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("extractor.pattern", "root.db.d1");
+      extractorAttributes.put("extractor.inclusion", "data");
       extractorAttributes.put("extractor.history.enable", "true");
       // 1970-01-01T08:00:02+08:00
       extractorAttributes.put("extractor.history.start-time", "2000");
@@ -813,6 +817,7 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
       Map<String, String> connectorAttributes = new HashMap<>();
 
       extractorAttributes.put("source.pattern", "root.db.d1");
+      extractorAttributes.put("source.inclusion", "data");
       extractorAttributes.put("source.start-time", 
"1970-01-01T08:00:02+08:00");
       // 1970-01-01T08:00:04+08:00
       extractorAttributes.put("source.end-time", "4000");
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 286f8c99e96..6ed563ec137 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -135,11 +135,11 @@ public class ConfigNodeConfig {
 
   /** System directory, including version file for each database and metadata. 
*/
   private String systemDir =
-      ConfigNodeConstant.DATA_DIR + File.separator + 
IoTDBConstant.SYSTEM_FOLDER_NAME;
+      IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + 
IoTDBConstant.SYSTEM_FOLDER_NAME;
 
   /** Consensus directory, storage consensus protocol logs. */
   private String consensusDir =
-      ConfigNodeConstant.DATA_DIR + File.separator + 
ConfigNodeConstant.CONSENSUS_FOLDER;
+      IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + 
IoTDBConstant.CONSENSUS_FOLDER_NAME;
 
   /** External lib directory, stores user-uploaded JAR files. */
   private String extLibDir = IoTDBConstant.EXT_FOLDER_NAME;
@@ -318,6 +318,7 @@ public class ConfigNodeConfig {
     triggerTemporaryLibDir = addHomeDir(triggerTemporaryLibDir);
     pipeDir = addHomeDir(pipeDir);
     pipeTemporaryLibDir = addHomeDir(pipeTemporaryLibDir);
+    pipeReceiverFileDir = addHomeDir(pipeReceiverFileDir);
   }
 
   private String addHomeDir(String dir) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
index 214db2c5956..ca521e3a73c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.confignode.conf;
 
-import java.io.File;
-
 public class ConfigNodeConstant {
 
   public static final String GLOBAL_NAME = "IoTDB-ConfigNode";
@@ -31,25 +29,14 @@ public class ConfigNodeConstant {
   public static final String SYSTEM_FILE_NAME = "confignode-system.properties";
 
   public static final String CONFIGNODE_PACKAGE = 
"org.apache.iotdb.confignode.service";
-  public static final String JMX_TYPE = "type";
   public static final String CONFIGNODE_JMX_PORT = "confignode.jmx.port";
 
-  public static final String DATA_DIR = "data" + File.separator + "confignode";
-  public static final String CONF_DIR = "conf";
-  public static final String CONSENSUS_FOLDER = "consensus";
-  public static final String UDF_FOLDER = "udf";
-
-  public static final int MIN_SUPPORTED_JDK_VERSION = 8;
-
   public static final String REMOVE_CONFIGNODE_USAGE =
       "Executed failed, check usage: 
<Node-id>/<internal_address>:<internal_port>";
 
   public static final String REMOVE_DATANODE_PROCESS = 
"[REMOVE_DATANODE_PROCESS]";
   public static final String REGION_MIGRATE_PROCESS = 
"[REGION_MIGRATE_PROCESS]";
 
-  public static final String IOTDB_FOREGROUND = "iotdb-foreground";
-  public static final String IOTDB_PIDFILE = "iotdb-pidfile";
-
   private ConfigNodeConstant() {
     // empty constructor
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
index a439c0ea68b..4ab5a540940 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/snapshot/PipeConfigNodeSnapshotResourceManager.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.confignode.manager.pipe.resource.snapshot;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.resource.PipeSnapshotResourceManager;
-import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -28,7 +28,7 @@ import java.util.HashSet;
 public class PipeConfigNodeSnapshotResourceManager extends 
PipeSnapshotResourceManager {
 
   private PipeConfigNodeSnapshotResourceManager() {
-    super(new 
HashSet<>(Collections.singletonList(ConfigNodeConstant.CONSENSUS_FOLDER)));
+    super(new 
HashSet<>(Collections.singletonList(IoTDBConstant.CONSENSUS_FOLDER_NAME)));
   }
 
   private static class PipeConfigNodeSnapshotResourceManagerHolder {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
index efae2511b43..498ef992f3a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/transfer/agent/receiver/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSche
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
+import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeUnsetSchemaTemplatePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan;
@@ -166,6 +167,16 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
     return super.visitPipeDeleteTimeSeries(pipeDeleteTimeSeriesPlan, context);
   }
 
+  @Override
+  public TSStatus visitPipeDeleteLogicalView(
+      PipeDeleteLogicalViewPlan pipeDeleteLogicalViewPlan, TSStatus context) {
+    if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    return super.visitPipeDeleteLogicalView(pipeDeleteLogicalViewPlan, 
context);
+  }
+
   @Override
   public TSStatus visitPipeDeactivateTemplate(
       PipeDeactivateTemplatePlan pipeDeactivateTemplatePlan, TSStatus context) 
{
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index dfa10dc45bb..3c756f1e32f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -93,7 +93,7 @@ public class ConfigNode implements ConfigNodeMBean {
       String.format(
           "%s:%s=%s",
           IoTDBConstant.IOTDB_SERVICE_JMX_NAME,
-          ConfigNodeConstant.JMX_TYPE,
+          IoTDBConstant.JMX_TYPE,
           ServiceType.CONFIG_NODE.getJmxName());
   private final RegisterManager registerManager = new RegisterManager();
 
@@ -238,7 +238,7 @@ public class ConfigNode implements ConfigNodeMBean {
   }
 
   void processPid() {
-    String pidFile = System.getProperty(ConfigNodeConstant.IOTDB_PIDFILE);
+    String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE);
     if (pidFile != null) {
       new File(pidFile).deleteOnExit();
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
index 1616dd02917..3d4f8a6b6b6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipePlanToStatementVisitor.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.pipe.receiver;
 
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -46,7 +45,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesS
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
-import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -133,18 +131,10 @@ public class PipePlanToStatementVisitor extends 
PlanVisitor<Statement, Void> {
           Objects.nonNull(group.getAttributesList())
               ? group.getAttributesList()
               : new ArrayList<>());
-      try {
-        if (Objects.nonNull(group.getMeasurements())) {
-          for (int i = 0; i < group.getMeasurements().size(); ++i) {
-            paths.add(
-                new PartialPath(path2Group.getKey().getFullPath(), 
group.getMeasurements().get(i)));
-          }
+      if (Objects.nonNull(group.getMeasurements())) {
+        for (int i = 0; i < group.getMeasurements().size(); ++i) {
+          
paths.add(path2Group.getKey().concatNode(group.getMeasurements().get(i)));
         }
-      } catch (IllegalPathException e) {
-        LOGGER.error(
-            "failed to create multi timeseries statement because of {}", 
e.getMessage(), e);
-        throw new PipeException(
-            "failed to create multi timeseries statement because of " + 
e.getMessage(), e);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
index 814ec1e1a83..4d112b841da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTSStatusVisitor.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateAlignedTime
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateTimeSeriesStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.ActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.template.BatchActivateTemplateStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 /**
@@ -155,6 +156,15 @@ public class PipeStatementTSStatusVisitor extends 
StatementVisitor<TSStatus, TSS
     return visitStatement(alterTimeSeriesStatement, context);
   }
 
+  @Override
+  public TSStatus visitCreateLogicalView(CreateLogicalViewStatement statement, 
TSStatus context) {
+    if (context.getCode() == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(context.getMessage());
+    }
+    return super.visitCreateLogicalView(statement, context);
+  }
+
   @Override
   public TSStatus visitActivateTemplate(
       ActivateTemplateStatement activateTemplateStatement, TSStatus context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
index 0b99a5859ed..946a456645c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
@@ -281,7 +281,8 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq 
req) {
-    // TODO: parse exception and status for alter logical view node
+    // We may be able to skip the alter logical view's exception parsing 
because
+    // the "AlterLogicalViewNode" is itself idempotent
     return req.getPlanNode() instanceof AlterLogicalViewNode
         ? new TPipeTransferResp(
             ClusterConfigTaskExecutor.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index d6b16b15087..198ef3f6b58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -3318,56 +3318,71 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     analysis.setWhereExpression(whereExpression);
   }
 
-  // region view
+  // Region view
 
-  /**
-   * Compute how many paths exist, get the schema tree and the number of 
existed paths.
-   *
-   * @return a pair of ISchemaTree, and the number of exist paths.
-   */
-  private Pair<ISchemaTree, Integer> fetchSchemaOfPathsAndCount(
-      List<PartialPath> pathList, Analysis analysis, MPPQueryContext context) {
-    ISchemaTree schemaTree = analysis.getSchemaTree();
-    if (schemaTree == null) {
-      // source is not represented by query, thus has not done fetch schema.
-      PathPatternTree pathPatternTree = new PathPatternTree();
-      for (PartialPath path : pathList) {
-        pathPatternTree.appendPathPattern(path);
+  // Create Logical View
+  @Override
+  public Analysis visitCreateLogicalView(
+      CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext 
context) {
+    Analysis analysis = new Analysis();
+    context.setQueryType(QueryType.WRITE);
+    analysis.setStatement(createLogicalViewStatement);
+
+    if (createLogicalViewStatement.getViewExpressions() == null) {
+      // Analyze query in statement
+      QueryStatement queryStatement = 
createLogicalViewStatement.getQueryStatement();
+      if (queryStatement != null) {
+        Pair<List<Expression>, Analysis> queryAnalysisPair =
+            this.analyzeQueryInLogicalViewStatement(analysis, queryStatement, 
context);
+        if (queryAnalysisPair.right.isFinishQueryAfterAnalyze()) {
+          return analysis;
+        } else if (queryAnalysisPair.left != null) {
+          try {
+            
createLogicalViewStatement.setSourceExpressions(queryAnalysisPair.left);
+          } catch (UnsupportedViewException e) {
+            analysis.setFinishQueryAfterAnalyze(true);
+            analysis.setFailStatus(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
+            return analysis;
+          }
+        }
       }
-      schemaTree = this.schemaFetcher.fetchSchema(pathPatternTree, true, 
context);
+      // Only check and use source paths when view expressions are not set 
because there
+      // is no need to check source when renaming views and the check may not 
be satisfied
+      // when the statement is generated by pipe
+      checkSourcePathsInCreateLogicalView(analysis, 
createLogicalViewStatement);
+      if (analysis.isFinishQueryAfterAnalyze()) {
+        return analysis;
+      }
+
+      // Make sure there is no view in source
+      List<Expression> sourceExpressionList = 
createLogicalViewStatement.getSourceExpressionList();
+      checkViewsInSource(analysis, sourceExpressionList, context);
+      if (analysis.isFinishQueryAfterAnalyze()) {
+        return analysis;
+      }
+
+      // Use source and into item to generate target views
+      // If expressions are filled the target paths must be filled likewise
+      createLogicalViewStatement.parseIntoItemIfNecessary();
     }
 
-    // search each path, make sure they all exist.
-    int numOfExistPaths = 0;
-    for (PartialPath path : pathList) {
-      Pair<List<MeasurementPath>, Integer> pathPair = 
schemaTree.searchMeasurementPaths(path);
-      numOfExistPaths += !pathPair.left.isEmpty() ? 1 : 0;
+    // Check target paths.
+    checkTargetPathsInCreateLogicalView(analysis, createLogicalViewStatement);
+    if (analysis.isFinishQueryAfterAnalyze()) {
+      return analysis;
     }
-    return new Pair<>(schemaTree, numOfExistPaths);
-  }
 
-  /**
-   * @param pathList the paths you want to check
-   * @param schemaTree the given schema tree
-   * @return if all paths you give can be found in schema tree, return a pair 
of view paths and
-   *     null; else return view paths and the non-exist path.
-   */
-  private Pair<List<PartialPath>, PartialPath> findAllViewsInPaths(
-      List<PartialPath> pathList, ISchemaTree schemaTree) {
-    List<PartialPath> result = new ArrayList<>();
-    for (PartialPath path : pathList) {
-      Pair<List<MeasurementPath>, Integer> measurementPathList =
-          schemaTree.searchMeasurementPaths(path);
-      if (measurementPathList.left.isEmpty()) {
-        return new Pair<>(result, path);
-      }
-      for (MeasurementPath measurementPath : measurementPathList.left) {
-        if (measurementPath.getMeasurementSchema().isLogicalView()) {
-          result.add(measurementPath);
-        }
-      }
+    // Set schema partition info, this info will be used to split logical plan 
node.
+    PathPatternTree patternTree = new PathPatternTree();
+    for (PartialPath thisFullPath : 
createLogicalViewStatement.getTargetPathList()) {
+      patternTree.appendFullPath(thisFullPath);
     }
-    return new Pair<>(result, null);
+    SchemaPartition schemaPartitionInfo =
+        partitionFetcher.getOrCreateSchemaPartition(
+            patternTree, context.getSession().getUserName());
+    analysis.setSchemaPartitionInfo(schemaPartitionInfo);
+
+    return analysis;
   }
 
   private Pair<List<Expression>, Analysis> analyzeQueryInLogicalViewStatement(
@@ -3404,6 +3419,32 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return new Pair<>(expressionList, analysis);
   }
 
+  private void checkSourcePathsInCreateLogicalView(
+      Analysis analysis, CreateLogicalViewStatement 
createLogicalViewStatement) {
+    Pair<Boolean, String> checkResult =
+        createLogicalViewStatement.checkSourcePathsIfNotUsingQueryStatement();
+    if (Boolean.FALSE.equals(checkResult.left)) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+              "The path " + checkResult.right + " is illegal."));
+      return;
+    }
+
+    List<PartialPath> targetPathList = 
createLogicalViewStatement.getTargetPathList();
+    if (createLogicalViewStatement.getSourceExpressionList().size() != 
targetPathList.size()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
+              String.format(
+                  "The number of target paths (%d) and sources (%d) are miss 
matched! Please check your SQL.",
+                  createLogicalViewStatement.getTargetPathList().size(),
+                  
createLogicalViewStatement.getSourceExpressionList().size())));
+    }
+  }
+
   private void checkViewsInSource(
       Analysis analysis, List<Expression> sourceExpressionList, 
MPPQueryContext context) {
     List<PartialPath> pathsNeedCheck = new ArrayList<>();
@@ -3415,7 +3456,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     Pair<ISchemaTree, Integer> schemaOfNeedToCheck =
         fetchSchemaOfPathsAndCount(pathsNeedCheck, analysis, context);
     if (schemaOfNeedToCheck.right != pathsNeedCheck.size()) {
-      // some source paths is not exist, and could not fetch schema.
+      // Some source paths do not exist, so their schema could not be fetched.
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
           RpcUtils.getStatus(
@@ -3426,7 +3467,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     Pair<List<PartialPath>, PartialPath> viewInSourceCheckResult =
         findAllViewsInPaths(pathsNeedCheck, schemaOfNeedToCheck.left);
     if (viewInSourceCheckResult.right != null) {
-      // some source paths is not exist
+      // Some source paths do not exist
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
           RpcUtils.getStatus(
@@ -3437,7 +3478,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       return;
     }
     if (!viewInSourceCheckResult.left.isEmpty()) {
-      // some source paths is logical view
+      // Some source paths are logical views
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
           RpcUtils.getStatus(
@@ -3446,9 +3487,59 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  private void checkPathsInCreateLogicalView(
+  /**
+   * Compute how many paths exist, get the schema tree and the number of 
existing paths.
+   *
+   * @return a pair of the ISchemaTree and the number of existing paths.
+   */
+  private Pair<ISchemaTree, Integer> fetchSchemaOfPathsAndCount(
+      List<PartialPath> pathList, Analysis analysis, MPPQueryContext context) {
+    ISchemaTree schemaTree = analysis.getSchemaTree();
+    if (schemaTree == null) {
+      // Source is not represented by a query, so the schema has not been fetched yet.
+      PathPatternTree pathPatternTree = new PathPatternTree();
+      for (PartialPath path : pathList) {
+        pathPatternTree.appendPathPattern(path);
+      }
+      schemaTree = this.schemaFetcher.fetchSchema(pathPatternTree, true, 
context);
+    }
+
+    // search each path, make sure they all exist.
+    int numOfExistPaths = 0;
+    for (PartialPath path : pathList) {
+      Pair<List<MeasurementPath>, Integer> pathPair = 
schemaTree.searchMeasurementPaths(path);
+      numOfExistPaths += !pathPair.left.isEmpty() ? 1 : 0;
+    }
+    return new Pair<>(schemaTree, numOfExistPaths);
+  }
+
+  /**
+   * @param pathList the paths you want to check
+   * @param schemaTree the given schema tree
+   * @return if all paths you give can be found in schema tree, return a pair 
of view paths and
+   *     null; else return view paths and the non-exist path.
+   */
+  private Pair<List<PartialPath>, PartialPath> findAllViewsInPaths(
+      List<PartialPath> pathList, ISchemaTree schemaTree) {
+    List<PartialPath> result = new ArrayList<>();
+    for (PartialPath path : pathList) {
+      Pair<List<MeasurementPath>, Integer> measurementPathList =
+          schemaTree.searchMeasurementPaths(path);
+      if (measurementPathList.left.isEmpty()) {
+        return new Pair<>(result, path);
+      }
+      for (MeasurementPath measurementPath : measurementPathList.left) {
+        if (measurementPath.getMeasurementSchema().isLogicalView()) {
+          result.add(measurementPath);
+        }
+      }
+    }
+    return new Pair<>(result, null);
+  }
+
+  private void checkTargetPathsInCreateLogicalView(
       Analysis analysis, CreateLogicalViewStatement 
createLogicalViewStatement) {
-    Pair<Boolean, String> checkResult = 
createLogicalViewStatement.checkAllPaths();
+    Pair<Boolean, String> checkResult = 
createLogicalViewStatement.checkTargetPaths();
     if (Boolean.FALSE.equals(checkResult.left)) {
       analysis.setFinishQueryAfterAnalyze(true);
       analysis.setFailStatus(
@@ -3457,8 +3548,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
               "The path " + checkResult.right + " is illegal."));
       return;
     }
-    // make sure there are no redundant paths in targets. Please note that 
redundant paths in source
-    // are legal!
+    // Make sure there are no redundant paths in targets. Note that redundant 
paths in source
+    // are legal.
     List<PartialPath> targetPathList = 
createLogicalViewStatement.getTargetPathList();
     Set<String> targetStringSet = new HashSet<>();
     for (PartialPath path : targetPathList) {
@@ -3472,18 +3563,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         return;
       }
     }
-    if (createLogicalViewStatement.getSourceExpressionList().size() != 
targetPathList.size()) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      analysis.setFailStatus(
-          RpcUtils.getStatus(
-              TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
-              String.format(
-                  "The number of target paths (%d) and sources (%d) are miss 
matched! Please check your SQL.",
-                  createLogicalViewStatement.getTargetPathList().size(),
-                  
createLogicalViewStatement.getSourceExpressionList().size())));
-      return;
-    }
-    // make sure all paths are NOt under any template
+    // Make sure all paths are not under any templates
     try {
       for (PartialPath path : createLogicalViewStatement.getTargetPathList()) {
         checkIsTemplateCompatible(path, null);
@@ -3497,63 +3577,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  // create Logical View
-  @Override
-  public Analysis visitCreateLogicalView(
-      CreateLogicalViewStatement createLogicalViewStatement, MPPQueryContext 
context) {
-    Analysis analysis = new Analysis();
-    context.setQueryType(QueryType.WRITE);
-    analysis.setStatement(createLogicalViewStatement);
-
-    if (createLogicalViewStatement.getViewExpressions() == null) {
-      // analyze query in statement
-      QueryStatement queryStatement = 
createLogicalViewStatement.getQueryStatement();
-      if (queryStatement != null) {
-        Pair<List<Expression>, Analysis> queryAnalysisPair =
-            this.analyzeQueryInLogicalViewStatement(analysis, queryStatement, 
context);
-        if (queryAnalysisPair.right.isFinishQueryAfterAnalyze()) {
-          return analysis;
-        } else if (queryAnalysisPair.left != null) {
-          try {
-            
createLogicalViewStatement.setSourceExpressions(queryAnalysisPair.left);
-          } catch (UnsupportedViewException e) {
-            analysis.setFinishQueryAfterAnalyze(true);
-            analysis.setFailStatus(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
-            return analysis;
-          }
-        }
-      }
-    }
-
-    // use source and into item to generate target views
-    createLogicalViewStatement.parseIntoItemIfNecessary();
-
-    // check target paths; check source expressions.
-    checkPathsInCreateLogicalView(analysis, createLogicalViewStatement);
-    if (analysis.isFinishQueryAfterAnalyze()) {
-      return analysis;
-    }
-
-    // make sure there is no view in source
-    List<Expression> sourceExpressionList = 
createLogicalViewStatement.getSourceExpressionList();
-    checkViewsInSource(analysis, sourceExpressionList, context);
-    if (analysis.isFinishQueryAfterAnalyze()) {
-      return analysis;
-    }
-
-    // set schema partition info, this info will be used to split logical plan 
node.
-    PathPatternTree patternTree = new PathPatternTree();
-    for (PartialPath thisFullPath : 
createLogicalViewStatement.getTargetPathList()) {
-      patternTree.appendFullPath(thisFullPath);
-    }
-    SchemaPartition schemaPartitionInfo =
-        partitionFetcher.getOrCreateSchemaPartition(
-            patternTree, context.getSession().getUserName());
-    analysis.setSchemaPartitionInfo(schemaPartitionInfo);
-
-    return analysis;
-  }
-
   @Override
   public Analysis visitShowLogicalView(
       ShowLogicalViewStatement showLogicalViewStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
index f6cbb097260..29dc6fab471 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/view/CreateLogicalViewStatement.java
@@ -235,8 +235,8 @@ public class CreateLogicalViewStatement extends Statement {
   /**
    * Check errors in targetPaths.
    *
-   * @return Pair<Boolean, String>. True: checks passed; False: checks failed. 
if check failed,
-   *     return the string of illegal path.
+   * @return Pair {@literal <}Boolean, String{@literal >}. True: checks 
passed; False: checks
+   *     failed. If check failed, return the string of illegal path.
    */
   public Pair<Boolean, String> checkTargetPaths() {
     for (PartialPath thisPath : this.getTargetPathList()) {
@@ -251,8 +251,8 @@ public class CreateLogicalViewStatement extends Statement {
    * Check errors in sourcePaths. Only usable when not using read statement. 
If this statement is
    * generated with a read statement, check always pass; if not, check each 
full paths.
    *
-   * @return Pair<Boolean, String>. True: checks passed; False: checks failed. 
if check failed,
-   *     return the string of illegal path.
+   * @return Pair {@literal <}Boolean, String{@literal >}. True: checks 
passed; False: checks
+   *     failed. If check failed, return the string of illegal path.
    */
   public Pair<Boolean, String> checkSourcePathsIfNotUsingQueryStatement() {
     if (this.sourcePaths.viewPathType == ViewPathType.PATHS_GROUP
@@ -267,8 +267,8 @@ public class CreateLogicalViewStatement extends Statement {
   }
 
   /**
-   * @return return true if checks passed; else return false. if check failed, 
return the string of
-   *     illegal path.
+   * @return return {@code true} if checks passed; else return {@code false}. 
If check failed,
+   *     return the string of illegal path.
    */
   public Pair<Boolean, String> checkAllPaths() {
     Pair<Boolean, String> result = this.checkTargetPaths();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 801825e29ce..35581b06705 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.commons.conf;
 
 import java.io.File;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index b0ce404cd92..737313ec378 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
@@ -53,12 +53,7 @@ public class PipeMeta {
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
   }
 
-  public void serialize(DataOutputStream outputStream) throws IOException {
-    staticMeta.serialize(outputStream);
-    runtimeMeta.serialize(outputStream);
-  }
-
-  public void serialize(FileOutputStream outputStream) throws IOException {
+  public void serialize(OutputStream outputStream) throws IOException {
     staticMeta.serialize(outputStream);
     runtimeMeta.serialize(outputStream);
   }


Reply via email to