nsivabalan commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1372446787


##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,24 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code 
sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type 
promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return (sourceSchema.getType() == Schema.Type.NULL) || 
isProjectionOfInternal(sourceSchema, targetSchema,
+        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
+  }
+
+  private static boolean isAtomicSchemasCompatibleEvolution(Schema 
oneAtomicType, Schema anotherAtomicType) {

Review Comment:
   Can we write extensive docs on these methods? In general we have not been very comfortable touching this part of the code. Maybe Meng Tao and a few others are, but the rest of the PMCs have generally been very cautious. Can you add more docs around these methods so it's easier to maintain going forward?
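   For illustration, one possible shape of such a javadoc (the wording below is only a sketch, not taken from the PR, and the exact promotion rules should be confirmed against the implementation):

     /**
      * Validates whether {@code targetSchema} can replace {@code sourceSchema} as the table's
      * schema without breaking readers of data written with {@code sourceSchema}.
      *
      * <p>This is the mirror image of {@link #isCompatibleProjectionOf(Schema, Schema)}:
      * projection checks that a reader schema can be served by a given writer schema, while
      * evolution checks that a new writer schema can still serve existing readers. As a result,
      * type promotion is only accepted in the widening direction, e.g. an int column may evolve
      * to long, but a long column may not evolve back to int.
      */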



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java:
##########
@@ -79,6 +81,14 @@ public class HoodieCommonConfig extends HoodieConfig {
           + " operation will fail schema compatibility check. Set this option 
to true will make the newly added "
           + " column nullable to successfully complete the write operation.");
 
+  public static final ConfigProperty<String> ADD_NULL_FOR_DELETED_COLUMNS = 
ConfigProperty
+      .key("hoodie.datasource.add.null.for.deleted.columns")

Review Comment:
   "hoodie.datasource.set.null.for.missing.columns" 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -173,7 +175,12 @@ private RecordIterator(Schema readerSchema, Schema 
writerSchema, byte[] content)
         this.totalRecords = this.dis.readInt();
       }
 
-      this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+      if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, 
readerSchema)) {

Review Comment:
   We should try to unify our conventions across the code base. We use reader and writer schema here, table schema and source schema in the outer layers, prevSchema in some cases, and sourceSchema and targetSchema in a few other places.

   We should align all of these on a standard terminology throughout: maybe reader/writer schema in the write handle classes, and sourceSchema/targetSchema everywhere else.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -359,7 +360,8 @@ public Option<Pair<HoodieInstant, HoodieCommitMetadata>> 
getLastCommitMetadataWi
     return Option.fromJavaOptional(
         getCommitMetadataStream()
             .filter(instantCommitMetadataPair ->
-                
!StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
+                
!StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+            && 
!WriteOperationType.schemaCantChange(instantCommitMetadataPair.getRight().getOperationType()))

Review Comment:
   Minor: can you switch the order of the conditions? Let's check the operation type first, and then check for SCHEMA_KEY in the extra metadata.
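   A sketch of the reordered condition (the same two predicates as in the diff above, just swapped so that the cheap operation-type check runs first):

             .filter(instantCommitMetadataPair ->
                 !WriteOperationType.schemaCantChange(instantCommitMetadataPair.getRight().getOperationType())
                     && !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))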



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.internal.schema.InternalSchema
+
+/**
+ * Util methods for Schema evolution in Hudi
+ */
+object HoodieSchemaUtils {
+  /**
+   * get latest internalSchema from table
+   *
+   * @param config          instance of {@link HoodieConfig}
+   * @param tableMetaClient instance of HoodieTableMetaClient
+   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
+   */
+  def getLatestTableInternalSchema(config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
+    if 
(!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+      None
+    } else {
+      try {
+        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)

Review Comment:
   Should we fix this one as well so that it does not look into every commit's metadata (getTableInternalSchemaFromCommitMetadata)? For example, we should skip compaction and clustering commits, etc.
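   For illustration, a rough sketch of the kind of filter being suggested; getCommitMetadataStream() is borrowed from the HoodieActiveTimeline hunk above as a stand-in, and the real traversal inside TableSchemaResolver may look different:

         // Skip commits produced by operations that cannot change the schema
         // (compaction, clustering, ...) before parsing their metadata for an InternalSchema.
         Option<HoodieCommitMetadata> candidate = Option.fromJavaOptional(
             getCommitMetadataStream()
                 .map(Pair::getRight)
                 .filter(metadata -> !WriteOperationType.schemaCantChange(metadata.getOperationType()))
                 .findFirst());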
   



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -111,18 +117,22 @@ public static InternalSchema reconcileSchema(Schema 
incomingSchema, InternalSche
     return 
SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns, 
typeChange);
   }
 
+  public static Schema reconcileSchema(Schema incomingSchema, Schema 
oldTableSchema) {
+    return convert(reconcileSchema(incomingSchema, convert(oldTableSchema)), 
oldTableSchema.getFullName());
+  }
+
   /**
-   * Reconciles nullability requirements b/w {@code source} and {@code target} 
schemas,
+   * Reconciles nullability and datatype requirements b/w {@code source} and 
{@code target} schemas,
    * by adjusting these of the {@code source} schema to be in-line with the 
ones of the
    * {@code target} one
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled 
against
-   * @param opts config options
-   * @return schema (based off {@code source} one) that has nullability 
constraints reconciled
+   * @param opts         config options
+   * @return schema (based off {@code source} one) that has nullability 
constraints and datatypes reconciled
    */
-  public static Schema reconcileNullability(Schema sourceSchema, Schema 
targetSchema, Map<String, String> opts) {
-    if (sourceSchema.getFields().isEmpty() || 
targetSchema.getFields().isEmpty()) {
+  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema 
targetSchema, Map<String, String> opts) {

Review Comment:
   "Reconcile" is not very apparent to everyone. Can you add more javadocs explaining what exactly we mean by reconcile here? Do we add missing columns, or do we just inject defaults, etc.?
   



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,24 @@ public static String getAvroRecordQualifiedName(String 
tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + 
"_record";
   }
 
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code 
sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type 
promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema 
targetSchema) {
+    return (sourceSchema.getType() == Schema.Type.NULL) || 
isProjectionOfInternal(sourceSchema, targetSchema,
+        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
+  }
+
+  private static boolean isAtomicSchemasCompatibleEvolution(Schema 
oneAtomicType, Schema anotherAtomicType) {

Review Comment:
   Also for the method at L174: you could even add some example illustrations if need be.
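   For illustration, the kind of example that could go into those javadocs (the directions shown assume standard Avro widening promotions and should be double-checked against the code):

     // table schema   : {"id": "int",  "name": "string"}
     // incoming batch : {"id": "long", "name": "string"}
     //
     // Evolving id from int to long is a valid evolution (readers can promote int to long),
     // whereas evolving a long column back to int is not (existing values may not fit).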



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -33,8 +33,9 @@ object HoodieParquetFileFormatHelper {
     val sparkRequestStructFields = requiredSchema.map(f => {
       val requiredType = f.dataType
       if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, 
fileStructMap(f.name))) {
-        implicitTypeChangeInfo.put(new 
Integer(requiredSchema.fieldIndex(f.name)), 
org.apache.hudi.common.util.collection.Pair.of(requiredType, 
fileStructMap(f.name)))
-        StructField(f.name, fileStructMap(f.name), f.nullable)
+        val readerType = addMissingFields(requiredType, fileStructMap(f.name))

Review Comment:
   Can you add javadocs, and if possible some example illustrations?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -523,9 +526,15 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
       throw new UnsupportedOperationException("Spark record only support 
parquet log.");
     }
 
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))

Review Comment:
   The syncOnce() impl already has an instance of HoodieTableMetaClient just after refreshing the timeline. We should try to reuse that instead of creating new ones.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -41,18 +42,23 @@ public class AvroSchemaEvolutionUtils {
    * 1) incoming data has missing columns that were already defined in the 
table –> null values will be injected into missing columns
    * 2) incoming data contains new columns not defined yet in the table -> 
columns will be added to the table schema (incoming dataframe?)
    * 3) incoming data has missing columns that are already defined in the 
table and new columns not yet defined in the table ->
-   *     new columns will be added to the table schema, missing columns will 
be injected with null values
+   * new columns will be added to the table schema, missing columns will be 
injected with null values
    * 4) support type change
    * 5) support nested schema change.
    * Notice:
-   *    the incoming schema should not have delete/rename semantics.
-   *    for example: incoming schema:  int a, int b, int d;   oldTableSchema 
int a, int b, int c, int d
-   *    we must guarantee the column c is missing semantic, instead of delete 
semantic.
+   * the incoming schema should not have delete/rename semantics.
+   * for example: incoming schema:  int a, int b, int d;   oldTableSchema int 
a, int b, int c, int d
+   * we must guarantee the column c is missing semantic, instead of delete 
semantic.
+   *
    * @param incomingSchema implicitly evolution of avro when hoodie write 
operation
    * @param oldTableSchema old internalSchema
    * @return reconcile Schema
    */
   public static InternalSchema reconcileSchema(Schema incomingSchema, 
InternalSchema oldTableSchema) {
+    /* If incoming schema is null, we fall back on table schema. */

Review Comment:
   I am curious how the new incoming data could have a NULL schema. Is that when someone tries to write an empty dataframe to Hudi? Why are we accounting for NULL schemas in all these places?



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -131,20 +141,34 @@ public static Schema reconcileNullability(Schema 
sourceSchema, Schema targetSche
 
     List<String> colNamesSourceSchema = 
sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = 
targetInternalSchema.getAllColsFullName();
-    List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+    List<String> nullableUpdateCols = colNamesSourceSchema.stream()
         .filter(f -> 
(("true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key())) && 
!colNamesTargetSchema.contains(f))
                 || colNamesTargetSchema.contains(f) && 
sourceInternalSchema.findField(f).isOptional() != 
targetInternalSchema.findField(f).isOptional()
             )
         ).collect(Collectors.toList());
+    List<String> typeUpdateCols = colNamesSourceSchema.stream()

Review Comment:
   Can we do L144 to L150 using a single for loop?
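   For illustration, a single-pass version; the type-update predicate is a guess, since the second stream is not fully shown in the diff, and Types.Field#type() is assumed to expose the column type:

       boolean makeNewColumnsNullable = "true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key()));
       List<String> nullableUpdateCols = new ArrayList<>();
       List<String> typeUpdateCols = new ArrayList<>();
       for (String f : colNamesSourceSchema) {
         boolean inTarget = colNamesTargetSchema.contains(f);
         if ((makeNewColumnsNullable && !inTarget)
             || (inTarget && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional())) {
           nullableUpdateCols.add(f);
         }
         if (inTarget && !sourceInternalSchema.findField(f).type().equals(targetInternalSchema.findField(f).type())) {
           typeUpdateCols.add(f);
         }
       }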



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -131,20 +141,34 @@ public static Schema reconcileNullability(Schema 
sourceSchema, Schema targetSche
 
     List<String> colNamesSourceSchema = 
sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = 
targetInternalSchema.getAllColsFullName();
-    List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+    List<String> nullableUpdateCols = colNamesSourceSchema.stream()

Review Comment:
   nullableUpdateColsInSource



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = 
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),

Review Comment:
   If you happen to change the config key, let's change the respective variable names as well.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = 
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      
DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
 
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source 
schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.
+      case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)

Review Comment:
   Regarding the impl of AvroInternalSchemaConverter.fixNullOrdering: why can't we just reconstruct the Avro schema directly? Why do we have to convert Avro -> internal schema and then back to Avro again?
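   For context, a rough sketch of what reordering the union directly in Avro could look like; this handles only a flat union and is not equivalent to the PR's fixNullOrdering (a full version would have to recurse into nested records, arrays, maps and field defaults):

     private static Schema nullFirst(Schema schema) {
       if (schema.getType() != Schema.Type.UNION) {
         return schema;
       }
       // Move the NULL branch (if any) to the front of the union, keeping the rest in order.
       List<Schema> branches = new ArrayList<>(schema.getTypes());
       branches.sort(Comparator.comparingInt(s -> s.getType() == Schema.Type.NULL ? 0 : 1));
       return Schema.createUnion(branches);
     }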



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -624,17 +635,25 @@ class HoodieSparkSqlWriterInternal {
           } else {
             if (!shouldValidateSchemasCompatibility) {
               // if no validation is enabled, check for col drop
-              // if col drop is allowed, go ahead. if not, check for 
projection, so that we do not allow dropping cols
-              if (allowAutoEvolutionColumnDrop || 
canProject(latestTableSchema, canonicalizedSourceSchema)) {
+              if (allowAutoEvolutionColumnDrop) {
                 canonicalizedSourceSchema
               } else {
-                log.error(
-                  s"""Incoming batch schema is not compatible with the table's 
one.
-                   |Incoming schema ${sourceSchema.toString(true)}
-                   |Incoming schema (canonicalized) 
${canonicalizedSourceSchema.toString(true)}
-                   |Table's schema ${latestTableSchema.toString(true)}
-                   |""".stripMargin)
-                throw new SchemaCompatibilityException("Incoming batch schema 
is not compatible with the table's one")
+                val reconciledSchema = if (addNullForDeletedColumns) {

Review Comment:
   I feel we should just simplify all of this; it is getting hard to maintain. Maybe in the next release we should try something like the following (my suggestion below is for out-of-the-box schema evolution; schema on read will take a different route as usual).

   Proposal: we never allow auto-dropping of columns with regular writes. If someone has to delete/drop columns, they have to go to spark-sql and trigger an alter column command.

   If we keep that out of the way, then our schema evolution for regular ingestion becomes much simpler:
   for columns missing in the incoming batch compared to the table schema, inject nulls;
   promote data types if need be;
   add new columns from the incoming batch that are missing in the target.

   We can get rid of canonicalize, reconcile, etc. There won't be multiple if/else blocks; it will all be one logic for any writer.

   The spark-sql alter table command can infer configs like "inject nulls for missing columns" and deduce the final table schema accordingly, but for regular writers we can avoid all these different permutations/combinations.

   Anyway, something to keep in mind for the future; I guess we may not do it in this patch.
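   For illustration, a very rough sketch of the single code path being proposed above (names are hypothetical; this just paraphrases the steps listed above and leans on the reconcileSchema(Schema, Schema) overload added in this PR):

     // Hypothetical single-path deduction for regular (non spark-sql) writers:
     static Schema deduceWriterSchema(Schema incomingSchema, Schema tableSchema) {
       // - never drop table columns here; drops only happen via spark-sql ALTER TABLE
       // - table columns missing from the batch are kept (nulls get injected on write)
       // - widened types in the batch promote the corresponding table column
       // - brand-new columns in the batch are appended to the table schema
       return AvroSchemaEvolutionUtils.reconcileSchema(incomingSchema, tableSchema);
     }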
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -94,8 +94,8 @@ object HoodieSparkSqlWriter {
    *
    * NOTE: This is an internal config that is not exposed to the public
    */
-  val CANONICALIZE_NULLABLE: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
+  val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] =

Review Comment:
   This is an internal config, right?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = 
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      
DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
 
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source 
schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.

Review Comment:
   Can you throw some light on this? For the spark-sql writer, I guess deduceWriterSchema is a no-op, right, and all the manipulation happens in the write handle? Or is my understanding wrong? If my understanding is right, how does adding new columns using MIT work? Can you help me understand?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -565,35 +573,24 @@ private Pair<SchemaProvider, Pair<String, 
JavaRDD<HoodieRecord>>> fetchFromSourc
         }
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, 
cfg.targetBasePath);
-        // Deduce proper target (writer's) schema for the transformed dataset, 
reconciling its
+        // Deduce proper target (writer's) schema for the input dataset, 
reconciling its

Review Comment:
   So the block of lines 551 to 574 does not need to invoke getDeducedSchemaProvider(), is that right? Why is that so? Can you help me understand?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/LazyCastingIterator.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Iterator;
+
+public class LazyCastingIterator extends 
LazyIterableIterator<GenericRecord,GenericRecord> {

Review Comment:
   Do we have UTs for these?
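   For illustration, the shape such a unit test could take; the LazyCastingIterator constructor used here (source iterator plus target schema) is an assumption, not taken from the PR:

     @Test
     public void testCastsIntToLong() {
       Schema writerSchema = SchemaBuilder.record("rec").fields().requiredInt("val").endRecord();
       Schema targetSchema = SchemaBuilder.record("rec").fields().requiredLong("val").endRecord();

       GenericRecord input = new GenericData.Record(writerSchema);
       input.put("val", 42);

       // Hypothetical constructor: (Iterator<GenericRecord> source, Schema targetSchema)
       LazyCastingIterator itr =
           new LazyCastingIterator(Collections.singletonList(input).iterator(), targetSchema);

       assertEquals(42L, itr.next().get("val"));
     }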



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -33,8 +33,9 @@ object HoodieParquetFileFormatHelper {
     val sparkRequestStructFields = requiredSchema.map(f => {
       val requiredType = f.dataType
       if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, 
fileStructMap(f.name))) {
-        implicitTypeChangeInfo.put(new 
Integer(requiredSchema.fieldIndex(f.name)), 
org.apache.hudi.common.util.collection.Pair.of(requiredType, 
fileStructMap(f.name)))
-        StructField(f.name, fileStructMap(f.name), f.nullable)
+        val readerType = addMissingFields(requiredType, fileStructMap(f.name))

Review Comment:
   We should try to write UTs for these instead of e2e tests. Our test run time will keep spiking if we end up writing e2e tests for all combinations.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                          latestTableSchemaOpt: Option[Schema],
                          internalSchemaOpt: Option[InternalSchema],
                          opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = 
opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      
DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
 
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source 
schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.

Review Comment:
   This may not be related to this patch, but I am just curious.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.internal.schema.InternalSchema
+
+/**
+ * Util methods for Schema evolution in Hudi
+ */
+object HoodieSchemaUtils {
+  /**
+   * get latest internalSchema from table
+   *
+   * @param config          instance of {@link HoodieConfig}
+   * @param tableMetaClient instance of HoodieTableMetaClient
+   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
+   */
+  def getLatestTableInternalSchema(config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
+    if 
(!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+      None
+    } else {
+      try {
+        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)

Review Comment:
   By the way, I saw that you have introduced the schemaCantChange method. Let's ensure that schema on read is not broken. Especially when someone triggers an alter schema from the spark-sql layer, I guess the latest commit metadata is expected to contain the latest table schema. With the refactoring in this patch, let's make sure we don't break anything.
   
   


