This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 2b475ed [CARBONDATA-3720] Support alter table scenario for new insert into flow 2b475ed is described below commit 2b475edc24bbf78085f29ab55198ac7b4a6b24e7 Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Sun Feb 23 11:04:30 2020 +0530 [CARBONDATA-3720] Support alter table scenario for new insert into flow Why is this PR needed? Currently, the rearrange logic is based on the schema ordinal. For alter table drop and add columns with/without partition, a schema-ordinal-based rearrange may not work, as the index can fall outside the projection size. This logic becomes complex to handle. Hence, instead of using the schema ordinal for the rearrange, a position-map-based rearrange is implemented. What changes were proposed in this PR? Implemented a position-map-based rearrange. Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3634 --- .../management/CarbonInsertIntoCommand.scala | 90 ++++++++-------------- 1 file changed, 31 insertions(+), 59 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala index 68189e2..25887e7 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala @@ -26,9 +26,9 @@ import scala.collection.mutable import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel} +import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.util.CausedBy @@ -37,8 +37,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} +import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema} import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo} import org.apache.carbondata.core.util.path.CarbonTablePath @@ -97,15 +97,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String], // If logical plan is unresolved, need to convert it to resolved. dataFrame = Dataset.ofRows(sparkSession, logicalPlan) logicalPlan = dataFrame.queryExecution.analyzed - // Currently projection re-ordering is based on schema ordinal, - // for some scenarios in alter table scenario, schema ordinal logic cannot be applied. - // So, sending it to old flow - // TODO: Handle alter table in future, this also must use new flow. 
- if (CarbonProperties.isBadRecordHandlingEnabledForInsert || - isAlteredSchema(tableInfo.getFactTable)) { + if (CarbonProperties.isBadRecordHandlingEnabledForInsert) { + // use old converter flow isInsertIntoWithConverterFlow = true - } - if (isInsertIntoWithConverterFlow) { return Seq.empty } setAuditTable(CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession), tableName) @@ -458,67 +452,45 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String], } } - private def isAlteredSchema(tableSchema: TableSchema): Boolean = { - if (tableInfo.getFactTable.getSchemaEvolution != null) { - tableInfo - .getFactTable - .getSchemaEvolution - .getSchemaEvolutionEntryList.asScala.exists(entry => - (entry.getAdded != null && entry.getAdded.size() > 0) || - (entry.getRemoved != null && entry.getRemoved.size() > 0) - ) - } else { - false - } - } - def getReArrangedIndexAndSelectedSchema( tableInfo: TableInfo, partitionColumnSchema: mutable.Buffer[ColumnSchema]): (Seq[Int], Seq[ColumnSchema]) = { var reArrangedIndex: Seq[Int] = Seq() var selectedColumnSchema: Seq[ColumnSchema] = Seq() - var complexChildCount: Int = 0 var partitionIndex: Seq[Int] = Seq() - val columnSchema = tableInfo.getFactTable.getListOfColumns.asScala + // internal order ColumnSchema (non-flat structure) + val columnSchema = (table.getVisibleDimensions.asScala ++ + table.getVisibleMeasures.asScala).map(_.getColumnSchema) val partitionColumnNames = if (partitionColumnSchema != null) { partitionColumnSchema.map(x => x.getColumnName).toSet } else { null } - // get invisible column indexes, alter table scenarios can have it before or after new column - // dummy measure will have ordinal -1 and it is invisible, ignore that column. 
- // alter table old columns are just invisible columns with proper ordinal - val invisibleIndex = columnSchema.filter(col => col.isInvisible && col.getSchemaOrdinal != -1) - .map(col => col.getSchemaOrdinal) - columnSchema.filterNot(col => col.isInvisible).foreach { + var createOrderColumns = table.getCreateOrderColumn.asScala + val createOrderMap = mutable.Map[String, Int]() + if (partitionColumnNames != null) { + // For alter table drop/add column scenarios, partition column may not be in the end. + // Need to keep it in the end. + createOrderColumns = createOrderColumns.filterNot(col => + partitionColumnNames.contains(col.getColumnSchema.getColumnName)) ++ + createOrderColumns.filter(col => + partitionColumnNames.contains(col.getColumnSchema.getColumnName)) + } + createOrderColumns.zipWithIndex.map { + case (col, index) => createOrderMap.put(col.getColName, index) + } + columnSchema.foreach { col => var skipPartitionColumn = false - if (col.getColumnName.contains(".")) { - // If the schema ordinal is -1, - // no need to consider it during shifting columns to derive new shifted ordinal - if (col.getSchemaOrdinal != -1) { - complexChildCount = complexChildCount + 1 - } + if (partitionColumnNames != null && + partitionColumnNames.contains(col.getColumnName)) { + partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName) + skipPartitionColumn = true } else { - // get number of invisible index count before this column - val invisibleIndexCount = invisibleIndex.count(index => index < col.getSchemaOrdinal) - if (col.getDataType.isComplexType) { - // Calculate re-arrange index by ignoring the complex child count. 
- // As projection will have only parent columns - reArrangedIndex = reArrangedIndex :+ - (col.getSchemaOrdinal - complexChildCount - invisibleIndexCount) - } else { - if (partitionColumnNames != null && - partitionColumnNames.contains(col.getColumnName)) { - partitionIndex = partitionIndex :+ (col.getSchemaOrdinal - invisibleIndexCount) - skipPartitionColumn = true - } else { - reArrangedIndex = reArrangedIndex :+ (col.getSchemaOrdinal - invisibleIndexCount) - } - } - if (!skipPartitionColumn) { - selectedColumnSchema = selectedColumnSchema :+ col - } + reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName) + } + if (!skipPartitionColumn) { + selectedColumnSchema = selectedColumnSchema :+ col } } if (partitionColumnSchema != null) {