This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new cfd0c1ee34 [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap (#6213) cfd0c1ee34 is described below commit cfd0c1ee34460332053491fd1e68c2607c14e958 Author: Alexey Kudinkin <ale...@infinilake.com> AuthorDate: Thu Jul 28 15:36:03 2022 -0700 [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap (#6213) --- .../spark/sql/HoodieCatalystPlansUtils.scala | 23 +- .../hudi/common/table/TableSchemaResolver.java | 21 ++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 87 ++++---- .../spark/sql/hudi/HoodieSqlCommonUtils.scala | 4 +- .../command/InsertIntoHoodieTableCommand.scala | 243 +++++++++++++-------- .../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 18 +- .../apache/spark/sql/hudi/TestDeleteTable.scala | 39 +++- .../apache/spark/sql/hudi/TestInsertTable.scala | 31 +-- .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 2 +- .../apache/spark/sql/hudi/TestShowPartitions.scala | 20 +- .../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 29 ++- .../apache/spark/sql/hudi/TestUpdateTable.scala | 28 ++- .../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 12 +- .../spark/sql/HoodieSpark3CatalystPlanUtils.scala | 12 +- 14 files changed, 381 insertions(+), 188 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala index c277dcb3e6..7566458b1b 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala @@ -19,12 +19,33 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.internal.SQLConf trait HoodieCatalystPlansUtils { + /** + * Resolves output of the provided [[query]] against the [[expected]] list of [[Attribute]], + * and returns new (reshaped) instance of the [[LogicalPlan]] + * + * @param tableName used purely for more human-readable error output (if any) + * @param expected list of attributes output of the query has to adhere to + * @param query query whose output has to be reshaped + * @param byName whether the matching should occur by-name or positionally + * @param conf instance of [[SQLConf]] + * @return [[LogicalPlan]] which output is aligned to match to that of [[expected]] + */ + def resolveOutputColumns(tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean, + conf: SQLConf): LogicalPlan + + /** + * Instantiates an [[Explain]] command + */ def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 5fc989e2e5..4ada97e35c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -316,6 +316,9 @@ public class TableSchemaResolver { * @param oldSchema Older schema to check. * @param newSchema Newer schema to check. * @return True if the schema validation is successful + * + * TODO revisit this method: it's implemented incorrectly as it might be applying different criteria + * to top-level record and nested record (for ex, if that nested record is contained w/in an array) */ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) { if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) { @@ -366,13 +369,31 @@ public class TableSchemaResolver { return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); } + /** + * Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least + * a single commit) + * + * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback + * to use table's schema used at creation + */ + public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception { + if (metaClient.isTimelineNonEmpty()) { + return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty())); + } + + return Option.empty(); + } + /** * Get latest schema either from incoming schema or table schema. * @param writeSchema incoming batch's write schema. * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise. * @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required. * @return the latest schema. + * + * @deprecated will be removed (HUDI-4472) */ + @Deprecated public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace, Function1<Schema, Schema> converterFn) { Schema latestSchema = writeSchema; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7324a5ca5b..167001863d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} @@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model._ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{CommitUtils, StringUtils} +import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException @@ -72,8 +72,7 @@ object HoodieSparkSqlWriter { hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, - asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty - ) + asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty) : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { @@ -241,39 +240,49 @@ object HoodieSparkSqlWriter { sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) - var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) + + // TODO(HUDI-4472) revisit and simplify schema handling + val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema) + + val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) - if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean - && internalSchemaOpt.isEmpty) { - // force apply full schema evolution. - internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) - } - if (reconcileSchema) { - schema = lastestSchema - } - if (internalSchemaOpt.isDefined) { - // Apply schema evolution. - val mergedSparkSchema = if (!reconcileSchema) { - AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema)) + + val writerSchema: Schema = + if (reconcileSchema) { + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema + if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) { + internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema)) + } + + if (internalSchemaOpt.isDefined) { + // Apply schema evolution, by auto-merging write schema and read schema + val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get) + AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName) + } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) { + // In case schema reconciliation is enabled and source and latest table schemas + // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]], then we will + // pick latest table's schema as the writer's schema + latestTableSchema + } else { + // Otherwise fallback to original source's schema + sourceSchema + } } else { - // Auto merge write schema and read schema. - val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get) - AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName)) + // In case reconciliation is disabled, we still have to do nullability attributes + // (minor) reconciliation, making sure schema of the incoming batch is in-line with + // the data already committed in the table + AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema) } - schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace) - } - if (reconcileSchema && internalSchemaOpt.isEmpty) { - schema = lastestSchema - } - validateSchemaForHoodieIsDeleted(schema) - sparkContext.getConf.registerAvroSchemas(schema) - log.info(s"Registered avro schema : ${schema.toString(true)}") + validateSchemaForHoodieIsDeleted(writerSchema) + sparkContext.getConf.registerAvroSchemas(writerSchema) + log.info(s"Registered avro schema : ${writerSchema.toString(true)}") // Convert to RDD[HoodieRecord] val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema, - org.apache.hudi.common.util.Option.of(schema)) + org.apache.hudi.common.util.Option.of(writerSchema)) val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT) || parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), @@ -295,10 +304,10 @@ object HoodieSparkSqlWriter { hoodieRecord }).toJavaRDD() - val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema + val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema // Create a HoodieWriteClient & issue the write. - val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path, + val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key) )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] @@ -388,14 +397,18 @@ object HoodieSparkSqlWriter { * @param schema incoming record's schema. * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. */ - def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = { - var latestSchema: Schema = schema + def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = { if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + val tableMetaClient = HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration) + .setBasePath(basePath.toString) + .build() val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null) + + toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) + } else { + None } - latestSchema } def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row], diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 8328882239..b02881bc3d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = { child match { case Literal(nul, NullType) => Literal(nul, dataType) - case _ => if (child.dataType != dataType) - Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child + case expr if child.dataType != dataType => Cast(expr, dataType, Option(conf.sessionLocalTimeZone)) + case _ => child } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index be1ad8e9b8..8bd81df3d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -17,27 +17,39 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.HoodieSparkSqlWriter +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql._ /** - * Command for insert into hoodie table. + * Command for insert into Hudi table. + * + * This is correspondent to Spark's native [[InsertIntoStatement]] + * + * @param logicalRelation the [[LogicalRelation]] representing the table to be writing into. + * @param query the logical plan representing data to be written + * @param partitionSpec a map from the partition key to the partition value (optional). + * If the value is missing, dynamic partition insert will be performed. + * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have + * Map('a' -> Some('1'), 'b' -> Some('2')), + * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` + * would have Map('a' -> Some('1'), 'b' -> None). + * @param overwrite overwrite existing table or partitions. */ -case class InsertIntoHoodieTableCommand( - logicalRelation: LogicalRelation, - query: LogicalPlan, - partition: Map[String, Option[String]], - overwrite: Boolean) +case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation, + query: LogicalPlan, + partitionSpec: Map[String, Option[String]], + overwrite: Boolean) extends HoodieLeafRunnableCommand { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -45,18 +57,19 @@ case class InsertIntoHoodieTableCommand( assert(logicalRelation.catalogTable.isDefined, "Missing catalog table") val table = logicalRelation.catalogTable.get - InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, overwrite) + InsertIntoHoodieTableCommand.run(sparkSession, table, query, partitionSpec, overwrite) Seq.empty[Row] } } -object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { +object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig with SparkAdapterSupport { + /** * Run the insert query. We support both dynamic partition insert and static partition insert. * @param sparkSession The spark session. * @param table The insert table. * @param query The insert query. - * @param insertPartitions The specified insert partition map. + * @param partitionSpec The specified insert partition map. * e.g. "insert into h(dt = '2021') select id, name from src" * "dt" is the key in the map and "2021" is the partition value. If the * partition value has not specified(in the case of dynamic partition) @@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { * @param extraOptions Extra options for insert. */ def run(sparkSession: SparkSession, - table: CatalogTable, - query: LogicalPlan, - insertPartitions: Map[String, Option[String]], - overwrite: Boolean, - refreshTable: Boolean = true, - extraOptions: Map[String, String] = Map.empty): Boolean = { - - val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table) - val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions) - - val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) { - // insert overwrite non-partition table + table: CatalogTable, + query: LogicalPlan, + partitionSpec: Map[String, Option[String]], + overwrite: Boolean, + refreshTable: Boolean = true, + extraOptions: Map[String, String] = Map.empty): Boolean = { + val catalogTable = new HoodieCatalogTable(sparkSession, table) + val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions) + + // NOTE: In case of partitioned table we override specified "overwrite" parameter + // to instead append to the dataset + val mode = if (overwrite && catalogTable.partitionFields.isEmpty) { SaveMode.Overwrite } else { - // for insert into or insert overwrite partition we use append mode. SaveMode.Append } - val conf = sparkSession.sessionState.conf - val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf) - // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery), - // The nullable attribute of fields will lost. - // In order to pass the nullable attribute to the inputDF, we specify the schema - // of the rdd. - val inputDF = sparkSession.createDataFrame( - Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema) - val success = - HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1 - if (success) { - if (refreshTable) { - sparkSession.catalog.refreshTable(table.identifier.unquotedString) - } - true - } else { - false + + val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf) + + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery)) + + if (success && refreshTable) { + sparkSession.catalog.refreshTable(table.identifier.unquotedString) } + + success } /** - * Aligned the type and name of query's output fields with the result table's fields. - * @param query The insert query which to aligned. - * @param hoodieCatalogTable The result hoodie catalog table. - * @param insertPartitions The insert partition map. - * @param conf The SQLConf. - * @return + * Align provided [[query]]'s output with the expected [[catalogTable]] schema by + * + * <ul> + * <li>Performing type coercion (casting corresponding outputs, where needed)</li> + * <li>Adding aliases (matching column names) to corresponding outputs </li> + * </ul> + * + * @param query target query whose output is to be inserted + * @param catalogTable catalog table + * @param partitionsSpec partition spec specifying static/dynamic partition values + * @param conf Spark's [[SQLConf]] */ - private def alignOutputFields( - query: LogicalPlan, - hoodieCatalogTable: HoodieCatalogTable, - insertPartitions: Map[String, Option[String]], - conf: SQLConf): LogicalPlan = { - - val targetPartitionSchema = hoodieCatalogTable.partitionSchema - - val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get) - assert(staticPartitionValues.isEmpty || - insertPartitions.size == targetPartitionSchema.size, - s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " + - s"is: ${staticPartitionValues.mkString("," + "")}") - - val queryOutputWithoutMetaFields = removeMetaFields(query.output) - assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size - == hoodieCatalogTable.tableSchemaWithoutMetaFields.size, - s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " + - s"Current select columns(including static partition column) count: " + - s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " + - s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})") - - val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType( - hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))) - val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields, - dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf) - - val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name)) - .map(f => { - val staticPartitionValue = staticPartitionValues.getOrElse(f.name, - s"Missing static partition value for: ${f.name}") - val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf) - Alias(castAttr, f.name)() - }) + private def alignQueryOutput(query: LogicalPlan, + catalogTable: HoodieCatalogTable, + partitionsSpec: Map[String, Option[String]], + conf: SQLConf): LogicalPlan = { + + val targetPartitionSchema = catalogTable.partitionSchema + val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) + + validate(removeMetaFields(query.schema), partitionsSpec, catalogTable) + // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway) + val cleanedQuery = stripMetaFields(query) + // To validate and align properly output of the query, we simply filter out partition columns with already + // provided static values from the table's schema + // + // NOTE: This is a crucial step: since coercion might rely on either of a) name-based or b) positional-based + // matching it's important to strip out partition columns, having static values provided in the partition spec, + // since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched + // positionally for example + val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) + val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) + + val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) + + Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput) + } + + private def coerceQueryOutputColumns(expectedSchema: StructType, + query: LogicalPlan, + catalogTable: HoodieCatalogTable, + conf: SQLConf): LogicalPlan = { + val planUtils = sparkAdapter.getCatalystPlanUtils + try { + planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf) + } catch { + // NOTE: In case matching by name didn't match the query output, we will attempt positional matching + case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") => + planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf) + } + } - Project(dataProjectsWithoutMetaFields ++ partitionProjects, query) + private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = { + // Validate that partition-spec has proper format (it could be empty if all of the partition values are dynamic, + // ie there are no static partition-values specified) + if (partitionsSpec.nonEmpty && partitionsSpec.size != catalogTable.partitionSchema.size) { + throw new HoodieException(s"Required partition schema is: ${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " + + s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}") + } + + val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) + val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType))) + + // Assert that query provides all the required columns + if (!conforms(fullQueryOutputSchema, catalogTable.tableSchemaWithoutMetaFields)) { + throw new HoodieException(s"Expected table's schema: ${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " + + s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}") + } + } + + private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String], + partitionSchema: StructType, + conf: SQLConf): Seq[NamedExpression] = { + partitionSchema.fields + .filter(pf => staticPartitionValues.contains(pf.name)) + .map(pf => { + val staticPartitionValue = staticPartitionValues(pf.name) + val castExpr = castIfNeeded(Literal.create(staticPartitionValue), pf.dataType, conf) + + Alias(castExpr, pf.name)() + }) } - private def getTableFieldsAlias( - queryOutputWithoutMetaFields: Seq[Attribute], - schemaWithoutMetaFields: Seq[StructField], - conf: SQLConf): Seq[Alias] = { - queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) => - val targetAttrOption = if (dataAttr.name.startsWith("col")) { - None - } else { - queryOutputWithoutMetaFields.find(_.name.equals(dataField.name)) + private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = { + if (sourceSchema.fields.length != targetSchema.fields.length) { + false + } else { + targetSchema.fields.zip(sourceSchema).forall { + case (targetColumn, sourceColumn) => + // Make sure we can cast source column to the target column type + Cast.canCast(sourceColumn.dataType, targetColumn.dataType) } - val targetAttr = targetAttrOption.getOrElse(dataAttr) - val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable), - dataField.dataType, conf) - Alias(castAttr, dataField.name)() } } + + def stripMetaFields(query: LogicalPlan): LogicalPlan = { + val filteredOutput = query.output.filterNot(attr => isMetaField(attr.name)) + if (filteredOutput == query.output) { + query + } else { + Project(filteredOutput, query) + } + } + + private def filterStaticPartitionValues(partitionsSpec: Map[String, Option[String]]): Map[String, String] = + partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index 5e2afd7490..e7848320ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -145,11 +145,23 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { assertResult(true)(hasException) } + def dropTypeLiteralPrefix(value: Any): Any = { + value match { + case s: String => + s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("X") + case _ => value + } + } - protected def removeQuotes(value: Any): Any = { + protected def extractRawValue(value: Any): Any = { value match { - case s: String => s.stripPrefix("'").stripSuffix("'") - case _=> value + case s: String => + // We need to strip out data-type prefixes like "DATE", "TIMESTAMP" + dropTypeLiteralPrefix(s) + .asInstanceOf[String] + .stripPrefix("'") + .stripSuffix("'") + case _ => value } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index 4c7c626966..3ab52a0bac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkUtils.isSpark2 import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.SimpleKeyGenerator import org.apache.spark.sql.SaveMode @@ -93,11 +94,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $tableName - |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000) - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $tableName + |values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000), (3, 'a2', cast(30.0 as double), 1000) + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $tableName + |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000) + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts from $tableName")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 20.0, 1000), @@ -132,11 +142,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $ptTableName - |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022") + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( Seq(1, "a1", 10.0, 1000, "2021"), Seq(2, "a2", 20.0, 1000, "2021"), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 8d21fe32ea..ced6fef72d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.internal.SQLConf import java.io.File @@ -396,8 +397,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ("string", "'1000'"), ("int", 1000), ("bigint", 10000), - ("timestamp", "'2021-05-20 00:00:00'"), - ("date", "'2021-05-20'") + ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), + ("date", "DATE'2021-05-20'") ) typeAndValue.foreach { case (partitionType, partitionValue) => val tableName = generateTableName @@ -409,8 +410,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test TimestampType Partition Column With Consistent Logical Timestamp Enabled") { withTempDir { tmp => val typeAndValue = Seq( - ("timestamp", "'2021-05-20 00:00:00'"), - ("date", "'2021-05-20'") + ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), + ("date", "DATE'2021-05-20'") ) typeAndValue.foreach { case (partitionType, partitionValue) => val tableName = generateTableName @@ -433,11 +434,12 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) - spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10") + // NOTE: We have to drop type-literal prefix since Spark doesn't parse type literals appropriately + spark.sql(s"insert into $tableName partition(dt = ${dropTypeLiteralPrefix(partitionValue)}) select 1, 'a1', 10") spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue") checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")( - Seq(1, "a1", 10, removeQuotes(partitionValue).toString), - Seq(2, "a2", 10, removeQuotes(partitionValue).toString) + Seq(1, "a1", 10, extractRawValue(partitionValue).toString), + Seq(2, "a2", 10, extractRawValue(partitionValue).toString) ) } @@ -481,14 +483,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) - checkException(s"insert into $tableName partition(dt = '2021-06-20')" + - s" select 1, 'a1', 10, '2021-06-20'") ( - "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + - " count: 5,columns: (1,a1,10,2021-06-20,dt)" + checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") ( + "Expected table's schema: " + + "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + + "query's output (including static partition values): " + + "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]" ) checkException(s"insert into $tableName select 1, 'a1', 10")( - "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + - " count: 3,columns: (1,a1,10)" + "Expected table's schema: " + + "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + + "query's output (including static partition values): " + + "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]" ) spark.sql("set hoodie.sql.bulk.insert.enable = true") spark.sql("set hoodie.sql.insert.mode = strict") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index ac11f83d53..58c808d28a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala @@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase { | when not matched then insert * |""".stripMargin) checkAnswer(s"select id, name, cast(value as string), ts from $tableName")( - Seq(1, "a1", removeQuotes(dataValue), 1000) + Seq(1, "a1", extractRawValue(dataValue), 1000) ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala index 005d5fed71..59ee642861 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.HoodieSparkUtils.isSpark2 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH class TestShowPartitions extends HoodieSparkSqlTestBase { @@ -84,11 +85,22 @@ class TestShowPartitions extends HoodieSparkSqlTestBase { checkAnswer(s"show partitions $tableName partition(dt='2021-01-02')")(Seq("dt=2021-01-02")) // Insert into null partition - spark.sql( - s""" - | insert into $tableName - | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt + if (isSpark2) { + // Spark 2 isn't able to convert NullType to any other type w/ appropriate nullability, so + // explicit cast is required + spark.sql( + s""" + | insert into $tableName + | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, cast(null as string) as dt """.stripMargin) + } else { + spark.sql( + s""" + | insert into $tableName + | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt + """.stripMargin) + } + checkAnswer(s"show partitions $tableName")( Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), Seq("dt=%s".format(DEFAULT_PARTITION_PATH)) ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index b64d386f1f..65357b903b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -55,11 +55,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql( s""" | insert into $tableName values - | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25 12:01:01',true,'a01','2021-12-25'), - | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002','2021-12-25','2021-12-25 12:02:02',true,'a02','2021-12-25'), - | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003','2021-12-25','2021-12-25 12:03:03',false,'a03','2021-12-25'), - | (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004','2021-12-26','2021-12-26 12:04:04',true,'a04','2021-12-26'), - | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26 12:05:05',false,'a05','2021-12-26') + | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:01:01',true,X'a01',TIMESTAMP'2021-12-25'), + | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:02:02',true,X'a02',TIMESTAMP'2021-12-25'), + | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:03:03',false,X'a03',TIMESTAMP'2021-12-25'), + | (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:04:04',true,X'a04',TIMESTAMP'2021-12-26'), + | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:05:05',false,X'a05',TIMESTAMP'2021-12-26') |""".stripMargin) } @@ -70,6 +70,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // date -> string -> date spark.sql(s"alter table $tableName alter column col6 type String") @@ -138,6 +141,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // float -> double -> decimal -> String spark.sql(s"alter table $tableName alter column col2 type double") @@ -172,6 +178,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // test set properties @@ -402,7 +411,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql(s"alter table $tableName alter column members.value.a first") - spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStruct', 29, 100), 1000)") + spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100), 1000)") // rename column spark.sql(s"alter table ${tableName} rename column user to userx") @@ -424,7 +433,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { checkAnswer(spark.sql(s"select name, userx.name, userx.score from ${tableName}").collect())(Seq(null, null, null)) // insert again - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000)") + spark.sql(s"insert into ${tableName} values(2 , map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000)") // check again checkAnswer(spark.sql(s"select name, userx.name as uxname, userx.score as uxs from ${tableName} order by id").collect())( @@ -440,9 +449,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { Seq(291, 2, "jacknew")) // test map value type change spark.sql(s"alter table ${tableName} add columns(mxp map<String, int>)") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))") spark.sql(s"alter table ${tableName} alter column mxp.value type double") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") spark.sql(s"select * from $tableName").show(false) checkAnswer(spark.sql(s"select mxp from ${tableName} order by id").collect())( Seq(null), @@ -453,7 +462,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql(s"alter table ${tableName} rename column userx to us") spark.sql(s"alter table ${tableName} rename column us.age to age1") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show() checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())( Seq(null, 29), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 8c709ab37a..2d8d6ceca7 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.HoodieSparkUtils.isSpark2 + class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test Update Table") { @@ -84,7 +86,12 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)") + if (isSpark2) { + spark.sql(s"insert into $tableName values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000)") + } else { + spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)") + } + checkAnswer(s"select id, name, price, ts from $tableName")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 20.0, 1000) @@ -119,11 +126,20 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $ptTableName - |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022") + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( Seq(1, "a1", 10.0, 1000, "2021"), Seq(2, "a2", 20.0, 1000, "2021"), diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 2797b8caa1..cf54504d0d 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -18,14 +18,22 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.internal.SQLConf object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { + def resolveOutputColumns(tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean, + conf: SQLConf): LogicalPlan = + SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName) + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = ExplainCommand(plan, extended = extended) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala index 0cdf5782c0..abece34dea 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -18,17 +18,25 @@ package org.apache.spark.sql import org.apache.hudi.spark3.internal.ReflectUtil -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode} +import org.apache.spark.sql.internal.SQLConf abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { + def resolveOutputColumns(tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean, + conf: SQLConf): LogicalPlan = + TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf) + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)