cloud-fan commented on a change in pull request #25507: [SPARK-28667][SQL] 
Support InsertInto through the V2SessionCatalog 
URL: https://github.com/apache/spark/pull/25507#discussion_r317572985
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -770,40 +760,35 @@ class Analyzer(
 
   object ResolveInsertInto extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators 
{
-      case i @ InsertIntoStatement(
-          UnresolvedRelation(CatalogObjectIdentifier(Some(tableCatalog), 
ident)), _, _, _, _)
-          if i.query.resolved =>
-        loadTable(tableCatalog, ident)
-            .map(DataSourceV2Relation.create)
-            .map(relation => {
-              // ifPartitionNotExists is append with validation, but 
validation is not supported
-              if (i.ifPartitionNotExists) {
-                throw new AnalysisException(
-                  s"Cannot write, IF NOT EXISTS is not supported for table: 
${relation.table.name}")
-              }
-
-              val partCols = partitionColumnNames(relation.table)
-              validatePartitionSpec(partCols, i.partitionSpec)
+      case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if 
i.query.resolved =>
+        lookupV2Relation(u) match {
+          case scala.Right(Some(v2Table: Table)) =>
+            val relation = DataSourceV2Relation.create(v2Table)
+            // ifPartitionNotExists is append with validation, but validation 
is not supported
+            if (i.ifPartitionNotExists) {
+              throw new AnalysisException(
+                s"Cannot write, IF NOT EXISTS is not supported for table: 
${relation.table.name}")
+            }
 
-              val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
-              val query = addStaticPartitionColumns(relation, i.query, 
staticPartitions)
-              val dynamicPartitionOverwrite = partCols.size > 
staticPartitions.size &&
-                  conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+            val partCols = partitionColumnNames(relation.table)
+            validatePartitionSpec(partCols, i.partitionSpec)
 
-              if (!i.overwrite) {
-                AppendData.byPosition(relation, query)
-              } else if (dynamicPartitionOverwrite) {
-                OverwritePartitionsDynamic.byPosition(relation, query)
-              } else {
-                OverwriteByExpression.byPosition(
-                  relation, query, staticDeleteExpression(relation, 
staticPartitions))
-              }
-            })
-            .getOrElse(i)
+            val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).mapValues(_.get)
+            val query = addStaticPartitionColumns(relation, i.query, 
staticPartitions)
+            val dynamicPartitionOverwrite = partCols.size > 
staticPartitions.size &&
+              conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
 
-      case i @ InsertIntoStatement(UnresolvedRelation(AsTableIdentifier(_)), 
_, _, _, _)
-          if i.query.resolved =>
-        InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, 
i.ifPartitionNotExists)
+            if (!i.overwrite) {
+              AppendData.byPosition(relation, query)
+            } else if (dynamicPartitionOverwrite) {
+              OverwritePartitionsDynamic.byPosition(relation, query)
+            } else {
+              OverwriteByExpression.byPosition(
+                relation, query, staticDeleteExpression(relation, 
staticPartitions))
+            }
+          case _ =>
+            InsertIntoTable(i.table, i.partitionSpec, i.query, i.overwrite, 
i.ifPartitionNotExists)
 
 Review comment:
   `lookupV2Relation` needs to return `Either` because we need to catch this 
case and create v1 insert command. I have a simpler idea:
   1. `ResolveInsertInto` only handles v2 insert command, and leave 
`InsertIntoStatement(UnresolvedRelation, ...)` unchanged if we can't load a v2 
table.
   2. `ResolveRelations` catches the `InsertIntoStatement(UnresolvedRelation, 
...)` and convert it to v1 insert command if we can load a v1 table.
   
   By doing this, `lookupV2Relation` doesn't need to distinguish between normal 
v2 catalog and v2 session catalog during table lookup, and can return 
`Option[Table]`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to