yihua commented on code in PR #5664: URL: https://github.com/apache/hudi/pull/5664#discussion_r889380010
########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java: ########## @@ -87,6 +89,7 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo this.populateMetaFields = populateMetaFields; this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); + this.isHiveStylePartitioning = writeConfig.isHiveStylePartitioningEnabled(); Review Comment: nit: `writeConfig` is saved inside this helper so we don't need to have another member variable `isHiveStylePartitioning`? ########## hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java: ########## @@ -109,6 +109,48 @@ public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) t } } + @Test + public void testDataInternalWriterHiveStylePartitioning() throws Exception { + boolean sorted = true; + boolean populateMetaFields = false; + // init config and table + HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true"); + HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); + for (int i = 0; i < 1; i++) { + String instantTime = "00" + i; + // init writer + HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), + STRUCT_TYPE, populateMetaFields, sorted); + + int size = 10 + RANDOM.nextInt(1000); + // write N rows to partition1, N rows to partition2 and N rows to partition3 ... 
Each batch should create a new RowCreateHandle and a new file + int batches = 3; + Dataset<Row> totalInputRows = null; + + for (int j = 0; j < batches; j++) { + String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; + Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false); + writeRows(inputRows, writer); + if (totalInputRows == null) { + totalInputRows = inputRows; + } else { + totalInputRows = totalInputRows.union(inputRows); + } + } + + BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) writer.commit(); + Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>()); + Option<List<String>> fileNames = Option.of(new ArrayList<>()); + + // verify write statuses + assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, sorted, fileAbsPaths, fileNames); + + // verify rows + Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0])); + assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields); Review Comment: Do we want to validate the hive-style partition path value somewhere? ########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java: ########## @@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException { if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen partitionPath = ""; } else if (simpleKeyGen) { // SimpleKeyGen - partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString(); + Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); + partitionPath = parititionPathValue != null ? 
parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; + if (isHiveStylePartitioning) { + partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath; Review Comment: For `SimpleKeyGenerator`, there could be only one partition path field. Is that correct? ########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java: ########## @@ -128,7 +133,11 @@ public void write(InternalRow record) throws IOException { if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen partitionPath = ""; } else if (simpleKeyGen) { // SimpleKeyGen - partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString(); + Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); + partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; + if (isHiveStylePartitioning) { + partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath; Review Comment: @nsivabalan could you simply leverage `SimpleKeyGenerator::getPartitionPath(GenericRecord record)` or `KeyGenUtils::getPartitionPath` API instead of hardcoding the value construction here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org