xushiyan commented on code in PR #5201:
URL: https://github.com/apache/hudi/pull/5201#discussion_r841269885


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -471,11 +471,10 @@ object DataSourceWriteOptions {
     .sinceVersion("0.9.0")
     .withDocumentation("This class is used by kafka client to deserialize the 
records")
 
-  val DROP_PARTITION_COLUMNS: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.datasource.write.drop.partition.columns")
-    .defaultValue("false")
-    .withDocumentation("When set to true, will not write the partition columns 
into hudi. " +
-      "By default, false.")
+  val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty

Review Comment:
   why not just point this to `HoodieTableConfig.DROP_PARTITION_COLUMNS` ?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -308,6 +349,38 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     // TODO(HUDI-3639) vectorized reader has to be disabled to make sure 
MORIncrementalRelation is working properly
     
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader",
 "false")
   }
+
+  /**
+   * For enable hoodie.datasource.write.drop.partition.columns, need to create 
an InternalRow on partition values
+   * and pass this reader on parquet file. So that, we can query the partition 
columns.
+   */
+  protected def createPartitionInternalRow(file: FileStatus): InternalRow = {

Review Comment:
   this sounds clearer: `getPartitionColumnsAsInternalRow`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +166,51 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    if (metaClient.getTableConfig().getDropPartitionColumnsWhenWrite()) {

Review Comment:
   better extract this out to a separate static helper and UT it



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
     val fileSplits = collectFileSplits(partitionFilters, dataFilters)
 
-    val partitionSchema = StructType(Nil)
-    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
-    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString, requiredInternalSchema)
+    val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in
+      // data files.
+      StructType(partitionColumns.map(StructField(_, StringType)))
+    } else {
+      StructType(Nil)
+    }
 
+    val tableSchema = HoodieTableSchema(tableStructSchema, if 
(internalSchema.isEmptySchema) tableAvroSchema.toString else 
AvroInternalSchemaConverter.convert(internalSchema, 
tableAvroSchema.getName).toString, internalSchema)
+    val dataSchema = if (dropPartitionColumnsWhenWrite) {
+      val dataStructType = StructType(tableStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        dataStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, 
nullable = false, "record").toString()
+      )
+    } else {
+      tableSchema
+    }
+    val requiredSchema = if (dropPartitionColumnsWhenWrite) {
+      val requiredStructType = StructType(requiredStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+      HoodieTableSchema(
+        requiredStructType,
+        sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, 
nullable = false, "record").toString()
+      )
+    } else {
+      HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, 
requiredInternalSchema)
+    }

Review Comment:
   better to UT these logic; pretty error-prone cases



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +166,51 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
+    Schema schema;
     Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(includeMetadataFields);
     if (schemaFromCommitMetadata.isPresent()) {
-      return schemaFromCommitMetadata.get();
-    }
-    Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
-    if (schemaFromTableConfig.isPresent()) {
-      if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+      schema = schemaFromCommitMetadata.get();
+    } else {
+      Option<Schema> schemaFromTableConfig = 
metaClient.getTableConfig().getTableCreateSchema();
+      if (schemaFromTableConfig.isPresent()) {
+        if (includeMetadataFields) {
+          schema = 
HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), 
hasOperationField);
+        } else {
+          schema = schemaFromTableConfig.get();
+        }
       } else {
-        return schemaFromTableConfig.get();
+        if (includeMetadataFields) {
+          schema = getTableAvroSchemaFromDataFile();
+        } else {
+          schema = 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+        }
       }
     }
-    if (includeMetadataFields) {
-      return getTableAvroSchemaFromDataFile();
-    } else {
-      return 
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+    if (metaClient.getTableConfig().getDropPartitionColumnsWhenWrite()) {
+      // when hoodie.datasource.write.drop.partition.columns is true, 
partition columns can't be persisted in data files.
+      // And there are no partition schema if the schema is parsed from data 
files.
+      // Here we create partition Fields for this case, and use StringType as 
the data type.
+      Option<String[]> partitionFieldsOpt = 
metaClient.getTableConfig().getPartitionFields();
+      if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 
0) {
+        List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+        final Schema schema0 = schema;
+        boolean allPartitionColInSchema = partitionFields.stream().allMatch(
+            pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+        );
+
+        if (!allPartitionColInSchema) {

Review Comment:
   the logic seems problematic: if 1 out of 3 partition fields not contained, 
then all 3 partition fields will be added as extra. we should see EITHER all 
partition fields are non-exist and all to be added as extra OR partition fields 
is empty and nothing extra added. Any other condition implies unexpected 
configs where we need to throw an InvalidConfigException ?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -140,6 +144,12 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
 
+  /**
+   * if true, need to deal with schema for creating file reader.
+   */
+  protected val dropPartitionColumnsWhenWrite: Boolean =
+    metaClient.getTableConfig.getDropPartitionColumnsWhenWrite && 
partitionColumns.nonEmpty

Review Comment:
   call partitionColumns.nonEmpty first to shortcut condition check ?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -420,6 +425,9 @@ public static void create(FileSystem fs, Path 
metadataFolder, Properties propert
       if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
         
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
       }
+      if (!hoodieConfig.contains(DROP_PARTITION_COLUMNS)) {
+        hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
+      }

Review Comment:
   different topic: L421 set `BOOTSTRAP_BASE_PATH` looks like a bug?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -420,6 +425,9 @@ public static void create(FileSystem fs, Path 
metadataFolder, Properties propert
       if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
         
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
       }
+      if (!hoodieConfig.contains(DROP_PARTITION_COLUMNS)) {
+        hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
+      }

Review Comment:
   `setDefaultValue` already check existence for you. so the if check is 
unnecessary



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -593,6 +601,10 @@ public String getUrlEncodePartitioning() {
     return getString(URL_ENCODE_PARTITIONING);
   }
 
+  public Boolean getDropPartitionColumnsWhenWrite() {
+    return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
+  }

Review Comment:
   `getDropPartitionColumnsWhenWrite` name is a bit verbose? why not just 
`isDropPartitionColumns()` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to