imback82 commented on a change in pull request #33200:
URL: https://github.com/apache/spark/pull/33200#discussion_r672486685



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3574,15 +3568,64 @@ class Analyzer(override val catalogManager: 
CatalogManager)
 
   /**
    * Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity
-   * for alter table commands.
+   * for alter table column commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
+      case a: AlterTableColumnCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
-          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
+          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, 
Some(u))
+        }
+
+      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if 
hasUnresolvedColumns(cols) =>
+        // 'colsToAdd' keeps track of new columns being added. It stores a 
mapping from a
+        // normalized parent name of fields to field names that belong to the 
parent.
+        // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 
'colsToAdd' will become
+        // Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
+        val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
+        def addColumn(
+            col: QualifiedColType,
+            parentSchema: StructType,
+            parentName: String,
+            normalizedParentName: Seq[String]): QualifiedColType = {
+          val fieldsAdded = colsToAdd.getOrElse(normalizedParentName, Nil)
+          val resolvedPosition = col.position.map {
+            case u: UnresolvedFieldPosition => u.position match {
+              case after: After =>
+                val allFields = parentSchema.fieldNames ++ fieldsAdded
+                allFields.find(n => conf.resolver(n, after.column())) match {
+                  case Some(colName) =>
+                    ResolvedFieldPosition(ColumnPosition.after(colName))
+                  case None =>
+                    throw 
QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(
+                      after, parentName)
+                }
+              case _ => ResolvedFieldPosition(u.position)
+            }
+            case resolved => resolved
+          }
+          val fieldName = col.name.last
+          colsToAdd(normalizedParentName) = fieldsAdded :+ fieldName
+          col.copy(name = normalizedParentName :+ fieldName, position = 
resolvedPosition)
         }
+        val schema = r.table.schema
+        val resolvedCols = cols.map { col =>
+          val parent = col.name.init
+          if (parent.nonEmpty) {
+            // Adding a nested field, need to normalize the parent column and 
position.
+            val resolvedParent = resolveFieldNames(r, parent)
+            val parentDataType = resolvedParent.field.dataType match {
+              case s: StructType => s
+              case o => throw 
QueryCompilationErrors.parentTypeNotStructError(col.name, o)

Review comment:
       updated.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3574,15 +3568,64 @@ class Analyzer(override val catalogManager: 
CatalogManager)
 
   /**
    * Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity
-   * for alter table commands.
+   * for alter table column commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
+      case a: AlterTableColumnCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
-          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
+          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, 
u.origin)
+        }
+
+      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if 
hasUnresolvedColumns(cols) =>

Review comment:
       Note that since the column name is `Seq[String]`, this can be resolved many 
times if the column's position is None:
   ```
   private def hasUnresolvedColumns(cols: Seq[QualifiedColType]): Boolean = {
     cols.exists(_.position.forall(!_.resolved))
   }
   ```

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3574,15 +3568,64 @@ class Analyzer(override val catalogManager: 
CatalogManager)
 
   /**
    * Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity
-   * for alter table commands.
+   * for alter table column commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
+      case a: AlterTableColumnCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
-          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
+          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, 
Some(u))
+        }
+
+      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if 
hasUnresolvedColumns(cols) =>
+        // 'colsToAdd' keeps track of new columns being added. It stores a 
mapping from a
+        // normalized parent name of fields to field names that belong to the 
parent.
+        // For example, if we add columns "a.b.c", "a.b.d", and "a.c", 
'colsToAdd' will become
+        // Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
+        val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
+        def addColumn(
+            col: QualifiedColType,
+            parentSchema: StructType,
+            parentName: String,
+            normalizedParentName: Seq[String]): QualifiedColType = {
+          val fieldsAdded = colsToAdd.getOrElse(normalizedParentName, Nil)
+          val resolvedPosition = col.position.map {
+            case u: UnresolvedFieldPosition => u.position match {
+              case after: After =>
+                val allFields = parentSchema.fieldNames ++ fieldsAdded
+                allFields.find(n => conf.resolver(n, after.column())) match {
+                  case Some(colName) =>
+                    ResolvedFieldPosition(ColumnPosition.after(colName))
+                  case None =>
+                    throw 
QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(
+                      after, parentName)
+                }
+              case _ => ResolvedFieldPosition(u.position)
+            }
+            case resolved => resolved
+          }
+          val fieldName = col.name.last
+          colsToAdd(normalizedParentName) = fieldsAdded :+ fieldName
+          col.copy(name = normalizedParentName :+ fieldName, position = 
resolvedPosition)
         }
+        val schema = r.table.schema
+        val resolvedCols = cols.map { col =>
+          val parent = col.name.init
+          if (parent.nonEmpty) {
+            // Adding a nested field, need to normalize the parent column and 
position.
+            val resolvedParent = resolveFieldNames(r, parent)

Review comment:
       updated.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3574,15 +3568,64 @@ class Analyzer(override val catalogManager: 
CatalogManager)
 
   /**
    * Rule to mostly resolve, normalize and rewrite column names based on case 
sensitivity
-   * for alter table commands.
+   * for alter table column commands.
    */
-  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
+  object ResolveAlterTableColumnCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case a: AlterTableCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
+      case a: AlterTableColumnCommand if a.table.resolved && 
hasUnresolvedFieldName(a) =>
         val table = a.table.asInstanceOf[ResolvedTable]
         a.transformExpressions {
-          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
+          case u: UnresolvedFieldName => resolveFieldNames(table, u.name, 
u.origin)
+        }
+
+      case a @ AlterTableAddColumns(r: ResolvedTable, cols) if 
hasUnresolvedColumns(cols) =>

Review comment:
       Note that since the column name is `Seq[String]`, this can be resolved many 
times if the column's position is None (i.e., `hasUnresolvedColumns` doesn't 
consider `QualifiedColType.name`):
   ```
   private def hasUnresolvedColumns(cols: Seq[QualifiedColType]): Boolean = {
     cols.exists(_.position.forall(!_.resolved))
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to