This is an automated email from the ASF dual-hosted git repository. leesf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 84dd3ca [HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133) 84dd3ca is described below commit 84dd3ca18b62921a60c3970a0f26614d8bafc89d Author: pengzhiwei <pengzhiwei2...@icloud.com> AuthorDate: Thu Jun 24 19:09:37 2021 +0800 [HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133) --- .../org/apache/spark/sql/hudi/HoodieSqlUtils.scala | 7 ++- .../command/InsertIntoHoodieTableCommand.scala | 7 ++- .../apache/spark/sql/hudi/TestHoodieSqlBase.scala | 9 +++ .../apache/spark/sql/hudi/TestInsertTable.scala | 64 ++++++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index a1c38bb..27846e1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hudi import scala.collection.JavaConverters._ import java.net.URI import java.util.Locale - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.SparkAdapterSupport @@ -30,7 +29,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.internal.SQLConf @@ -106,6 +105,10 @@ object HoodieSqlUtils extends SparkAdapterSupport { } } + def removeMetaFields(attrs: Seq[Attribute]): Seq[Attribute] = { + attrs.filterNot(attr => isMetaField(attr.name)) + } + /** * Get the table location. * @param tableId diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 2ad9a68..c7fac53 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -128,6 +128,11 @@ object InsertIntoHoodieTableCommand { s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " + s"is: ${staticPartitionValues.mkString("," + "")}") + assert(staticPartitionValues.size + query.output.size == table.schema.size, + s"Required select columns count: ${removeMetaFields(table.schema).size}, " + + s"Current select columns(including static partition column) count: " + + s"${staticPartitionValues.size + removeMetaFields(query.output).size},columns: " + + s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})") val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition query.output.dropRight(targetPartitionSchema.fields.length) } else { // insert static partition @@ -156,7 +161,7 @@ object InsertIntoHoodieTableCommand { targetPartitionSchema.fields.map(f => { val staticPartitionValue = staticPartitionValues.getOrElse(f.name, s"Missing static partition value for: ${f.name}") - val castAttr = Literal.create(staticPartitionValue, f.dataType) + val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf) Alias(castAttr, f.name)() }) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala index 610c71f..067e49a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala @@ -78,4 +78,13 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll { protected def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = { assertResult(expects.map(row => Row(row: _*)).toArray)(spark.sql(sql).collect()) } + + protected def checkException(sql: String)(errorMsg: String): Unit = { + try { + spark.sql(sql) + } catch { + case e: Throwable => + assertResult(errorMsg)(e.getMessage) + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 98d095b..945ccf5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -220,4 +220,68 @@ class TestInsertTable extends TestHoodieSqlBase { ) } } + + test("Test Different Type of Partition Column") { + withTempDir { tmp => + val typeAndValue = Seq( + ("string", "'1000'"), + ("int", 1000), + ("bigint", 10000), + ("timestamp", "'2021-05-20 00:00:00'"), + ("date", "'2021-05-20'") + ) + typeAndValue.foreach { case (partitionType, partitionValue) => + val tableName = generateTableName + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt $partitionType + |) using hudi + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + + spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10") + spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue") + checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")( + Seq(1, "a1", 10, removeQuotes(partitionValue).toString), + Seq(2, "a2", 10, removeQuotes(partitionValue).toString) + ) + } + } + } + + test("Test Insert Exception") { + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | partitioned by (dt) + """.stripMargin) + checkException(s"insert into $tableName partition(dt = '2021-06-20')" + + s" select 1, 'a1', 10, '2021-06-20'") ( + "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + + " count: 5,columns: (1,a1,10,2021-06-20,dt)" + ) + checkException(s"insert into $tableName select 1, 'a1', 10")( + "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + + " count: 3,columns: (1,a1,10)" + ) + } + + private def removeQuotes(value: Any): Any = { + value match { + case s: String => s.stripPrefix("'").stripSuffix("'") + case _=> value + } + } }