JackieTien97 commented on code in PR #15685:
URL: https://github.com/apache/iotdb/pull/15685#discussion_r2199937925
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/table/v1/impl/RestApiServiceImpl.java:
##########
@@ -125,23 +98,48 @@ public Response executeQueryStatement(SQL sql,
SecurityContext securityContext)
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new
SetThreadName(result.queryId.getId())) {
- return QueryDataSetHandler.fillQueryDataSet(
- queryExecution,
- statement,
- sql.getRowLimit() == null ? defaultQueryRowLimit :
sql.getRowLimit());
+ Response res =
Review Comment:
<img width="1588" height="1778" alt="Image"
src="https://github.com/user-attachments/assets/9d7d04eb-9f7c-4dc4-829d-baaf7a98711d"
/>
It seems not changed
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java:
##########
@@ -593,10 +596,119 @@ protected Scope visitUse(Use node, Optional<Scope>
scope) {
throw new SemanticException("USE statement is not supported yet.");
}
+ private boolean containsAnyFieldColumn(
+ List<String> insertColumns, Map<String, ColumnSchema> columnSchemaMap)
{
+ for (String column : insertColumns) {
+ if (columnSchemaMap.containsKey(column)
+ && columnSchemaMap.get(column).getColumnCategory() ==
TsTableColumnCategory.FIELD) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Do not consider type coercion at the moment
+ private boolean typesMatchForInsert(List<Type> tableTypes, List<Type>
queryTypes) {
+ if (tableTypes.size() != queryTypes.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < tableTypes.size(); i++) {
+ if (!tableTypes.get(i).equals(queryTypes.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
protected Scope visitInsert(Insert insert, Optional<Scope> scope) {
- throw new SemanticException(
- "This kind of insert statement is not supported yet, please check
your grammar.");
+ queryContext.setQueryType(QueryType.READ_WRITE);
+ // analyze the query that creates the data
+ Scope queryScope = analyze(insert.getQuery(), Optional.empty(), false);
+
+ // verify the insert table exists
+ QualifiedObjectName targetTable =
+ createQualifiedObjectName(sessionContext, insert.getTarget());
+ if (!metadata.tableExists(targetTable)) {
+ TableMetadataImpl.throwTableNotExistsException(
+ targetTable.getDatabaseName(), targetTable.getObjectName());
+ }
+ // verify access privileges
+ accessControl.checkCanInsertIntoTable(sessionContext.getUserName(),
targetTable);
+
+ // verify the insert destination columns match the query
+ Optional<TableSchema> tableSchema =
metadata.getTableSchema(sessionContext, targetTable);
+ if (!tableSchema.isPresent()) {
+ TableMetadataImpl.throwTableNotExistsException(
+ targetTable.getDatabaseName(), targetTable.getObjectName());
+ }
+ List<ColumnSchema> columns =
+ tableSchema.get().getColumns().stream()
+ .filter(column -> !column.isHidden())
+ .collect(toImmutableList());
+ analysis.registerTable(insert.getTable(), tableSchema, targetTable);
+
+ List<String> tableColumns =
+
columns.stream().map(ColumnSchema::getName).collect(toImmutableList());
+
+ List<String> insertColumns;
Review Comment:
same as above
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3417,6 +3423,68 @@ public Operator visitPatternRecognition(
labelNames);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TreeIntoOperator.class.getSimpleName());
Review Comment:
```suggestion
TableIntoOperator.class.getSimpleName());
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java:
##########
@@ -199,6 +200,18 @@ public List<PlanNode> visitOutput(final OutputNode node,
final PlanContext conte
return Collections.singletonList(node);
}
+ @Override
+ public List<PlanNode> visitInto(final IntoNode node, final PlanContext
context) {
+ final List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+ final OrderingScheme childOrdering =
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+ if (childOrdering != null) {
+ nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+ }
Review Comment:
You don't need to put this planNode into `nodeOrderingMap`,
`nodeOrderingMap` is used to record the output ordering of some plannode,
however there is no ordering for `IntoNode`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableInsertTabletStatementGenerator.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;
+
+public class TableInsertTabletStatementGenerator extends
InsertTabletStatementGenerator {
+
+ private final String databaseName;
+ private final AtomicLong writtenCounter;
+ private final int timeColumnIndex;
+ private final List<TsTableColumnCategory> tsTableColumnCategories;
+
+ public TableInsertTabletStatementGenerator(
+ String databaseName,
+ PartialPath devicePath,
Review Comment:
```suggestion
PartialPath table,
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java:
##########
@@ -98,6 +115,33 @@ public TSStatus insertTablets(InsertMultiTabletsStatement
statement) {
}
}
+ public TSStatus insertRelationalTablets(InsertTabletStatement statement) {
Review Comment:
```suggestion
public TSStatus insertRelationalTablet(InsertTabletStatement statement) {
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class InsertTabletStatementGenerator {
+ protected int rowLimit;
Review Comment:
should be final?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.client.DataNodeInternalClient;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import com.google.common.util.concurrent.Futures;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class TableIntoOperator extends AbstractIntoOperator {
+ protected InsertTabletStatementGenerator insertTabletStatementGenerator;
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TableIntoOperator.class);
+
+ private final PartialPath targetDevice;
Review Comment:
```suggestion
private final PartialPath targetTable;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java:
##########
@@ -1125,6 +1147,87 @@ protected RelationalInsertRowNode fromInsertRowStatement(
return insertNode;
}
+ @Override
+ protected RelationPlan visitInsert(final Insert node, final Void context) {
+ // query plan and visible fields
+ Query query = node.getQuery();
+ RelationPlan plan = process(query, null);
+ List<Symbol> visibleFieldMappings = visibleFields(plan);
+
+ // table columns
+ Table table = node.getTable();
+ QualifiedObjectName targetTable = createQualifiedObjectName(sessionInfo,
table.getName());
+ Optional<TableSchema> tableSchema = metadata.getTableSchema(sessionInfo,
targetTable);
+ if (!tableSchema.isPresent()) {
+ TableMetadataImpl.throwTableNotExistsException(
+ targetTable.getDatabaseName(), targetTable.getObjectName());
+ }
+ List<ColumnSchema> tableColumns = tableSchema.get().getColumns();
+ Map<String, ColumnSchema> columnSchemaMap =
tableSchema.get().getColumnSchemaMap();
+
+ // insert columns
+ Analysis.Insert insert = analysis.getInsert();
+ List<ColumnSchema> insertColumns = insert.getColumns();
+
+ // prepare Assignments and ColumnSchema builder
+ Assignments.Builder assignments = Assignments.builder();
+ ImmutableList.Builder<ColumnSchema> insertedColumnsBuilder =
ImmutableList.builder();
+
+ // insert null if table column is not in query columns.
+ for (ColumnSchema column : tableColumns) {
+ if (column.isHidden()) {
+ continue;
+ }
+ Symbol output = symbolAllocator.newSymbol(column.getName(),
column.getType());
+ Expression expression;
+ Type tableType = column.getType();
+ int index = insertColumns.indexOf(columnSchemaMap.get(column.getName()));
+ if (index < 0) {
+ expression = new NullLiteral();
+ } else {
+ Symbol input = visibleFieldMappings.get(index);
+ Type queryType = symbolAllocator.getTypes().getTableModelType(input);
+ if (!queryType.equals(tableType)) {
+ throw new SemanticException(
+ String.format(
+ "Insert query has mismatched column type: Table: [%s],
Query: [%s]",
+ tableType, queryType));
+ }
+ expression = input.toSymbolReference();
+ }
+ assignments.put(output, expression);
+ insertedColumnsBuilder.add(column);
+ }
+
+ // Project Node
+ ProjectNode projectNode =
+ new ProjectNode(
+ queryContext.getQueryId().genPlanNodeId(), plan.getRoot(),
assignments.build());
+ List<ColumnSchema> insertedColumns = insertedColumnsBuilder.build();
+ List<Field> fields =
+ insertedColumns.stream()
+ .map(
+ column ->
+ Field.newUnqualified(
+ column.getName(), column.getType(),
column.getColumnCategory()))
+ .collect(toImmutableList());
+ Scope scope =
+ Scope.builder().withRelationType(RelationId.anonymous(), new
RelationType(fields)).build();
+ plan = new RelationPlan(projectNode, scope,
projectNode.getOutputSymbols(), Optional.empty());
+
+ // Into Node
+ IntoNode intoNode =
+ new IntoNode(
+ queryContext.getQueryId().genPlanNodeId(),
+ plan.getRoot(),
+ targetTable.getDatabaseName(),
+ table.getName().getSuffix(),
+ tableColumns,
+ symbolAllocator.newSymbol("rows", LongType.INT64));
Review Comment:
"rows" should be a constant in `IntoNode`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3417,6 +3423,68 @@ public Operator visitPatternRecognition(
labelNames);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TreeIntoOperator.class.getSimpleName());
+
+ try {
+ String tableName = node.getTable();
+ PartialPath devicePath = new PartialPath(tableName);
+
+ Map<String, TSDataType> tsDataTypeMap = new LinkedHashMap<>();
+ Map<String, InputLocation> inputLocationMap = new LinkedHashMap<>();
+ List<TSDataType> inputColumnTypes = new ArrayList<>();
+ List<TsTableColumnCategory> inputColumnCategories = new ArrayList<>();
+ List<Pair<String, PartialPath>> sourceTargetPathPairList = new
ArrayList<>();
Review Comment:
what's this used for?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java:
##########
@@ -593,10 +596,119 @@ protected Scope visitUse(Use node, Optional<Scope>
scope) {
throw new SemanticException("USE statement is not supported yet.");
}
+ private boolean containsAnyFieldColumn(
+ List<String> insertColumns, Map<String, ColumnSchema> columnSchemaMap)
{
+ for (String column : insertColumns) {
+ if (columnSchemaMap.containsKey(column)
+ && columnSchemaMap.get(column).getColumnCategory() ==
TsTableColumnCategory.FIELD) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Do not consider type coercion at the moment
+ private boolean typesMatchForInsert(List<Type> tableTypes, List<Type>
queryTypes) {
+ if (tableTypes.size() != queryTypes.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < tableTypes.size(); i++) {
+ if (!tableTypes.get(i).equals(queryTypes.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
protected Scope visitInsert(Insert insert, Optional<Scope> scope) {
- throw new SemanticException(
- "This kind of insert statement is not supported yet, please check
your grammar.");
+ queryContext.setQueryType(QueryType.READ_WRITE);
+ // analyze the query that creates the data
+ Scope queryScope = analyze(insert.getQuery(), Optional.empty(), false);
+
+ // verify the insert table exists
+ QualifiedObjectName targetTable =
+ createQualifiedObjectName(sessionContext, insert.getTarget());
+ if (!metadata.tableExists(targetTable)) {
+ TableMetadataImpl.throwTableNotExistsException(
+ targetTable.getDatabaseName(), targetTable.getObjectName());
+ }
+ // verify access privileges
+ accessControl.checkCanInsertIntoTable(sessionContext.getUserName(),
targetTable);
+
+ // verify the insert destination columns match the query
+ Optional<TableSchema> tableSchema =
metadata.getTableSchema(sessionContext, targetTable);
+ if (!tableSchema.isPresent()) {
+ TableMetadataImpl.throwTableNotExistsException(
+ targetTable.getDatabaseName(), targetTable.getObjectName());
+ }
+ List<ColumnSchema> columns =
+ tableSchema.get().getColumns().stream()
+ .filter(column -> !column.isHidden())
+ .collect(toImmutableList());
+ analysis.registerTable(insert.getTable(), tableSchema, targetTable);
+
+ List<String> tableColumns =
+
columns.stream().map(ColumnSchema::getName).collect(toImmutableList());
Review Comment:
better construct a Set<String> tableColumnsSet, List.contains is O(N).
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java:
##########
@@ -1125,6 +1147,87 @@ protected RelationalInsertRowNode fromInsertRowStatement(
return insertNode;
}
+ @Override
+ protected RelationPlan visitInsert(final Insert node, final Void context) {
Review Comment:
It seems that Trino doesn't put the related codes here. It directly does
this in LogicalPlanner
<img width="3024" height="1890" alt="Image"
src="https://github.com/user-attachments/assets/27007796-9ae8-4228-9a29-12280bc5ea04"
/>
If so, we don't need to bring `Metadata` here
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java:
##########
@@ -199,6 +200,18 @@ public List<PlanNode> visitOutput(final OutputNode node,
final PlanContext conte
return Collections.singletonList(node);
}
+ @Override
+ public List<PlanNode> visitInto(final IntoNode node, final PlanContext
context) {
+ final List<PlanNode> childrenNodes = node.getChild().accept(this, context);
+ final OrderingScheme childOrdering =
nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
+ if (childOrdering != null) {
+ nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
+ }
+
+ node.setChild(mergeChildrenViaCollectOrMergeSort(childOrdering,
childrenNodes));
Review Comment:
Actually, we may need to think more about if IntoNode can be pushed down
into each child, and then add a IntoMergeNode to merge the write results
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3417,6 +3423,68 @@ public Operator visitPatternRecognition(
labelNames);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TreeIntoOperator.class.getSimpleName());
+
+ try {
+ String tableName = node.getTable();
+ PartialPath devicePath = new PartialPath(tableName);
+
+ Map<String, TSDataType> tsDataTypeMap = new LinkedHashMap<>();
+ Map<String, InputLocation> inputLocationMap = new LinkedHashMap<>();
+ List<TSDataType> inputColumnTypes = new ArrayList<>();
+ List<TsTableColumnCategory> inputColumnCategories = new ArrayList<>();
+ List<Pair<String, PartialPath>> sourceTargetPathPairList = new
ArrayList<>();
+
+ List<ColumnSchema> inputColumns = node.getColumns();
+ for (int i = 0; i < inputColumns.size(); i++) {
+ String columnName = inputColumns.get(i).getName();
+ inputLocationMap.put(columnName, new InputLocation(0, i));
+
+ TsTableColumnCategory columnCategory =
inputColumns.get(i).getColumnCategory();
+ if (columnCategory == TIME) {
+ continue;
+ }
+
+ TSDataType columnType =
InternalTypeManager.getTSDataType(inputColumns.get(i).getType());
+ tsDataTypeMap.put(columnName, columnType);
+ inputColumnTypes.add(columnType);
+ inputColumnCategories.add(columnCategory);
+ sourceTargetPathPairList.add(
+ new Pair<>(
+ columnName,
+ new MeasurementPath(String.format("%s.%s", tableName,
columnName), columnType)));
+ }
+
+ long statementSizePerLine =
+ OperatorGeneratorUtil.calculateStatementSizePerLine(
+ new ArrayList<>(tsDataTypeMap.values()));
+
+ return new TableIntoOperator(
+ operatorContext,
+ child,
+ node.getDatabase(),
+ devicePath,
+ inputColumnTypes,
+ inputColumnCategories,
+ inputLocationMap,
+ tsDataTypeMap,
+ true,
+ FragmentInstanceManager.getInstance().getIntoOperationExecutor(),
+ statementSizePerLine);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
Review Comment:
throw a specific exception, like sematicExcption, but I think here will
never throw exception.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.client.DataNodeInternalClient;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import com.google.common.util.concurrent.Futures;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class TableIntoOperator extends AbstractIntoOperator {
+ protected InsertTabletStatementGenerator insertTabletStatementGenerator;
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TableIntoOperator.class);
+
+ private final PartialPath targetDevice;
+
+ public TableIntoOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ String databaseName,
+ PartialPath targetDevice,
+ List<TSDataType> inputColumnTypes,
+ List<TsTableColumnCategory> inputColumnCategories,
+ Map<String, InputLocation> measurementToInputLocationMap,
+ Map<String, TSDataType> measurementToDataTypeMap,
+ Boolean isAligned,
Review Comment:
```suggestion
boolean isAligned,
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.client.DataNodeInternalClient;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import com.google.common.util.concurrent.Futures;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class TableIntoOperator extends AbstractIntoOperator {
Review Comment:
We may need to consider Tablet in `TableIntoOperator` estimated size
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableIntoOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.protocol.client.DataNodeInternalClient;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import com.google.common.util.concurrent.Futures;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class TableIntoOperator extends AbstractIntoOperator {
+ protected InsertTabletStatementGenerator insertTabletStatementGenerator;
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(TableIntoOperator.class);
+
+ private final PartialPath targetDevice;
+
+ public TableIntoOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ String databaseName,
+ PartialPath targetDevice,
Review Comment:
```suggestion
PartialPath targetTable,
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/InsertTabletStatementGenerator.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class InsertTabletStatementGenerator {
+ protected int rowLimit;
+
+ protected PartialPath devicePath;
+ protected boolean isAligned;
+ protected String[] measurements;
+ protected TSDataType[] dataTypes;
+ protected InputLocation[] inputLocations;
+
+ protected int rowCount = 0;
+
+ protected long[] times;
+ protected Object[] columns;
+ protected BitMap[] bitMaps;
+
+ protected List<Type> sourceTypeConvertors;
+
+ public void reset() {
+ this.rowCount = 0;
+ this.times = new long[rowLimit];
+ this.columns = new Object[this.measurements.length];
+ for (int i = 0; i < this.measurements.length; i++) {
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ columns[i] = new boolean[rowLimit];
Review Comment:
why not reuse the each array
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableInsertTabletStatementGenerator.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME;
+
+public class TableInsertTabletStatementGenerator extends
InsertTabletStatementGenerator {
+
+ private final String databaseName;
+ private final AtomicLong writtenCounter;
+ private final int timeColumnIndex;
+ private final List<TsTableColumnCategory> tsTableColumnCategories;
+
+ public TableInsertTabletStatementGenerator(
+ String databaseName,
+ PartialPath devicePath,
+ Map<String, InputLocation> measurementToInputLocationMap,
+ Map<String, TSDataType> measurementToDataTypeMap,
+ Boolean isAligned,
Review Comment:
```suggestion
boolean isAligned,
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3417,6 +3423,68 @@ public Operator visitPatternRecognition(
labelNames);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TreeIntoOperator.class.getSimpleName());
+
+ try {
+ String tableName = node.getTable();
+ PartialPath devicePath = new PartialPath(tableName);
+
+ Map<String, TSDataType> tsDataTypeMap = new LinkedHashMap<>();
+ Map<String, InputLocation> inputLocationMap = new LinkedHashMap<>();
+ List<TSDataType> inputColumnTypes = new ArrayList<>();
+ List<TsTableColumnCategory> inputColumnCategories = new ArrayList<>();
+ List<Pair<String, PartialPath>> sourceTargetPathPairList = new
ArrayList<>();
+
+ List<ColumnSchema> inputColumns = node.getColumns();
+ for (int i = 0; i < inputColumns.size(); i++) {
+ String columnName = inputColumns.get(i).getName();
+ inputLocationMap.put(columnName, new InputLocation(0, i));
+
+ TsTableColumnCategory columnCategory =
inputColumns.get(i).getColumnCategory();
+ if (columnCategory == TIME) {
+ continue;
+ }
+
+ TSDataType columnType =
InternalTypeManager.getTSDataType(inputColumns.get(i).getType());
+ tsDataTypeMap.put(columnName, columnType);
+ inputColumnTypes.add(columnType);
+ inputColumnCategories.add(columnCategory);
+ sourceTargetPathPairList.add(
+ new Pair<>(
+ columnName,
+ new MeasurementPath(String.format("%s.%s", tableName,
columnName), columnType)));
+ }
+
+ long statementSizePerLine =
+ OperatorGeneratorUtil.calculateStatementSizePerLine(
+ new ArrayList<>(tsDataTypeMap.values()));
Review Comment:
```suggestion
inputColumnTypes);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3417,6 +3423,68 @@ public Operator visitPatternRecognition(
labelNames);
}
+ @Override
+ public Operator visitInto(IntoNode node, LocalExecutionPlanContext context) {
+ Operator child = node.getChild().accept(this, context);
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TreeIntoOperator.class.getSimpleName());
+
+ try {
+ String tableName = node.getTable();
+ PartialPath devicePath = new PartialPath(tableName);
Review Comment:
```suggestion
PartialPath tableName =
DEVICE_PATH_CACHE.getPartialPath(node.getTable());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]