This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c938d62d79 [SPARK-43123][SQL] Internal field metadata should not be leaked to catalogs
4c938d62d79 is described below

commit 4c938d62d791742b9f0c6a77b66fc06a90d7c0ad
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Apr 14 17:08:29 2023 +0800

    [SPARK-43123][SQL] Internal field metadata should not be leaked to catalogs
    
    ### What changes were proposed in this pull request?
    
    In Spark, we have defined some internal field metadata to help query
    resolution and compilation. For example, several field metadata keys are
    related to metadata columns.
    
    However, when we create tables, this internal field metadata can be leaked
    to catalogs. This PR updates the CTAS/RTAS commands to remove the internal
    field metadata before creating tables. The CREATE/REPLACE TABLE commands
    are fine, as users can't generate this internal field metadata via the
    type string.
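    
    As a rough sketch of the new behavior (not taken verbatim from the patch;
    the field name and the pre-populated metadata value below are made up for
    illustration), the `removeInternalMetadata` helper added to
    `catalyst/util/package.scala` in this commit strips such keys from a
    schema before it is handed to the catalog:
    
    ```scala
    import org.apache.spark.sql.catalyst.util.removeInternalMetadata
    import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}
    
    // A schema whose only field still carries an internal key, as an
    // auto-generated alias would after analysis.
    val dirty = StructType(Seq(StructField(
      "index",
      IntegerType,
      nullable = true,
      metadata = new MetadataBuilder().putString("__autoGeneratedAlias", "true").build())))
    
    // What CTAS/RTAS now do before calling catalog.createTable.
    val clean = removeInternalMetadata(dirty)
    assert(!clean.head.metadata.contains("__autoGeneratedAlias"))
    ```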
    
    ### Why are the changes needed?
    
    To avoid potential issues, such as mistakenly treating a data column as a
    metadata column.
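    
    For instance (a hypothetical field; the boolean value is only an
    illustration of how the key might be stored), a persisted column that
    still carries the `__metadata_col` key would look like a metadata column
    to the analyzer, which checks exactly this kind of field metadata (see
    the `isMetadataCol` helper in the diff below):
    
    ```scala
    import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}
    
    // A plain data column whose field metadata still carries the internal
    // __metadata_col key because a CTAS query leaked it into the catalog.
    val leaked = StructField(
      "index",
      IntegerType,
      nullable = true,
      metadata = new MetadataBuilder().putBoolean("__metadata_col", true).build())
    
    // The key survives in the stored table schema unless it is stripped
    // before createTable, which is what this patch does.
    println(leaked.metadata.contains("__metadata_col"))  // true
    ```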
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new test
    
    Closes #40776 from cloud-fan/meta.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +--
 .../apache/spark/sql/catalyst/util/package.scala   | 34 ++++++++++++++----
 .../execution/command/createDataSourceTables.scala |  5 +--
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 42 +++++++++++-----------
 .../spark/sql/connector/MetadataColumnSuite.scala  | 16 +++++++++
 5 files changed, 69 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 126b0618727..74592c15d23 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -492,7 +492,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             case l: Literal => Alias(l, toPrettySQL(l))()
             case e =>
               val metaForAutoGeneratedAlias = new MetadataBuilder()
-                .putString("__autoGeneratedAlias", "true")
+                .putString(AUTO_GENERATED_ALIAS, "true")
                 .build()
               Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index e1ce45c1353..23ddb534af9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -191,12 +191,13 @@ package object util extends Logging {
 
   val METADATA_COL_ATTR_KEY = "__metadata_col"
 
+  /**
+   * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
+   * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
+   */
+  val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
+
   implicit class MetadataColumnHelper(attr: Attribute) {
-    /**
-     * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
-     * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
-     */
-    val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
 
     def isMetadataCol: Boolean = MetadataAttribute.isValid(attr.metadata)
 
@@ -225,4 +226,25 @@ package object util extends Logging {
       }
     }
   }
+
+  val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"
+
+  val INTERNAL_METADATA_KEYS = Seq(
+    AUTO_GENERATED_ALIAS,
+    METADATA_COL_ATTR_KEY,
+    QUALIFIED_ACCESS_ONLY,
+    FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+    FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY,
+    FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY
+  )
+
+  def removeInternalMetadata(schema: StructType): StructType = {
+    StructType(schema.map { field =>
+      var builder = new MetadataBuilder().withMetadata(field.metadata)
+      INTERNAL_METADATA_KEYS.foreach { key =>
+        builder = builder.remove(key)
+      }
+      field.copy(metadata = builder.build())
+    })
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index bf14ef14cf4..3848d550515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -22,7 +22,7 @@ import java.net.URI
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
       }
       val result = saveDataIntoTable(
        sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
-      val tableSchema = CharVarcharUtils.getRawSchema(result.schema, sessionState.conf)
+      val tableSchema = CharVarcharUtils.getRawSchema(
+        removeInternalMetadata(result.schema), sessionState.conf)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 8355ac8e703..426f33129a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,15 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -69,7 +70,7 @@ case class CreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -78,14 +79,10 @@ case class CreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
-    val table = catalog.createTable(ident, columns,
-      partitioning.toArray, properties.asJava)
+    val table = catalog.createTable(
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -106,7 +103,7 @@ case class AtomicCreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -115,13 +112,10 @@ case class AtomicCreateTableAsSelectExec(
       if (ifNotExists) {
         return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val stagedTable = catalog.stageCreate(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
 }
@@ -144,7 +138,8 @@ case class ReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -164,10 +159,8 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val table = catalog.createTable(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -192,13 +185,13 @@ case class AtomicReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
   override protected def run(): Seq[InternalRow] = {
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+    val columns = getV2Columns(query.schema)
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
       invalidateCache(catalog, table, ident)
@@ -559,9 +552,14 @@ case class DeltaWithMetadataWritingSparkTask(
   }
 }
 
-private[v2] trait TableWriteExecHelper extends LeafV2CommandExec {
+private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
   override def output: Seq[Attribute] = Nil
 
+  protected def getV2Columns(schema: StructType): Array[Column] = {
+    CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(schema), conf).asNullable)
+  }
+
   protected def writeToTable(
       catalog: TableCatalog,
       table: Table,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index d03bf402170..b043bf2f5be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.types.IntegerType
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
   import testImplicits._
@@ -340,4 +343,17 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       assert(relations(0).output != relations(1).output)
     }
   }
+
+  test("SPARK-43123: Metadata column related field metadata should not be leaked to catalogs") {
+    withTable(tbl, "testcat.target") {
+      prepareTable()
+      sql(s"CREATE TABLE testcat.target AS SELECT index FROM $tbl")
+      val cols = catalog("testcat").asTableCatalog.loadTable(
+        Identifier.of(Array.empty, "target")).columns()
+      assert(cols.length == 1)
+      assert(cols.head.name() == "index")
+      assert(cols.head.dataType() == IntegerType)
+      assert(cols.head.metadataInJSON() == null)
+    }
+  }
 }

