zhztheplayer commented on code in PR #11107: URL: https://github.com/apache/incubator-gluten/pull/11107#discussion_r2538432586
########## backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala: ########## @@ -0,0 +1,2107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.SparkException +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims.{INSERT_INTO_TMP_VIEW_ERROR_MSG, INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG} +import org.apache.spark.sql.delta.schema.{InvariantViolationException, SchemaUtils} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} +import org.apache.spark.sql.functions.{lit, struct} +import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} +import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ + +import org.scalatest.BeforeAndAfter + +import java.io.File +import java.util.TimeZone + +import scala.collection.JavaConverters._ + +class DeltaInsertIntoSQLSuite + extends DeltaInsertIntoTestsWithTempViews( + supportsDynamicOverwrite = true, + includeSQLOnlyTests = true) + with DeltaSQLCommandTest + with DeltaExcludedBySparkVersionTestMixinShims { + + override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { + val tmpView = "tmp_view" + withTempView(tmpView) { + insert.createOrReplaceTempView(tmpView) + val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO" + sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView") + } + } + + testSparkMasterOnly("Variant type") { + withTable("t") { + sql("CREATE TABLE t (id LONG, v VARIANT) USING delta") + sql("INSERT INTO t (id, v) VALUES (1, parse_json('{\"a\": 1}'))") + sql("INSERT INTO t (id, v) VALUES (2, parse_json('{\"b\": 2}'))") + sql("INSERT INTO t SELECT id, parse_json(cast(id as string)) v FROM range(2)") + + checkAnswer( + sql("select * from t").selectExpr("id", "to_json(v)"), + Seq(Row(1, "{\"a\":1}"), Row(2, "{\"b\":2}"), Row(0, "0"), Row(1, "1"))) + } + } + + test("insert overwrite should work with selecting constants") { + withTable("t1") { + sql("CREATE TABLE t1 (a int, b int, c int) USING delta PARTITIONED BY (b, c)") + sql("INSERT OVERWRITE TABLE t1 PARTITION (c=3) SELECT 1, 2") + checkAnswer( + sql("SELECT * FROM t1"), + Row(1, 2, 3) :: Nil + ) + sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c=3) SELECT 1") + checkAnswer( + sql("SELECT * FROM t1"), + Row(1, 2, 3) :: Nil + ) + sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c) SELECT 1, 3") + checkAnswer( + sql("SELECT * FROM t1"), + Row(1, 2, 3) :: Nil + ) + } + } + + test("insertInto: append by name") { + import testImplicits._ + val t1 = "tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + sql(s"INSERT INTO $t1(id, data) VALUES(1L, 'a')") + // Can be in a different order + sql(s"INSERT INTO $t1(data, id) VALUES('b', 2L)") + // Can be casted automatically + sql(s"INSERT INTO $t1(data, id) VALUES('c', 3)") + verifyTable(t1, df) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + // Missing columns + assert(intercept[AnalysisException] { + sql(s"INSERT INTO $t1(data) VALUES(4)") + }.getMessage.contains("Column id is not specified in INSERT")) + // Missing columns with matching dataType + assert(intercept[AnalysisException] { + sql(s"INSERT INTO $t1(data) VALUES('b')") + }.getMessage.contains("Column id is not specified in INSERT")) + } + // Duplicate columns + assert( + intercept[AnalysisException]( + sql(s"INSERT INTO $t1(data, data) VALUES(5)")).getMessage.nonEmpty) + } + } + + test("insertInto: overwrite by name") { + import testImplicits._ + val t1 = "tbl" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") + sql(s"INSERT OVERWRITE $t1(id, data) VALUES(1L, 'a')") + verifyTable(t1, Seq((1L, "a")).toDF("id", "data")) + // Can be in a different order + sql(s"INSERT OVERWRITE $t1(data, id) VALUES('b', 2L)") + verifyTable(t1, Seq((2L, "b")).toDF("id", "data")) + // Can be casted automatically + sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)") + verifyTable(t1, Seq((3L, "c")).toDF("id", "data")) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + // Missing columns + assert(intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1(data) VALUES(4)") + }.getMessage.contains("Column id is not specified in INSERT")) + // Missing columns with matching datatype + assert(intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1(data) VALUES(4L)") + }.getMessage.contains("Column id is not specified in INSERT")) + } + // Duplicate columns + assert( + intercept[AnalysisException]( + sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")).getMessage.nonEmpty) + } + } + + test("insertInto should throw an AnalysisError on name mismatch") { + def testInsertByNameError(targetSchema: String, expectedErrorClass: String): Unit = { + val sourceTableName = "source" + val targetTableName = "target" + val format = "delta" + withTable(sourceTableName, targetTableName) { + sql(s"CREATE TABLE $sourceTableName (a int, b int) USING $format") + sql(s"CREATE TABLE $targetTableName $targetSchema USING $format") + val e = intercept[AnalysisException] { + sql(s"INSERT INTO $targetTableName BY NAME SELECT * FROM $sourceTableName") + } + assert(e.getErrorClass === expectedErrorClass) + } + } + + // NOTE: We use upper case in the target schema so that needsSchemaAdjustmentByName returns + // true (due to case sensitivity) so that we call resolveQueryColumnsByName and hit the right + // code path. + + // when the number of columns does not match, throw an arity mismatch error. + testInsertByNameError( + targetSchema = "(A int)", + expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") + + // when the number of columns matches, but the names do not, throw a missing column error. + testInsertByNameError( + targetSchema = "(A int, c int)", + expectedErrorClass = "DELTA_MISSING_COLUMN") + } + + dynamicOverwriteTest("insertInto: dynamic overwrite by name") { + import testImplicits._ + val t1 = "tbl" + withTable(t1) { + sql( + s"CREATE TABLE $t1 (id bigint, data string, data2 string) " + + s"USING $v2Format PARTITIONED BY (id)") + sql(s"INSERT OVERWRITE $t1(id, data, data2) VALUES(1L, 'a', 'b')") + verifyTable(t1, Seq((1L, "a", "b")).toDF("id", "data", "data2")) + // Can be in a different order + sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('b', 'd', 2L)") + verifyTable(t1, Seq((1L, "a", "b"), (2L, "b", "d")).toDF("id", "data", "data2")) + // Can be casted automatically + sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)") + verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2")) + withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { + // Missing columns + assert(intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1)") + }.getMessage.contains("Column data2 is not specified in INSERT")) + // Missing columns with matching datatype + assert(intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1L)") + }.getMessage.contains("Column data2 is not specified in INSERT")) + } + // Duplicate columns + assert( + intercept[AnalysisException]( + sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")).getMessage.nonEmpty) + } + } + + test("insertInto: static partition column name should not be used in the column list") { + withTable("t") { + sql(s"CREATE TABLE t(i STRING, c string) USING $v2Format PARTITIONED BY (c)") + checkError( + intercept[AnalysisException] { + sql("INSERT OVERWRITE t PARTITION (c='1') (c) VALUES ('2')") + }, + "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST", + parameters = Map("staticName" -> "c") + ) + } + } + + Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach { + case (testName, values) => + test(s"INSERT OVERWRITE schema evolution works for array struct types - $testName") { + val sourceSchema = + "id INT, col2 STRING, col ARRAY<STRUCT<f1: STRING, f2: STRING, f3: DATE>>" + val sourceRecord = "1, '2022-11-01', array(struct('s1', 's2', DATE'2022-11-01'))" + val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: STRING, f2: STRING>>" + val targetRecord = "1, DATE'2022-11-02', array(struct('t1', 't2'))" + + runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, targetRecord) { + (sourceTable, targetTable) => + sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM $sourceTable") + + // make sure table is still writeable + sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02', + | array(struct('s3', 's4', DATE'2022-11-02')))""".stripMargin) + sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03', + |array(struct('s5', 's6', NULL)))""".stripMargin) + val df = spark.sql("""SELECT 1 as id, DATE'2022-11-01' as col2, + | array(struct('s1', 's2', DATE'2022-11-01')) as col UNION + | SELECT 2 as id, DATE'2022-11-02' as col2, + | array(struct('s3', 's4', DATE'2022-11-02')) as col UNION + | SELECT 3 as id, DATE'2022-11-03' as col2, + | array(struct('s5', 's6', NULL)) as col""".stripMargin) + verifyTable(targetTable, df) + } + } + } + + Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach { + case (testName, values) => + test(s"INSERT OVERWRITE schema evolution works for array nested types - $testName") { + val sourceSchema = "id INT, col2 STRING, " + + "col ARRAY<STRUCT<f1: INT, f2: STRUCT<f21: STRING, f22: DATE>, f3: STRUCT<f31: STRING>>>" + val sourceRecord = "1, '2022-11-01', " + + "array(struct(1, struct('s1', DATE'2022-11-01'), struct('s1')))" + val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: INT, f2: STRUCT<f21: STRING>>>" + val targetRecord = "2, DATE'2022-11-02', array(struct(2, struct('s2')))" + + runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, targetRecord) { + (sourceTable, targetTable) => + sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM $sourceTable") + + // make sure table is still writeable + sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02', + | array(struct(2, struct('s2', DATE'2022-11-02'), struct('s2'))))""".stripMargin) + sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03', + | array(struct(3, struct('s3', NULL), struct(NULL))))""".stripMargin) + val df = spark.sql( + """SELECT 1 as id, DATE'2022-11-01' as col2, + | array(struct(1, struct('s1', DATE'2022-11-01'), struct('s1'))) as col UNION + | SELECT 2 as id, DATE'2022-11-02' as col2, + | array(struct(2, struct('s2', DATE'2022-11-02'), struct('s2'))) as col UNION + | SELECT 3 as id, DATE'2022-11-03' as col2, + | array(struct(3, struct('s3', NULL), struct(NULL))) as col + |""".stripMargin) + verifyTable(targetTable, df) + } + } + } + + // Schema evolution for complex map type + test("insertInto schema evolution with map type - append mode: field renaming + new field") { + withTable("map_schema_evolution") { + val tableName = "map_schema_evolution" + val initialSchema = StructType( + Seq( + StructField("key", IntegerType, nullable = false), + StructField( + "metrics", + MapType( + StringType, + StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("value", IntegerType, nullable = false) + )))) + )) + + val initialData = Seq( + Row(1, Map("event" -> Row(1, 1))) + ) + + val initialRdd = spark.sparkContext.parallelize(initialData) + val initialDf = spark.createDataFrame(initialRdd, initialSchema) + + // Write initial data + initialDf.write + .option("overwriteSchema", "true") + .mode("overwrite") + .format("delta") + .saveAsTable(tableName) + + // Evolved schema with field renamed and additional field in map struct + val evolvedSchema = StructType( + Seq( + StructField("renamed_key", IntegerType, nullable = false), + StructField( + "metrics", + MapType( + StringType, + StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("value", IntegerType, nullable = false), + StructField("comment", StringType, nullable = true) + )) + ) + ) + )) + + val evolvedData = Seq( + Row(1, Map("event" -> Row(1, 1, "deprecated"))) + ) + + val evolvedRdd = spark.sparkContext.parallelize(evolvedData) + val evolvedDf = spark.createDataFrame(evolvedRdd, evolvedSchema) + + // insert data without schema evolution + val err = intercept[AnalysisException] { + evolvedDf.write + .mode("append") + .format("delta") + .insertInto(tableName) + } + checkErrorMatchPVals( + exception = err, + "_LEGACY_ERROR_TEMP_DELTA_0007", + parameters = Map( + "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*" + ) + ) + + // insert data with schema evolution + withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") { + evolvedDf.write + .mode("append") + .format("delta") + .insertInto(tableName) + + checkAnswer( + spark.sql(s"SELECT * FROM $tableName"), + Seq( + Row(1, Map("event" -> Row(1, 1, null))), + Row(1, Map("event" -> Row(1, 1, "deprecated"))) + )) + } + } + } + + test("not enough column in source to insert in nested map types") { + withTable("source", "target") { + sql("""CREATE TABLE source ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT>> + |) USING delta""".stripMargin) + + sql("""CREATE TABLE target ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>> + |) USING delta""".stripMargin) + + sql("INSERT INTO source VALUES (1, map('event', struct(1, 1)))") + + val e = intercept[AnalysisException] { + sql("INSERT INTO target SELECT * FROM source") + } + checkError( + exception = e, + "DELTA_INSERT_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "spark_catalog.default.target", + "columnName" -> "not enough nested fields in value", + "numColumns" -> "3", + "insertColumns" -> "2" + ) + ) + } + } + + // not enough nested fields in value + test("more columns in source to insert in nested map types") { + withTable("source", "target") { + sql("""CREATE TABLE source ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>> + |) USING delta""".stripMargin) + + sql("""CREATE TABLE target ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT>> + |) USING delta""".stripMargin) + + sql("INSERT INTO source VALUES (1, map('event', struct(1, 1, 'deprecated')))") + + val e = intercept[AnalysisException] { + sql("INSERT INTO target SELECT * FROM source") + } + checkErrorMatchPVals( + exception = e, + "_LEGACY_ERROR_TEMP_DELTA_0007", + parameters = Map( + "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*" + ) + ) + + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + sql("INSERT INTO target SELECT * FROM source") + checkAnswer( + spark.sql(s"SELECT * FROM source"), + Seq( + Row(1, Map("event" -> Row(1, 1, "deprecated"))) + )) + } + } + } + + test("more columns in source to insert in nested 2-level deep map types") { + withTable("source", "target") { + sql("""CREATE TABLE source ( + | id INT, + | metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>> + |) USING delta""".stripMargin) + + sql("""CREATE TABLE target ( + | id INT, + | metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT>>> + |) USING delta""".stripMargin) + + sql("""INSERT INTO source VALUES + | (1, map('event', map('subEvent', struct(1, 1, 'deprecated')))) + """.stripMargin) + + val e = intercept[AnalysisException] { + sql("INSERT INTO target SELECT * FROM source") + } + checkErrorMatchPVals( + exception = e, + "_LEGACY_ERROR_TEMP_DELTA_0007", + parameters = Map( + "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*" + ) + ) + + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + sql("INSERT INTO target SELECT * FROM source") + checkAnswer( + spark.sql(s"SELECT * FROM source"), + Seq( + Row(1, Map("event" -> Map("subEvent" -> Row(1, 1, "deprecated")))) + )) + } + } + } + + test("insert map type with different data type in key") { + withTable("source", "target") { + sql("""CREATE TABLE source ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT>> + |) USING delta""".stripMargin) + + sql("""CREATE TABLE target ( + | id INT, + | metrics MAP<INT, STRUCT<id: INT, value: INT>> + |) USING delta""".stripMargin) + + sql("INSERT INTO source VALUES (1, map('1', struct(2, 3)))") + + sql("INSERT INTO target SELECT * FROM source") + + checkAnswer( + spark.sql("SELECT * FROM target"), + Seq( + Row(1, Map(1 -> Row(2, 3))) + )) + } + } + + test("insert map type with different data type in value") { + withTable("source", "target") { + sql("""CREATE TABLE source ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: LONG>> + |) USING delta""".stripMargin) + + sql("""CREATE TABLE target ( + | id INT, + | metrics MAP<STRING, STRUCT<id: INT, value: INT>> + |) USING delta""".stripMargin) + + sql("INSERT INTO source VALUES (1, map('m1', struct(2, 3L)))") + + sql("INSERT INTO target SELECT * FROM source") + + checkAnswer( + spark.sql("SELECT * FROM target"), + Seq( + Row(1, Map("m1" -> Row(2, 3))) + )) + } + } + + def runInsertOverwrite( + sourceSchema: String, + sourceRecord: String, + targetSchema: String, + targetRecord: String)(runAndVerify: (String, String) => Unit): Unit = { + val sourceTable = "source" + val targetTable = "target" + withTable(sourceTable) { + withTable(targetTable) { + withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") { + // prepare source table + sql(s"""CREATE TABLE $sourceTable ($sourceSchema) + | USING DELTA""".stripMargin) + sql(s"INSERT INTO $sourceTable VALUES ($sourceRecord)") + // prepare target table + sql(s"""CREATE TABLE $targetTable ($targetSchema) + | USING DELTA""".stripMargin) + sql(s"INSERT INTO $targetTable VALUES ($targetRecord)") + runAndVerify(sourceTable, targetTable) + } + } + } + } +} + +class DeltaInsertIntoSQLByPathSuite + extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true) + with DeltaSQLCommandTest { + override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { + val tmpView = "tmp_view" + withTempView(tmpView) { + insert.createOrReplaceTempView(tmpView) + val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO" + val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName) + val catalogTable = spark.sessionState.catalog.getTableMetadata(ident) + sql(s"INSERT $overwrite TABLE delta.`${catalogTable.location}` SELECT * FROM $tmpView") + } + } + + testQuietly("insertInto: cannot insert into a table that doesn't exist") { + import testImplicits._ + Seq(SaveMode.Append, SaveMode.Overwrite).foreach { + mode => + withTempDir { + dir => + val t1 = s"delta.`${dir.getCanonicalPath}`" + val tmpView = "tmp_view" + withTempView(tmpView) { + val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + df.createOrReplaceTempView(tmpView) + + intercept[AnalysisException] { + sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView") + } + + assert( + new File(dir, "_delta_log").mkdirs(), + "Failed to create a _delta_log directory") + intercept[AnalysisException] { + sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView") + } + } + } + } + } +} + +class DeltaInsertIntoDataFrameSuite + extends DeltaInsertIntoTestsWithTempViews( + supportsDynamicOverwrite = true, + includeSQLOnlyTests = false) + with DeltaSQLCommandTest { + override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { + val dfw = insert.write.format(v2Format) + if (mode != null) { + dfw.mode(mode) + } + dfw.insertInto(tableName) + } +} + +class DeltaInsertIntoDataFrameByPathSuite + extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) + with DeltaSQLCommandTest { + override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = { + val dfw = insert.write.format(v2Format) + if (mode != null) { + dfw.mode(mode) + } + val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName) + val catalogTable = spark.sessionState.catalog.getTableMetadata(ident) + dfw.insertInto(s"delta.`${catalogTable.location}`") + } + + testQuietly("insertInto: cannot insert into a table that doesn't exist") { + import testImplicits._ + Seq(SaveMode.Append, SaveMode.Overwrite).foreach { + mode => + withTempDir { + dir => + val t1 = s"delta.`${dir.getCanonicalPath}`" + val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") + + intercept[AnalysisException] { + df.write.mode(mode).insertInto(t1) + } + + assert(new File(dir, "_delta_log").mkdirs(), "Failed to create a _delta_log directory") + intercept[AnalysisException] { + df.write.mode(mode).insertInto(t1) + } + + // Test DataFrameWriterV2 as well + val dfW2 = df.writeTo(t1) + if (mode == SaveMode.Append) { + intercept[AnalysisException] { + dfW2.append() + } + } else { + intercept[AnalysisException] { + dfW2.overwrite(lit(true)) + } + } + } + } + } +} + +trait DeltaInsertIntoColumnMappingSelectedTests extends DeltaColumnMappingSelectedTestMixin { + override protected def runOnlyTests = Seq( + "InsertInto: overwrite - mixed clause reordered - static mode", + "InsertInto: overwrite - multiple static partitions - dynamic mode" + ) +} + +class DeltaInsertIntoSQLNameColumnMappingSuite + extends DeltaInsertIntoSQLSuite + with DeltaColumnMappingEnableNameMode + with DeltaInsertIntoColumnMappingSelectedTests { + override protected def runOnlyTests: Seq[String] = super.runOnlyTests :+ + "insert overwrite should work with selecting constants" +} + +class DeltaInsertIntoSQLByPathNameColumnMappingSuite + extends DeltaInsertIntoSQLByPathSuite + with DeltaColumnMappingEnableNameMode + with DeltaInsertIntoColumnMappingSelectedTests + +class DeltaInsertIntoDataFrameNameColumnMappingSuite + extends DeltaInsertIntoDataFrameSuite + with DeltaColumnMappingEnableNameMode + with DeltaInsertIntoColumnMappingSelectedTests + +class DeltaInsertIntoDataFrameByPathNameColumnMappingSuite + extends DeltaInsertIntoDataFrameByPathSuite + with DeltaColumnMappingEnableNameMode + with DeltaInsertIntoColumnMappingSelectedTests + +abstract class DeltaInsertIntoTestsWithTempViews( + supportsDynamicOverwrite: Boolean, + includeSQLOnlyTests: Boolean) + extends DeltaInsertIntoTests(supportsDynamicOverwrite, includeSQLOnlyTests) + with DeltaTestUtilsForTempViews { + protected def testComplexTempViews(name: String)(text: String, expectedResult: Seq[Row]): Unit = { + testWithTempView(s"insertInto a temp view created on top of a table - $name") { + isSQLTempView => + import testImplicits._ + val t1 = "tbl" + sql(s"CREATE TABLE $t1 (key int, value int) USING $v2Format") + Seq(SaveMode.Append, SaveMode.Overwrite).foreach { + mode => + createTempViewFromSelect(text, isSQLTempView) + val df = Seq((0, 3), (1, 2)).toDF("key", "value") + try { + doInsert("v", df, mode) + checkAnswer(spark.table("v"), expectedResult) + } catch { + case e: AnalysisException => + assert( + e.getMessage.contains(INSERT_INTO_TMP_VIEW_ERROR_MSG) || + e.getMessage.contains("Inserting into an RDD-based table is not allowed") || + e.getMessage.contains("Table default.v not found") || + e.getMessage.contains("Table or view 'v' not found in database 'default'") || + e.getMessage.contains("The table or view `default`.`v` cannot be found") || + e.getMessage.contains( + "[UNSUPPORTED_INSERT.RDD_BASED] Can't insert into the target.")) + } + } + } + } + + testComplexTempViews("basic")( + "SELECT * FROM tbl", + Seq(Row(0, 3), Row(1, 2)) + ) + + testComplexTempViews("subset cols")( + "SELECT key FROM tbl", + Seq(Row(0), Row(1)) + ) + + testComplexTempViews("superset cols")( + "SELECT key, value, 1 FROM tbl", + Seq(Row(0, 3, 1), Row(1, 2, 1)) + ) + + testComplexTempViews("nontrivial projection")( + "SELECT value as key, key as value FROM tbl", + Seq(Row(3, 0), Row(2, 1)) + ) + + testComplexTempViews("view with too many internal aliases")( + "SELECT * FROM (SELECT * FROM tbl AS t1) AS t2", + Seq(Row(0, 3), Row(1, 2)) + ) + +} + +class DeltaColumnDefaultsInsertSuite extends InsertIntoSQLOnlyTests with DeltaSQLCommandTest { + + import testImplicits._ + + override val supportsDynamicOverwrite = true + override val includeSQLOnlyTests = true + + val tblPropertiesAllowDefaults = + """tblproperties ( + | 'delta.feature.allowColumnDefaults' = 'enabled', + | 'delta.columnMapping.mode' = 'name' + |)""".stripMargin + + // Ignored in Gluten: Results mismatch. Review Comment: Fixed in https://github.com/apache/incubator-gluten/pull/11117. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
