xushiyan commented on code in PR #5201:
URL: https://github.com/apache/hudi/pull/5201#discussion_r841269885
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -471,11 +471,10 @@ object DataSourceWriteOptions {
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the
records")
- val DROP_PARTITION_COLUMNS: ConfigProperty[String] = ConfigProperty
- .key("hoodie.datasource.write.drop.partition.columns")
- .defaultValue("false")
- .withDocumentation("When set to true, will not write the partition columns
into hudi. " +
- "By default, false.")
+ val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
Review Comment:
why not just point this to `HoodieTableConfig.DROP_PARTITION_COLUMNS` ?
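For illustration, a hedged sketch of that suggestion (assuming `HoodieTableConfig.DROP_PARTITION_COLUMNS` is the table-level property introduced elsewhere in this PR): the datasource option could simply alias it rather than redeclare the key, default, and docs.
```scala
import org.apache.hudi.common.table.HoodieTableConfig

// Hedged sketch, not the PR's actual change: reuse the table-level property so this
// datasource option can never drift from HoodieTableConfig.DROP_PARTITION_COLUMNS.
val DROP_PARTITION_COLUMNS = HoodieTableConfig.DROP_PARTITION_COLUMNS
```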
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -308,6 +349,38 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// TODO(HUDI-3639) vectorized reader has to be disabled to make sure
MORIncrementalRelation is working properly
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
}
+
+ /**
+ * When hoodie.datasource.write.drop.partition.columns is enabled, we need to create an InternalRow from the partition values
+ * and pass it to the parquet file reader, so that the partition columns can still be queried.
+ */
+ protected def createPartitionInternalRow(file: FileStatus): InternalRow = {
Review Comment:
this sounds clearer: `getPartitionColumnsAsInternalRow`
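For reference, a hedged sketch (not necessarily the PR's implementation) of what the renamed method could look like, assuming slash-separated, string-typed partition values in partition-column order and a `basePath: String` in scope:
```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.common.fs.FSUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Hedged sketch: recover the partition values from the file's relative partition path
// and expose them as an InternalRow, since they are no longer persisted in the data files.
protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
  val relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), file.getPath.getParent)
  val partitionValues = relativePartitionPath.split("/").map(UTF8String.fromString)
  InternalRow.fromSeq(partitionValues)
}
```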
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +166,51 @@ public Schema getTableAvroSchema() throws Exception {
* @throws Exception
*/
public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+ Schema schema;
Option<Schema> schemaFromCommitMetadata =
getTableSchemaFromCommitMetadata(includeMetadataFields);
if (schemaFromCommitMetadata.isPresent()) {
- return schemaFromCommitMetadata.get();
- }
- Option<Schema> schemaFromTableConfig =
metaClient.getTableConfig().getTableCreateSchema();
- if (schemaFromTableConfig.isPresent()) {
- if (includeMetadataFields) {
- return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+ schema = schemaFromCommitMetadata.get();
+ } else {
+ Option<Schema> schemaFromTableConfig =
metaClient.getTableConfig().getTableCreateSchema();
+ if (schemaFromTableConfig.isPresent()) {
+ if (includeMetadataFields) {
+ schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+ } else {
+ schema = schemaFromTableConfig.get();
+ }
} else {
- return schemaFromTableConfig.get();
+ if (includeMetadataFields) {
+ schema = getTableAvroSchemaFromDataFile();
+ } else {
+ schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+ }
}
}
- if (includeMetadataFields) {
- return getTableAvroSchemaFromDataFile();
- } else {
- return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+ if (metaClient.getTableConfig().getDropPartitionColumnsWhenWrite()) {
Review Comment:
better to extract this out into a separate static helper and unit-test (UT) it
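Something along these lines (a hedged sketch, written in Scala for brevity even though `TableSchemaResolver` is Java, and the helper name is assumed), appending the missing partition columns as nullable strings so the behavior can be unit-tested in isolation:
```scala
import scala.collection.JavaConverters._
import org.apache.avro.{JsonProperties, Schema}

// Hedged sketch: append any configured partition fields that are missing from the
// resolved schema as nullable string columns; return the schema unchanged otherwise.
def appendPartitionColumns(schema: Schema, partitionFields: Array[String]): Schema = {
  val missing = partitionFields.filter(f => schema.getField(f) == null)
  if (missing.isEmpty) {
    schema
  } else {
    // Avro fields cannot be reused across records, so copy the existing ones first.
    val copiedFields = schema.getFields.asScala.map { f =>
      new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal())
    }
    val extraFields = missing.map { name =>
      new Schema.Field(name,
        Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)),
        null, JsonProperties.NULL_VALUE)
    }
    Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError,
      (copiedFields ++ extraFields).asJava)
  }
}
```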
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -209,14 +219,37 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val fileSplits = collectFileSplits(partitionFilters, dataFilters)
- val partitionSchema = StructType(Nil)
- val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
- val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+ val partitionSchema = if (dropPartitionColumnsWhenWrite) {
+ // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in
+ // data files.
+ StructType(partitionColumns.map(StructField(_, StringType)))
+ } else {
+ StructType(Nil)
+ }
+ val tableSchema = HoodieTableSchema(tableStructSchema, if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString, internalSchema)
+ val dataSchema = if (dropPartitionColumnsWhenWrite) {
+ val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+ HoodieTableSchema(
+ dataStructType,
+ sparkAdapter.getAvroSchemaConverters.toAvroType(dataStructType, nullable = false, "record").toString()
+ )
+ } else {
+ tableSchema
+ }
+ val requiredSchema = if (dropPartitionColumnsWhenWrite) {
+ val requiredStructType = StructType(requiredStructSchema.filterNot(f => partitionColumns.contains(f.name)))
+ HoodieTableSchema(
+ requiredStructType,
+ sparkAdapter.getAvroSchemaConverters.toAvroType(requiredStructType, nullable = false, "record").toString()
+ )
+ } else {
+ HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, requiredInternalSchema)
+ }
Review Comment:
better to UT this logic; these are pretty error-prone cases
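For example, a hedged sketch of such a test (class name, sample values, and ScalaTest style are all assumed), exercising the same filterNot-based split used above:
```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest.funsuite.AnyFunSuite

class TestDropPartitionColumnsSchemas extends AnyFunSuite {

  test("partition columns are split out of the data schema when dropped on write") {
    val tableStructSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType),
      StructField("dt", StringType)))
    val partitionColumns = Array("dt")

    // Same construction as the relation code above.
    val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
    val dataStructType = StructType(tableStructSchema.filterNot(f => partitionColumns.contains(f.name)))

    assert(partitionSchema.fieldNames.toSeq == Seq("dt"))
    assert(dataStructType.fieldNames.toSeq == Seq("id", "value"))
  }
}
```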
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -159,23 +166,51 @@ public Schema getTableAvroSchema() throws Exception {
* @throws Exception
*/
public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
+ Schema schema;
Option<Schema> schemaFromCommitMetadata =
getTableSchemaFromCommitMetadata(includeMetadataFields);
if (schemaFromCommitMetadata.isPresent()) {
- return schemaFromCommitMetadata.get();
- }
- Option<Schema> schemaFromTableConfig =
metaClient.getTableConfig().getTableCreateSchema();
- if (schemaFromTableConfig.isPresent()) {
- if (includeMetadataFields) {
- return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+ schema = schemaFromCommitMetadata.get();
+ } else {
+ Option<Schema> schemaFromTableConfig =
metaClient.getTableConfig().getTableCreateSchema();
+ if (schemaFromTableConfig.isPresent()) {
+ if (includeMetadataFields) {
+ schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
+ } else {
+ schema = schemaFromTableConfig.get();
+ }
} else {
- return schemaFromTableConfig.get();
+ if (includeMetadataFields) {
+ schema = getTableAvroSchemaFromDataFile();
+ } else {
+ schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+ }
}
}
- if (includeMetadataFields) {
- return getTableAvroSchemaFromDataFile();
- } else {
- return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+
+ if (metaClient.getTableConfig().getDropPartitionColumnsWhenWrite()) {
+ // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in data files.
+ // And there is no partition schema if the schema is parsed from data files.
+ // Here we create partition Fields for this case, and use StringType as the data type.
+ Option<String[]> partitionFieldsOpt =
metaClient.getTableConfig().getPartitionFields();
+ if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) {
+ List<String> partitionFields = Arrays.asList(partitionFieldsOpt.get());
+
+ final Schema schema0 = schema;
+ boolean allPartitionColInSchema = partitionFields.stream().allMatch(
+ pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt)
+ );
+
+ if (!allPartitionColInSchema) {
Review Comment:
the logic seems problematic: if 1 out of 3 partition fields is not contained, then all 3 partition fields will be added as extra. We should see EITHER all partition fields being absent and all of them added as extra, OR the partition fields being empty and nothing extra added. Any other condition implies unexpected configs, where we should throw an InvalidConfigException?
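Something like this sketch of that rule (the helper name and exception type are placeholders, not existing Hudi classes): append extras only when none of the configured partition fields exist in the schema, do nothing when no partition fields are configured, and fail fast on any other combination.
```scala
import org.apache.avro.Schema

// Hedged sketch of the validation described above; the caller would append the
// partition fields as extra string columns only when this returns true.
def shouldAppendPartitionColumns(schema: Schema, partitionFields: Array[String]): Boolean = {
  val present = partitionFields.count(f => schema.getField(f) != null)
  if (partitionFields.isEmpty) {
    false // no partition fields configured, nothing extra to add
  } else if (present == 0) {
    true // none of the partition fields are persisted, add all of them as extra
  } else {
    // any other combination points at inconsistent table configs
    throw new IllegalArgumentException(
      s"$present of ${partitionFields.length} partition fields already exist in the schema; unexpected configs")
  }
}
```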
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -140,6 +144,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
+ /**
+ * If true, the schemas used to create the file reader need to be adjusted.
+ */
+ protected val dropPartitionColumnsWhenWrite: Boolean =
+ metaClient.getTableConfig.getDropPartitionColumnsWhenWrite && partitionColumns.nonEmpty
Review Comment:
call `partitionColumns.nonEmpty` first to short-circuit the condition check?
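i.e. a one-line reorder (same names as the diff):
```scala
// Evaluate the local, cheap check before touching the table config.
protected val dropPartitionColumnsWhenWrite: Boolean =
  partitionColumns.nonEmpty && metaClient.getTableConfig.getDropPartitionColumnsWhenWrite
```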
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -420,6 +425,9 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
+ if (!hoodieConfig.contains(DROP_PARTITION_COLUMNS)) {
+ hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
+ }
Review Comment:
different topic: setting `BOOTSTRAP_BASE_PATH` at L421 looks like a bug?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -420,6 +425,9 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
+ if (!hoodieConfig.contains(DROP_PARTITION_COLUMNS)) {
+ hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS);
+ }
Review Comment:
`setDefaultValue` already checks existence for you, so the `if` check is unnecessary.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -593,6 +601,10 @@ public String getUrlEncodePartitioning() {
return getString(URL_ENCODE_PARTITIONING);
}
+ public Boolean getDropPartitionColumnsWhenWrite() {
+ return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
+ }
Review Comment:
the name `getDropPartitionColumnsWhenWrite` is a bit verbose? why not just `isDropPartitionColumns()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]