This is an automated email from the ASF dual-hosted git repository. akudinkin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new aacfe6de806 [HUDI-5296] Allow disable schema on read after enabling (#7421) aacfe6de806 is described below commit aacfe6de806e045a4a02f5f4a15910fb60d40fa7 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Mon Dec 12 11:39:18 2022 -0800 [HUDI-5296] Allow disable schema on read after enabling (#7421) If someone has enabled schema on read by mistake and never really renamed or dropped a column, it should be feasible to disable schema on read. This patch fixes that. Essentially, on both the read and write paths, if the "hoodie.schema.on.read.enable" config is not set, it will fall back to the regular code path. It might fail, or users might miss data, if they have performed any irrevocable changes like renames. But for the rest, this should work. --- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 20 ++--- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 90 +++++++++------------- .../org/apache/hudi/IncrementalRelation.scala | 6 +- .../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 54 ++++++++++++- 4 files changed, 105 insertions(+), 65 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index afc0781eb1b..9c984b96fb2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { val schemaResolver = new TableSchemaResolver(metaClient) - val internalSchemaOpt = if (!isSchemaEvolutionEnabled) { + val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) { None } else { Try { @@ -639,15 
+639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, private def prunePartitionColumns(dataStructSchema: StructType): StructType = StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) - - private def isSchemaEvolutionEnabled = { - // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as - // t/h Spark Session configuration (for ex, for Spark SQL) - optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || - sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - } } object HoodieBaseRelation extends SparkAdapterSupport { @@ -749,4 +740,13 @@ object HoodieBaseRelation extends SparkAdapterSupport { }) } } + + def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as + // t/h Spark Session configuration (for ex, for Spark SQL) + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 1427d0c7c9e..f0ede2b2b82 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -150,14 +150,17 @@ object HoodieSparkSqlWriter { // 
Handle various save modes handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters)) - // Create the table if not present - if (!tableExists) { + val tableMetaClient = if (tableExists) { + HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration) + .setBasePath(path) + .build() + } else { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); - - val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() + HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) .setDatabaseName(databaseName) .setTableName(tblName) @@ -180,8 +183,8 @@ object HoodieSparkSqlWriter { .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) - tableConfig = tableMetaClient.getTableConfig - } + } + tableConfig = tableMetaClient.getTableConfig val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) @@ -191,8 +194,7 @@ object HoodieSparkSqlWriter { classOf[Schema])) val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean - - val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration) + val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient) // NOTE: We need to make sure that upon conversion of the schemas b/w 
Catalyst's [[StructType]] and // Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that // play crucial role in establishing compatibility b/w schemas @@ -200,19 +202,15 @@ object HoodieSparkSqlWriter { .getOrElse(getAvroRecordNameAndNamespace(tblName)) val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace) - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse { - val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - // In case we need to reconcile the schema and schema evolution is enabled, - // we will force-apply schema evolution to the writer's schema - if (shouldReconcileSchema && schemaEvolutionEnabled) { - val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty - // in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS - val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean - Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField))) - } else { - None - } + val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse { + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema + if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean + 
Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField))) + } else { + None + } } // NOTE: Target writer's schema is deduced based on @@ -252,7 +250,7 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient) val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) @@ -294,8 +292,6 @@ object HoodieSparkSqlWriter { client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime) (writeStatuses, client) - - case _ => // Here all other (than DELETE, DELETE_PARTITION) write operations are handled // @@ -562,25 +558,24 @@ object HoodieSparkSqlWriter { } /** - * get latest internalSchema from table - * - * @param fs instance of FileSystem. - * @param basePath base path. - * @param sparkContext instance of spark context. - * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. - */ - def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = { - try { - if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + * get latest internalSchema from table + * + * @param config instance of {@link HoodieConfig} + * @param tableMetaClient instance of HoodieTableMetaClient + * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. 
+ */ + def getLatestTableInternalSchema(config: HoodieConfig, + tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { + if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + Option.empty[InternalSchema] + } else { + try { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None - } else { - None + } catch { + case _: Exception => None } - } catch { - case _: Exception => None } } @@ -589,22 +584,11 @@ object HoodieSparkSqlWriter { } private def getLatestTableSchema(spark: SparkSession, - tableBasePath: Path, tableId: TableIdentifier, - hadoopConf: Configuration): Option[Schema] = { - val fs = tableBasePath.getFileSystem(hadoopConf) + tableMetaClient: HoodieTableMetaClient): Option[Schema] = { + val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) val latestTableSchemaFromCommitMetadata = - if (FSUtils.isTableExists(tableBasePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder - .setConf(hadoopConf) - .setBasePath(tableBasePath.toString) - .build() - val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) - } else { - None - } - + toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) latestTableSchemaFromCommitMetadata.orElse { getCatalogTable(spark, tableId).map { catalogTable => val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 4c763b054ad..80ee3dde5b0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.hadoop.fs.{GlobPattern, Path} +import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils @@ -47,6 +48,7 @@ import scala.collection.mutable * Relation, that implements the Hoodie incremental view. * * Implemented for Copy_on_write storage. + * TODO: rebase w/ HoodieBaseRelation HUDI-5362 * */ class IncrementalRelation(val sqlContext: SQLContext, @@ -90,7 +92,9 @@ class IncrementalRelation(val sqlContext: SQLContext, val (usedSchema, internalSchema) = { log.info("Inferring schema..") val schemaResolver = new TableSchemaResolver(metaClient) - val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) { + val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) { + InternalSchema.getEmptyInternalSchema + } else if (useEndInstantSchema && !commitsToReturn.isEmpty) { InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) } else { schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index ed9db3a5aa4..a84ff0fa3f8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.testutils.HoodieTestDataGenerator 
import org.apache.hudi.common.testutils.RawTripTestPayload import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit} import org.apache.spark.sql.types.StringType @@ -174,6 +174,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } + test("Test Enable and Disable Schema on read") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + if (HoodieSparkUtils.gteqSpark3_1) { + spark.sql("set hoodie.schema.on.read.enable=true") + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Insert data to the new table. + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // add column + spark.sql(s"alter table $tableName add columns(new_col string)") + val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) + assertResult(Seq("id", "name", "price", "ts", "new_col")) { + HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // disable schema on read. + spark.sql("set hoodie.schema.on.read.enable=false") + spark.sql(s"refresh table $tableName") + // Insert data to the new table. + spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") + // write should succeed. and subsequent read should succeed as well. 
+ checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 2000, "e0") + ) + } + } + } + test("Test Partition Table alter ") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType =>