This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aacfe6de806 [HUDI-5296] Allow disable schema on read after enabling (#7421)
aacfe6de806 is described below

commit aacfe6de806e045a4a02f5f4a15910fb60d40fa7
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Mon Dec 12 11:39:18 2022 -0800

    [HUDI-5296] Allow disable schema on read after enabling (#7421)
    
    If someone has enabled schema on read by mistake and has never actually renamed
    or dropped a column, it should be possible to disable schema on read again. This
    patch fixes that: on both the read and the write path, if the
    "hoodie.schema.on.read.enable" config is not set, Hudi falls back to the regular
    code path. Reads may fail, or users may miss data, if they have already performed
    any irrevocable changes such as column renames; for everything else this should
    work.
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 20 ++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 90 +++++++++-------------
 .../org/apache/hudi/IncrementalRelation.scala      |  6 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 54 ++++++++++++-
 4 files changed, 105 insertions(+), 65 deletions(-)
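
For context, a minimal Spark SQL usage sketch of the toggle this patch makes safe. The
table name and values below are illustrative and mirror the new test in
TestSpark3DDL.scala further down; only the config key hoodie.schema.on.read.enable
comes from the patch itself:

    // Illustrative sketch only -- table name and values are made up, mirroring the test below.
    // Enable schema on read and evolve the table.
    spark.sql("set hoodie.schema.on.read.enable=true")
    spark.sql("alter table my_hudi_table add columns(new_col string)")

    // Later, turn schema on read back off. As long as no column was ever renamed or
    // dropped, subsequent writes and reads fall back to the regular code path.
    spark.sql("set hoodie.schema.on.read.enable=false")
    spark.sql("refresh table my_hudi_table")
    spark.sql("insert into my_hudi_table values(2, 'a2', 12, 2000, 'e0')")
    spark.sql("select id, name, price, ts, new_col from my_hudi_table").show()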

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index afc0781eb1b..9c984b96fb2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -140,7 +140,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val internalSchemaOpt = if (!isSchemaEvolutionEnabled) {
+    val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
       None
     } else {
       Try {
@@ -639,15 +639,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   private def prunePartitionColumns(dataStructSchema: StructType): StructType =
     StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))
-
-  private def isSchemaEvolutionEnabled = {
-    // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
-    //       t/h Spark Session configuration (for ex, for Spark SQL)
-    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
-      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-  }
 }
 
 object HoodieBaseRelation extends SparkAdapterSupport {
@@ -749,4 +740,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
         })
     }
   }
+
+  def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = {
+    // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
+    //       t/h Spark Session configuration (for ex, for Spark SQL)
+    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
+      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+  }
 }
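
A brief note on the refactor above: the private isSchemaEvolutionEnabled check moves to
the HoodieBaseRelation companion object as isSchemaEvolutionEnabledOnRead so that the
incremental relation (see the IncrementalRelation change below) can reuse it. A rough
sketch of the two ways the flag can be supplied, assuming
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED maps to hoodie.schema.on.read.enable;
the path is made up:

    import org.apache.hudi.DataSourceReadOptions

    // 1) As a datasource read option (picked up via optParams):
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
      .load("/tmp/hudi/my_table")   // illustrative path

    // 2) Via the Spark session conf (what `set hoodie.schema.on.read.enable=true` does in Spark SQL):
    spark.conf.set(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")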
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1427d0c7c9e..f0ede2b2b82 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -150,14 +150,17 @@ object HoodieSparkSqlWriter {
       // Handle various save modes
       handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
       val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
-      // Create the table if not present
-      if (!tableExists) {
+      val tableMetaClient = if (tableExists) {
+        HoodieTableMetaClient.builder
+          .setConf(sparkContext.hadoopConfiguration)
+          .setBasePath(path)
+          .build()
+      } else {
         val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
         val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
         val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
         val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT);
-
-        val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
+        HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setDatabaseName(databaseName)
           .setTableName(tblName)
@@ -180,8 +183,8 @@ object HoodieSparkSqlWriter {
           .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
           .initTable(sparkContext.hadoopConfiguration, path)
-        tableConfig = tableMetaClient.getTableConfig
-      }
+        }
+      tableConfig = tableMetaClient.getTableConfig
 
       val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
 
@@ -191,8 +194,7 @@ object HoodieSparkSqlWriter {
          classOf[Schema]))
 
       val shouldReconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
-
-      val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
+      val latestTableSchemaOpt = getLatestTableSchema(spark, tableIdentifier, tableMetaClient)
       // NOTE: We need to make sure that upon conversion of the schemas b/w Catalyst's [[StructType]] and
       //       Avro's [[Schema]] we're preserving corresponding "record-name" and "record-namespace" that
       //       play crucial role in establishing compatibility b/w schemas
@@ -200,19 +202,15 @@ object HoodieSparkSqlWriter {
         .getOrElse(getAvroRecordNameAndNamespace(tblName))
 
       val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
-      val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext).orElse {
-        val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
-          DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
-        // In case we need to reconcile the schema and schema evolution is enabled,
-        // we will force-apply schema evolution to the writer's schema
-        if (shouldReconcileSchema && schemaEvolutionEnabled) {
-          val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
-          // in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS
-          val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
-          Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
-        } else {
-          None
-        }
+      val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
+          // In case we need to reconcile the schema and schema evolution is enabled,
+          // we will force-apply schema evolution to the writer's schema
+          if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+            val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+            Some(AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema), allowOperationMetaDataField)))
+          } else {
+            None
+          }
       }
 
       // NOTE: Target writer's schema is deduced based on
@@ -252,7 +250,7 @@ object HoodieSparkSqlWriter {
             }
 
             // Create a HoodieWriteClient & issue the delete.
-            val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
+            val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               null, path, tblName,
               mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
@@ -294,8 +292,6 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
             (writeStatuses, client)
-
-
           case _ =>
             // Here all other (than DELETE, DELETE_PARTITION) write operations are handled
             //
@@ -562,25 +558,24 @@ object HoodieSparkSqlWriter {
   }
 
   /**
-    * get latest internalSchema from table
-    *
-    * @param fs           instance of FileSystem.
-    * @param basePath     base path.
-    * @param sparkContext instance of spark context.
-    * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
-    */
-  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
-    try {
-      if (FSUtils.isTableExists(basePath.toString, fs)) {
-        val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+   * get latest internalSchema from table
+   *
+   * @param config instance of {@link HoodieConfig}
+   * @param tableMetaClient instance of HoodieTableMetaClient
+   * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
+   */
+  def getLatestTableInternalSchema(config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
+    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+       Option.empty[InternalSchema]
+    } else {
+      try {
         val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
         val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
         if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
-      } else {
-        None
+      } catch {
+        case _: Exception => None
       }
-    } catch {
-      case _: Exception => None
     }
   }
 
@@ -589,22 +584,11 @@ object HoodieSparkSqlWriter {
   }
 
   private def getLatestTableSchema(spark: SparkSession,
-                                   tableBasePath: Path,
                                    tableId: TableIdentifier,
-                                   hadoopConf: Configuration): Option[Schema] = {
-    val fs = tableBasePath.getFileSystem(hadoopConf)
+                                   tableMetaClient: HoodieTableMetaClient): Option[Schema] = {
+    val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
     val latestTableSchemaFromCommitMetadata =
-      if (FSUtils.isTableExists(tableBasePath.toString, fs)) {
-        val tableMetaClient = HoodieTableMetaClient.builder
-          .setConf(hadoopConf)
-          .setBasePath(tableBasePath.toString)
-          .build()
-        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-        toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
-      } else {
-        None
-      }
-
+      toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
     latestTableSchemaFromCommitMetadata.orElse {
       getCatalogTable(spark, tableId).map { catalogTable =>
         val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 4c763b054ad..80ee3dde5b0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.hadoop.fs.{GlobPattern, Path}
+import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
@@ -47,6 +48,7 @@ import scala.collection.mutable
  * Relation, that implements the Hoodie incremental view.
  *
  * Implemented for Copy_on_write storage.
+ * TODO: rebase w/ HoodieBaseRelation HUDI-5362
  *
  */
 class IncrementalRelation(val sqlContext: SQLContext,
@@ -90,7 +92,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
   val (usedSchema, internalSchema) = {
     log.info("Inferring schema..")
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) {
+    val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
+      InternalSchema.getEmptyInternalSchema
+    } else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
       InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
     } else {
       schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ed9db3a5aa4..a84ff0fa3f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
 import org.apache.spark.sql.types.StringType
@@ -174,6 +174,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Enable and Disable Schema on read") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      if (HoodieSparkUtils.gteqSpark3_1) {
+        spark.sql("set hoodie.schema.on.read.enable=true")
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '$tablePath'
+             | tblproperties (
+             |  type = 'cow',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+       """.stripMargin)
+
+        // Insert data to the new table.
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000)
+        )
+
+        // add column
+        spark.sql(s"alter table $tableName add columns(new_col string)")
+        val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
+        assertResult(Seq("id", "name", "price", "ts", "new_col")) {
+          HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+        }
+        }
+        checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+          Seq(1, "a1", 10.0, 1000, null)
+        )
+        // disable schema on read.
+        spark.sql("set hoodie.schema.on.read.enable=false")
+        spark.sql(s"refresh table $tableName")
+        // Insert data to the new table.
+        spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
+        // write should succeed. and subsequent read should succeed as well.
+        checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 12.0, 2000, "e0")
+        )
+      }
+    }
+  }
+
   test("Test Partition Table alter ") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
