This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 55855cd68887c40f3666b854273722f2e7e8d430
Author: harshal <harshal.j.pa...@gmail.com>
AuthorDate: Wed Aug 23 12:16:47 2023 +0530

    [HUDI-6549] Add support for comma separated path format for spark.read.load (#9503)
---
 .../sources/helpers/CloudObjectsSelectorCommon.java          | 11 ++++++++++-
 .../utilities/sources/helpers/CloudStoreIngestionConfig.java | 12 ++++++++++++
 .../sources/helpers/TestCloudObjectsSelectorCommon.java      |  1 +
 3 files changed, 23 insertions(+), 1 deletion(-)
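
For context: the new flag toggles between two shapes of argument handed to Spark's DataFrameReader.load. A minimal sketch of the two shapes (file names are placeholders; whether a source accepts the single comma-separated string depends on its reader implementation, as the a.xml example in the new Javadoc suggests):

    import java.util.Arrays;
    import java.util.List;

    public class PathFormatShapes {
      public static void main(String[] args) {
        List<String> paths = Arrays.asList("a.xml", "b.xml", "c.xml"); // placeholders

        // Default (flag false): one array element per file, for the
        // load(String... paths) overload.
        String[] arrayForm = paths.toArray(new String[0]);

        // Flag true: all files joined into one string, for readers that expect
        // the whole list in a single path argument.
        String stringForm = String.join(",", paths);

        System.out.println(Arrays.toString(arrayForm)); // [a.xml, b.xml, c.xml]
        System.out.println(stringForm);                 // a.xml,b.xml,c.xml
      }
    }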

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 4b95cc159cc..6791b47b129 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -53,6 +53,7 @@ import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.apache.hudi.utilities.config.CloudSourceConfig.PATH_BASED_PARTITION_FIELDS;
+import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT;
 import static org.apache.spark.sql.functions.input_file_name;
 import static org.apache.spark.sql.functions.split;
 
@@ -181,7 +182,15 @@ public class CloudObjectsSelectorCommon {
     totalSize *= 1.1;
    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
     int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
-    Dataset<Row> dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()])).coalesce(numPartitions);
+    boolean isCommaSeparatedPathFormat = props.getBoolean(SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT, false);
+
+    Dataset<Row> dataset;
+    if (isCommaSeparatedPathFormat) {
+      dataset = reader.load(String.join(",", paths));
+    } else {
+      dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()]));
+    }
+    dataset = dataset.coalesce(numPartitions);
 
     // add partition column from source path if configured
     if (containsConfigProperty(props, PATH_BASED_PARTITION_FIELDS)) {
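
Restated outside the diff, the new branch amounts to the following self-contained sketch (the class and method names are invented for illustration; the property key, load calls, and coalesce mirror the hunk above):

    import java.util.List;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.spark.sql.DataFrameReader;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class CommaSeparatedLoadSketch {
      static Dataset<Row> load(DataFrameReader reader, List<String> paths,
                               TypedProperties props, int numPartitions) {
        // Literal key of SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT; defaults to false.
        boolean commaSeparated = props.getBoolean(
            "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", false);
        Dataset<Row> dataset = commaSeparated
            ? reader.load(String.join(",", paths))        // single "a,b,c" string
            : reader.load(paths.toArray(new String[0]));  // String... varargs overload
        return dataset.coalesce(numPartitions);           // same post-processing as before
      }
    }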
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
index fc8591e0cb9..66b94177b7b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java
@@ -102,4 +102,16 @@ public class CloudStoreIngestionConfig {
    */
   @Deprecated
  public static final String DATAFILE_FORMAT = CloudSourceConfig.DATAFILE_FORMAT.key();
+
+  /**
+   * A comma-delimited list of path-based partition fields in the source file structure.
+   */
+  public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path";
+
+  /**
+   * Controls the path format used in the load args of spark.read.format("..").load(...).
+   * Set to true if the reader expects all paths as a single comma-separated string, e.g.
+   * load("a.xml,b.xml,c.xml"); when false (the default), the paths are passed as an array
+   * of strings, e.g. load(new String[]{"a.xml", "b.xml", "c.xml"}).
+   */
+  public static final String SPARK_DATASOURCE_READER_COMMA_SEPARATED_PATH_FORMAT = "hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format";
 }
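
To opt into the comma-separated form, a pipeline would set the new key alongside its other cloud source properties, e.g. in a DeltaStreamer properties file (illustrative; the default is false):

    hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format=true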
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index dd467146d51..13818d98c76 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -79,6 +79,7 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
   public void partitionKeyNotPresentInPath() {
    List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
     TypedProperties properties = new TypedProperties();
+    properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "false");
    properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown");
    Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json");
     Assertions.assertTrue(result.isPresent());
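
The added line pins the default (array) form for this test. A companion test for the comma-separated branch is not part of this commit, but mirroring the structure above it could look like this (hypothetical; with a single path, the joined string equals the path itself):

    @Test
    public void loadDatasetWithCommaSeparatedPathFormat() {
      // Hypothetical test, not in this commit: exercises the comma-separated branch.
      List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
      TypedProperties properties = new TypedProperties();
      properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "true");
      Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json");
      Assertions.assertTrue(result.isPresent());
    }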
