This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 55855cd68887c40f3666b854273722f2e7e8d430 Author: harshal <harshal.j.pa...@gmail.com> AuthorDate: Wed Aug 23 12:16:47 2023 +0530 [HUDI-6549] Add support for comma separated path format for spark.read.load (#9503) --- .../sources/helpers/CloudObjectsSelectorCommon.java | 11 ++++++++++- .../utilities/sources/helpers/CloudStoreIngestionConfig.java | 12 ++++++++++++ .../sources/helpers/TestCloudObjectsSelectorCommon.java | 1 + 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 4b95cc159cc..6791b47b129 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -53,6 +53,7 @@ import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT; import static org.apache.spark.sql.functions.input_file_name; import static org.apache.spark.sql.functions.split; @@ -181,7 +182,15 @@ public class CloudObjectsSelectorCommon { totalSize *= 1.1; long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue())); int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1); - Dataset<Row> dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()])).coalesce(numPartitions); + boolean isCommaSeparatedPathFormat = 
props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false); + + Dataset<Row> dataset; + if (isCommaSeparatedPathFormat) { + dataset = reader.load(String.join(",", paths)); + } else { + dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()])); + } + dataset = dataset.coalesce(numPartitions); // add partition column from source path if configured + if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java index fc8591e0cb9..66b94177b7b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java @@ -102,4 +102,16 @@ public class CloudStoreIngestionConfig { */ @Deprecated public static final String DATAFILE_FORMAT = CloudSourceConfig.DATAFILE_FORMAT.key(); + + /** + * A comma-delimited list of path-based partition fields in the source file structure. + */ + public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path"; + + /** + * Boolean flag controlling the path format used in the load args of spark.read, e.g. spark.read.format("..").load("a.xml,b.xml,c.xml"). + * Set to true if the paths should be passed as a single comma-separated string; + * if false, they are passed as an array of strings, e.g. spark.read.format("..").load(new String[]{"a.xml","b.xml","c.xml"}) + */ + public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format"; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java index dd467146d51..13818d98c76 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -79,6 +79,7 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness public void partitionKeyNotPresentInPath() { List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); TypedProperties properties = new TypedProperties(); + properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "false"); properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown"); Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); Assertions.assertTrue(result.isPresent());