cloud-fan commented on a change in pull request #29893:
URL: https://github.com/apache/spark/pull/29893#discussion_r531418233



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3104,6 +3106,62 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  object ResolveUserSpecifiedColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators 
{
+      case i: InsertIntoStatement if i.table.resolved && i.query.resolved &&
+          i.userSpecifiedCols.nonEmpty =>
+        val resolved = resolveUserSpecifiedColumns(i)
+        val projection = addColumnListOnQuery(i.table.output, resolved, 
i.query)
+        i.copy(userSpecifiedCols = Nil, query = projection)
+    }
+
+    private def resolveUserSpecifiedColumns(i: InsertIntoStatement): 
Seq[NamedExpression] = {
+      SchemaUtils.checkColumnNameDuplication(
+        i.userSpecifiedCols, "in the column list", resolver)
+
+      i.userSpecifiedCols.map { col =>
+          i.table.resolve(Seq(col), resolver)
+            .getOrElse(i.table.failAnalysis(s"Cannot resolve column name 
$col"))
+      }
+    }
+
+    private def addColumnListOnQuery(
+        tableOutput: Seq[Attribute],
+        cols: Seq[NamedExpression],
+        query: LogicalPlan): LogicalPlan = {
+      val errors = new mutable.ArrayBuffer[String]()
+
+      def failAdd(): Unit = {
+        val errMsg = if (errors.nonEmpty) errors.mkString("\n- ", "\n- ", "") 
else ""
+        query.failAnalysis(
+          s"""Cannot write to table due to mismatched user specified columns 
and data columns:
+             |Specified columns: ${cols.map(c => s"'${c.name}'").mkString(", 
")}
+             |Data columns: ${query.output.map(c => 
s"'${c.name}'").mkString(", ")}$errMsg"""
+            .stripMargin)
+      }
+
+      if (cols.size != query.output.size) failAdd()
+
+      val nameToQueryExpr = cols.zip(query.output).toMap
+      val resolved = tableOutput.flatMap { tableAttr =>
+        if (nameToQueryExpr.contains(tableAttr)) {
+          TableOutputResolver.checkField(
+            tableAttr, nameToQueryExpr(tableAttr), byName = false, conf, err 
=> errors += err)
+        } else {
+          None

Review comment:
       when can we go to this branch?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to