vinothchandar commented on code in PR #10876:
URL: https://github.com/apache/hudi/pull/10876#discussion_r1534279513


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -480,6 +480,20 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation(BulkInsertSortMode.class);
 
+  public static final ConfigProperty<Boolean> INSERT_SORT = ConfigProperty
+      .key("hoodie.insert.sort")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("Determines whether the insert operation should sort 
the input records. The sorting for insert is always"
+          + " global (among all input records in a batch)");
+
+  public static final ConfigProperty<String> INSERT_USER_DEFINED_SORT_COLUMNS = ConfigProperty
+      .key("hoodie.insert.user.defined.sort.columns")

Review Comment:
   Let's make sure it's consistent in naming with bulk_insert's configs.
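
   IIRC the bulk_insert equivalent is `hoodie.bulkinsert.user.defined.partitioner.sort.columns`, so something along these lines (just a naming sketch, assuming we keep a separate insert-specific key at all):

   ```java
   // hypothetical key name, mirroring the bulk_insert sort-columns config naming
   public static final ConfigProperty<String> INSERT_USER_DEFINED_SORT_COLUMNS = ConfigProperty
       .key("hoodie.insert.user.defined.partitioner.sort.columns")
       .noDefaultValue()
       .markAdvanced()
       .withDocumentation("Comma separated columns to sort by when hoodie.insert.sort is enabled.");
   ```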



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -230,6 +236,10 @@ protected Partitioner getPartitioner(WorkloadProfile profile) {
   }
 
  private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    if (operationRequiresSorting()) {

Review Comment:
   So technically, does this work for both insert and upsert operations, or just insert? If both, then we can't name the configs just around `insert`.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);

Review Comment:
   OK, here we are skipping upserts. But should this be done for upserts too?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {

Review Comment:
   Let's unit test this method?
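
   Something along these lines, maybe (rough sketch; helper names like `getConfigBuilder`, `getHoodieWriteClient`, `dataGen`, `jsc` and `readRecordKeysPerBaseFile` just stand in for whatever the existing test harness provides):

   ```java
   @Test
   public void testInsertWithSortWritesSortedRecords() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder()
         .withProps(Collections.singletonMap("hoodie.insert.sort", "true"))
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
       String instant = client.startCommit();
       List<HoodieRecord> inserts = dataGen.generateInserts(instant, 100);
       // force the write; commit/validation wiring omitted in this sketch
       client.insert(jsc.parallelize(inserts, 4), instant).collect();
     }
     // every base file written by the insert should contain its record keys in sorted order
     for (List<String> keysInFile : readRecordKeysPerBaseFile()) {
       assertEquals(keysInFile.stream().sorted().collect(Collectors.toList()), keysInFile);
     }
   }
   ```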



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))
+          .map(String::trim).toArray(String[]::new);
+    }
+
+    // Get the record's schema from the write config
+    SerializableSchema serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+
+    JavaRDD<HoodieRecord<T>> javaRdd = HoodieJavaRDD.getJavaRDD(dedupedRecords);
+    JavaRDD<HoodieRecord<T>> sortedRecords = javaRdd.sortBy(record -> {
+      if (isNullOrEmpty(customSortColField)) {
+        // If sorting based on record-key, extract it directly using record.getRecordKey()
+        return new StringBuilder()
+            .append(record.getPartitionPath())

Review Comment:
   The comments are a bit misleading. The sort key is actually partition path + record key, right? Let's make that clear in the docs and comments.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {

Review Comment:
   Also rename? This method is performing the actual write. `sortIfNeededAndWrite`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -480,6 +480,20 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation(BulkInsertSortMode.class);
 
+  public static final ConfigProperty<Boolean> INSERT_SORT = ConfigProperty
+      .key("hoodie.insert.sort")
+      .defaultValue(false)
+      .markAdvanced()
+      .withDocumentation("Determines whether the insert operation should sort 
the input records. The sorting for insert is always"
+          + " global (among all input records in a batch)");
+
+  public static final ConfigProperty<String> INSERT_USER_DEFINED_SORT_COLUMNS = ConfigProperty
+      .key("hoodie.insert.user.defined.sort.columns")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("Columns to sort the data by when hoodie.insert.sort 
is set to true. If not specified, record-key is used for sorting."

Review Comment:
   Shouldn't we be sorting by partition path + record key? Sorted record keys across partitions can belong to different partition paths, right?
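
   Maybe something like this for the doc string (just a wording sketch):

   ```java
   .withDocumentation("Columns to sort the data by when hoodie.insert.sort is set to true."
       + " The sort key is always prefixed with the partition path, i.e. records are ordered by"
       + " (partition path, sort columns); if not specified, (partition path, record key) is used.")
   ```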



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))
+          .map(String::trim).toArray(String[]::new);
+    }
+
+    // Get the record's schema from the write config
+    SerializableSchema serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+
+    JavaRDD<HoodieRecord<T>> javaRdd = HoodieJavaRDD.getJavaRDD(dedupedRecords);
+    JavaRDD<HoodieRecord<T>> sortedRecords = javaRdd.sortBy(record -> {
+      if (isNullOrEmpty(customSortColField)) {
+        // If sorting based on record-key, extract it directly using record.getRecordKey()
+        return new StringBuilder()
+            .append(record.getPartitionPath())
+            .append("+")
+            .append(record.getRecordKey())
+            .toString();
+      } else {
+        // Extract the sort columns from the record and return it as  string (prepended with partition-path)
+        Object[] columnValues = record.getColumnValues(serializableSchema.get(), sortColumns, false);
+        String sortColString = Arrays.stream(columnValues).map(Object::toString).collect(Collectors.joining());
+        return new StringBuilder()
+            .append(record.getPartitionPath())
+            .append("+")
+            .append(sortColString)
+            .toString();
+      }
+    }, true, 0);
+
+    // Assign index to each record in the RDD
+    JavaRDD<Pair<HoodieRecord<T>, Long>> indexedRecords = sortedRecords.zipWithIndex()

Review Comment:
   This runs as its own Spark job and adds cost just to assign a unique number, right? Is this needed?
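
   For reference, a sketch of the trade-off I mean (assuming the index is only needed as a stable tie-breaker):

   ```java
   // zipWithIndex() needs every partition's size up front, so Spark runs an extra job to count them
   JavaPairRDD<HoodieRecord<T>, Long> withIndex = sortedRecords.zipWithIndex();
   // zipWithUniqueId() derives ids from the partition index alone -- no extra job -- but the ids
   // are merely unique, not contiguous
   JavaPairRDD<HoodieRecord<T>, Long> withUniqueId = sortedRecords.zipWithUniqueId();
   ```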



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))
+          .map(String::trim).toArray(String[]::new);
+    }
+
+    // Get the record's schema from the write config
+    SerializableSchema serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+
+    JavaRDD<HoodieRecord<T>> javaRdd = HoodieJavaRDD.getJavaRDD(dedupedRecords);
+    JavaRDD<HoodieRecord<T>> sortedRecords = javaRdd.sortBy(record -> {

Review Comment:
   Instead of this additional sort stage, can't we use `repartitionAndSortWithinPartitions` with the existing upsert partitioner? This is an extra shuffle of the data, which we should avoid, yeah?
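
   Rough sketch of what I mean (keeping the `Tuple2<HoodieKey, Long>` key the PR already uses; the comparator wiring is illustrative only):

   ```java
   // let the single shuffle both place records via the upsert partitioner and order them by
   // (partition path, record key), instead of running a separate global sortBy() stage first
   Comparator<Tuple2<HoodieKey, Long>> byPartitionPathThenRecordKey =
       (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
         int c = t1._1().getPartitionPath().compareTo(t2._1().getPartitionPath());
         return c != 0 ? c : t1._1().getRecordKey().compareTo(t2._1().getRecordKey());
       };
   JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedAndSortedRDD =
       mappedRDD.repartitionAndSortWithinPartitions(partitioner, byPartitionPathThenRecordKey);
   ```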



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -480,6 +480,20 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation(BulkInsertSortMode.class);
 
+  public static final ConfigProperty<Boolean> INSERT_SORT = ConfigProperty

Review Comment:
   Can we tie this to the same sort modes as bulk_insert? I want to see if we can make the configs uniform across bulk_insert and insert.
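
   One possible shape (just a sketch; key name is illustrative), reusing `BulkInsertSortMode` so both operations share the same vocabulary instead of introducing a new boolean:

   ```java
   public static final ConfigProperty<String> INSERT_SORT_MODE = ConfigProperty
       .key("hoodie.insert.sort.mode")
       .defaultValue(BulkInsertSortMode.NONE.toString())
       .markAdvanced()
       .withDocumentation(BulkInsertSortMode.class);
   ```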



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then

Review Comment:
   +1. The shuffle will give us sorting within each partition for free.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {

Review Comment:
   there should be a comparator lambda in another class that we can re-use, no? 
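
   If there isn't one already, a small shared helper could avoid re-declaring it inline (class/method names are just a suggestion):

   ```java
   public class HoodieRecordKeyComparators {
     public static Comparator<Tuple2<HoodieKey, Long>> byRecordKey() {
       return (Comparator<Tuple2<HoodieKey, Long>> & Serializable)
           (t1, t2) -> t1._1().getRecordKey().compareTo(t2._1().getRecordKey());
     }
   }
   ```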



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -394,6 +404,12 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
    if (profile == null) {
      throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
    }
+
+    if (operationRequiresSorting()) {
+      // Return UpsertSortPartitioner if the input records are going to be sorted
+      return new UpsertSortPartitioner<>(profile, context, table, config);
+    }

Review Comment:
   nit: switch to if-else block



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));

Review Comment:
   `true` is probably right. We need the partitioning preserved.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))

Review Comment:
   Do we need the `.split(",")` here?
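
   Since the metadata field name is a single column, this could just be:

   ```java
   sortColumns = new String[] {HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName()};
   ```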



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))

Review Comment:
   Left a comment already. I'm not sure how this works for partitioned tables?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))
+          .map(String::trim).toArray(String[]::new);
+    }
+
+    // Get the record's schema from the write config
+    SerializableSchema serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+
+    JavaRDD<HoodieRecord<T>> javaRdd = HoodieJavaRDD.getJavaRDD(dedupedRecords);
+    JavaRDD<HoodieRecord<T>> sortedRecords = javaRdd.sortBy(record -> {
+      if (isNullOrEmpty(customSortColField)) {
+        // If sorting based on record-key, extract it directly using record.getRecordKey()
+        return new StringBuilder()
+            .append(record.getPartitionPath())
+            .append("+")
+            .append(record.getRecordKey())
+            .toString();
+      } else {
+        // Extract the sort columns from the record and return it as  string (prepended with partition-path)
+        Object[] columnValues = record.getColumnValues(serializableSchema.get(), sortColumns, false);
+        String sortColString = Arrays.stream(columnValues).map(Object::toString).collect(Collectors.joining());
+        return new StringBuilder()
+            .append(record.getPartitionPath())
+            .append("+")
+            .append(sortColString)
+            .toString();
+      }
+    }, true, 0);
+
+    // Assign index to each record in the RDD
+    JavaRDD<Pair<HoodieRecord<T>, Long>> indexedRecords = sortedRecords.zipWithIndex()

Review Comment:
   Add an `Rdd` suffix here, too.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -411,4 +427,90 @@ public Partitioner getLayoutPartitioner(WorkloadProfile profile, String layoutPa
  protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
+
+  private HoodieData<WriteStatus> sortAndMapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> mappedRDD = getSortedIndexedRecords(dedupedRecords);
+    JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> partitionedRDD;
+    if (table.requireSortedRecords()) {
+      // Partition and sort within each partition as a single step. This is faster than partitioning first and then
+      // applying a sort.
+      Comparator<Tuple2<HoodieKey, Long>> comparator = (Comparator<Tuple2<HoodieKey, Long>> & Serializable) (t1, t2) -> {
+        HoodieKey key1 = t1._1();
+        HoodieKey key2 = t2._1();
+        return key1.getRecordKey().compareTo(key2.getRecordKey());
+      };
+      partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
+    } else {
+      // Partition only
+      partitionedRDD = mappedRDD.partitionBy(partitioner);
+    }
+
+    return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+      if (WriteOperationType.isChangingRecords(operationType)) {
+        return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+      } else {
+        return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+      }
+    }, true).flatMap(List::iterator));
+  }
+
+  private boolean operationRequiresSorting() {
+    return operationType == WriteOperationType.INSERT && config.getBoolean(INSERT_SORT);
+  }
+
+  private JavaPairRDD<Tuple2<HoodieKey, Long>, HoodieRecord<T>> getSortedIndexedRecords(HoodieData<HoodieRecord<T>> dedupedRecords) {
+    // Get any user specified sort columns
+    String customSortColField = config.getString(INSERT_USER_DEFINED_SORT_COLUMNS);
+
+    String[] sortColumns;
+    if (!isNullOrEmpty(customSortColField)) {
+      // Extract user specified sort-column fields as an array
+      sortColumns = Arrays.stream(customSortColField.split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      // Use record-key as sort column
+      sortColumns = Arrays.stream(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName().split(","))
+          .map(String::trim).toArray(String[]::new);
+    }
+
+    // Get the record's schema from the write config
+    SerializableSchema serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+
+    JavaRDD<HoodieRecord<T>> javaRdd = HoodieJavaRDD.getJavaRDD(dedupedRecords);
+    JavaRDD<HoodieRecord<T>> sortedRecords = javaRdd.sortBy(record -> {

Review Comment:
   Add an `Rdd` suffix to denote that this is an RDD. Makes it easier to read/follow.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
