alexeykudinkin commented on code in PR #5733:
URL: https://github.com/apache/hudi/pull/5733#discussion_r889401027


##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -176,86 +124,25 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
-    Schema schema;
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
-    if (schemaFromCommitMetadata.isPresent()) {
-      schema = schemaFromCommitMetadata.get();
-    } else {
-      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-      if (schemaFromTableConfig.isPresent()) {
-        if (includeMetadataFields) {
-          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-        } else {
-          schema = schemaFromTableConfig.get();
-        }
-      } else {
-        if (includeMetadataFields) {
-          schema = getTableAvroSchemaFromDataFile();
-        } else {
-          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
-        }
-      }
-    }
-
-    Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
-      schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, 
schema);
-    }
-    return schema;
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  public static Schema recreateSchemaWhenDropPartitionColumns(Option<String[]> 
partitionFieldsOpt, Schema originSchema) {
-    // when hoodie.datasource.write.drop.partition.columns is true, partition 
columns can't be persisted in data files.
-    // And there are no partition schema if the schema is parsed from data 
files.
-    // Here we create partition Fields for this case, and use StringType as 
the data type.
-    Schema schema = originSchema;
-    if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
-      List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
-
-      final Schema schema0 = originSchema;
-      boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch(
-          pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      boolean hasPartitionColInSchema = partitionFields.stream().anyMatch(
-          pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
-      );
-      if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
-        throw new HoodieIncompatibleSchemaException(
-            "Not support: Partial partition fields are still in the schema "
-                + "when enable 
hoodie.datasource.write.drop.partition.columns");
-      }
-
-      if (hasPartitionColNotInSchema) {
-        // when hasPartitionColNotInSchema is true and hasPartitionColInSchema 
is false, all partition columns
-        // are not in originSchema. So we create and add them.
-        List<Field> newFields = new ArrayList<>();
-        for (String partitionField: partitionFields) {
-          newFields.add(new Schema.Field(
-              partitionField, createNullableSchema(Schema.Type.STRING), "", 
JsonProperties.NULL_VALUE));
-        }
-        schema = appendFieldsToSchema(schema, newFields);
-      }
-    }
-    return schema;
+  /**
+   * Fetches tables schema in Avro format as of the given instant
+   *
+   * @param instant as of which table's schema will be fetched
+   */
+  public Schema getTableAvroSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
+    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.of(instant));
   }
 
   /**
    * Gets full schema (user + metadata) for a hoodie table in Parquet format.
    *
    * @return Parquet schema for the table
-   * @throws Exception
    */
   public MessageType getTableParquetSchema() throws Exception {
-    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
-    if (schemaFromCommitMetadata.isPresent()) {
-      return convertAvroSchemaToParquet(schemaFromCommitMetadata.get());
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      Schema schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
-      return convertAvroSchemaToParquet(schema);
-    }
-    return getTableParquetSchemaFromDataFile();
+    return convertAvroSchemaToParquet(getTableAvroSchema(true));

Review Comment:
   It was not handled correctly before -- this config has to be handled in all 
code-paths



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to