alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r969043938


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -113,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     return writeMetadata;
   }
 
+  /**
+   * Execute clustering to write inputRecords into new files based on strategyParams.
+   * Different from {@link performClusteringWithRecordsRDD}, this method take {@link Dataset<Row>}
+   * as inputs.
+   */
+  public abstract HoodieData<WriteStatus> performClusteringWithRecordsRow(final Dataset<Row> inputRecords, final int numOutputGroups, final String instantTime,

Review Comment:
   This style of wrapping (while acceptable under the recognized code style guide) makes it quite hard to read on a laptop screen:
   
   <img width="822" alt="Screen Shot 2022-09-12 at 5 48 53 PM" src="https://user-images.githubusercontent.com/428277/189783507-51e4138e-e9fa-48ba-8b54-a798a31c200c.png">
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String[] orderByColumns;

Review Comment:
   Let's extract a common base class `SpatialCurveSortPartitionerBase` so that we can reuse as much code as possible and avoid duplication.
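   
   A rough sketch of the shape I have in mind (purely illustrative -- the generic parameter, constructor shape and the `arePartitionRecordsSorted` override are just suggestions; `reorder` could be hoisted here as well):
   
   ```
   public abstract class SpatialCurveSortPartitionerBase<I> implements BulkInsertPartitioner<I> {
   
     protected final String[] orderByColumns;
     protected final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
     protected final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
   
     protected SpatialCurveSortPartitionerBase(String[] orderByColumns,
                                               HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                               HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) {
       // fields currently duplicated between the RDD and Row partitioners
       this.orderByColumns = orderByColumns;
       this.layoutOptStrategy = layoutOptStrategy;
       this.curveCompositionStrategyType = curveCompositionStrategyType;
     }
   
     @Override
     public boolean arePartitionRecordsSorted() {
       // both sort partitioners presumably return true here
       return true;
     }
   }
   ```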



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -98,10 +106,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
         clusteringPlan.getInputGroups().stream()
-        .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-            clusteringPlan.getStrategy().getStrategyParams(),
-            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-            instantTime))
+            .map(inputGroup -> {
+              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                return runClusteringForGroupAsyncWithRow(inputGroup,
+                    clusteringPlan.getStrategy().getStrategyParams(),
+                    Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),

Review Comment:
   Please extract common expression (to `shouldPreserveMetadata`)
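   
   i.e. roughly (sketch only, reusing the calls already in this hunk):
   
   ```
   boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
   // ...and pass `shouldPreserveMetadata` into both the RDD and Row clustering calls instead of repeating the expression
   ```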



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() {
   }
 
   public boolean isEmbeddedTimelineServerReuseEnabled() {
-    return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
+    return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);

Review Comment:
   We need to do `getBooleanOrDefault`, otherwise it might NPE (due to unboxing)
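   
   i.e. (sketch):
   
   ```
   return getBooleanOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
   ```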



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertRowWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class SparkBulkInsertRowWriter {
+
+  /**
+   * Perform bulk insert for {@link Dataset<Row>}, will not change timeline/index, return
+   * information about write files.
+   */
+  public static HoodieData<WriteStatus> bulkInsert(Dataset<Row> dataset,
+                                            String instantTime,
+                                            HoodieTable table,
+                                            HoodieWriteConfig writeConfig,
+                                            BulkInsertPartitioner<Dataset<Row>> partitioner,
+                                            int parallelism,
+                                            boolean preserveHoodieMetadata) {
+    Dataset<Row> repartitionedDataset = partitioner.repartitionRecords(dataset, parallelism);
+
+    boolean arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();
+    StructType schema = dataset.schema();
+    List<WriteStatus> writeStatuses = repartitionedDataset.queryExecution().toRdd().toJavaRDD().mapPartitions(
+        (FlatMapFunction<Iterator<InternalRow>, WriteStatus>) rowIterator -> {
+          TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
+          int taskPartitionId = taskContextSupplier.getPartitionIdSupplier().get();
+          long taskId = taskContextSupplier.getStageIdSupplier().get();
+          long taskEpochId = taskContextSupplier.getAttemptIdSupplier().get();
+
+          final BulkInsertDataInternalWriterHelper writer =
+              new BulkInsertDataInternalWriterHelper(table, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId,
+                  schema, writeConfig.populateMetaFields(), arePartitionRecordsSorted, preserveHoodieMetadata);
+          while (rowIterator.hasNext()) {
+            writer.write(rowIterator.next());
+          }
+          return writer.getWriteStatuses()
+              .stream()
+              .map(internalWriteStatus -> {
+                WriteStatus status = new WriteStatus(
+                    internalWriteStatus.isTrackSuccessRecords(), internalWriteStatus.getFailureFraction());
+                status.setFileId(internalWriteStatus.getFileId());
+                status.setTotalRecords(internalWriteStatus.getTotalRecords());
+                status.setPartitionPath(internalWriteStatus.getPartitionPath());
+                status.setStat(internalWriteStatus.getStat());
+                return status;
+              }).iterator();
+        }).collect();

Review Comment:
   @nsivabalan FYI: this is what we talked about last week -- we dereference the RDD/Dataframe at a well-defined point in the workflow and then convert it to a list of `WriteStatus`es



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -52,6 +55,27 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table,
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords, int numOutputGroups,
+                                                                 String instantTime, Map<String, String> strategyParams, Schema schema,
+                                                                 List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata) {
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
+
+    boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata;

Review Comment:
   Why do we need to override this here?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java:
##########
@@ -54,6 +57,35 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table,
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords,
+                                                                 int numOutputGroups,
+                                                                 String instantTime,
+                                                                 Map<String, String> strategyParams,
+                                                                 Schema schema,
+                                                                 List<HoodieFileGroupId> fileGroupIdList,
+                                                                 boolean preserveHoodieMetadata) {
+    if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
+      throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
+    }
+    LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
+
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
+
+    boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata;
+    if (!newConfig.populateMetaFields() && preserveHoodieMetadata) {
+      LOG.warn("Will setting preserveHoodieMetadata to false as populateMetaFields is false");
+      shouldPreserveHoodieMetadata = false;
+    }
+
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

Review Comment:
   Let's duplicate the comment from the original method as well



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -98,10 +106,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
         clusteringPlan.getInputGroups().stream()
-        .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-            clusteringPlan.getStrategy().getStrategyParams(),
-            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-            instantTime))
+            .map(inputGroup -> {
+              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {

Review Comment:
   Let's abstract this as a method in WriteConfig
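   
   Something along these lines (the method name is just a suggestion; it reuses the same key/default the PR already passes inline):
   
   ```
   public boolean isRowWriterEnabledForClustering() {
     // keeps the magic string in one place inside HoodieWriteConfig
     return getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false);
   }
   ```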



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertRowWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class SparkBulkInsertRowWriter {

Review Comment:
   Let's consolidate this one w/ `HoodieDatasetBulkInsertHelper`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java:
##########
@@ -118,7 +126,8 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo
   private Option<BuiltinKeyGenerator> getKeyGenerator(Properties properties) {
     TypedProperties typedProperties = new TypedProperties();
     typedProperties.putAll(properties);
-    if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()).equals(NonpartitionedKeyGenerator.class.getName())) {
+    if (Option.ofNullable(properties.get(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key()))

Review Comment:
   Why are we not instantiating Non-partitioned KG?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -70,6 +70,8 @@ public class HoodieRowCreateHandle implements Serializable {
   private final UTF8String commitTime;
   private final Function<Long, String> seqIdGenerator;
 
+  private final boolean preserveHoodieMetadata;

Review Comment:
   nit: `shouldPreserveHoodieMetadata`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str
       switch (layoutOptStrategy) {
         case ZORDER:
         case HILBERT:
-          return new RDDSpatialCurveSortPartitioner(
+          return isRowPartitioner
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
+              : new RDDSpatialCurveSortPartitioner(
               (HoodieSparkEngineContext) getEngineContext(),
               orderByColumns,
               layoutOptStrategy,
               getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
               HoodieAvroUtils.addMetadataFields(schema));
         case LINEAR:
-          return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
+          return isRowPartitioner
+              ? new RowCustomColumnsSortPartitioner(orderByColumns)
+              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
              getWriteConfig().isConsistentLogicalTimestampEnabled());
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
-    }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
+    }).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig().getBulkInsertSortMode()) :
+        BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
   }
 
   /**
-   * Submit job to execute clustering for the group.
+   * Submit job to execute clustering for the group with RDD APIs.
    */
-  private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
-                                                                             boolean preserveHoodieMetadata, String instantTime) {
+  private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncWithRDD(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,

Review Comment:
   Let's stay consistent with the rest of the codebase in how we identify Row vs Avro workflows:
    - Avro/HoodieRecord flows keep their existing names
    - Row counterparts get the `AsRow` suffix
   
   In this case I suggest going w/ `runClusteringForGroupAsync` for the existing one and `runClusteringForGroupAsyncAsRow` for the new one.
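   
   In signatures that would look roughly like (sketch; parameters stay as they are in this PR):
   
   ```
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
                                                                                 boolean preserveHoodieMetadata, String instantTime) { /* existing RDD-based body */ }
   
   private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
                                                                                      boolean preserveHoodieMetadata, String instantTime) { /* new Row-based body */ }
   ```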
   
   WDYT?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String[] orderByColumns;
+  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
+  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
+
+  public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
+    this.layoutOptStrategy = config.getLayoutOptimizationStrategy();
+    if (config.getClusteringSortColumns() != null) {
+      this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      throw new IllegalArgumentException("The config "
+          + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() + " must be provided");
+    }
+    this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod();
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
+    return reorder(records, outputPartitions);
+  }
+
+  private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {

Review Comment:
   This method, for example, is the same for both the Avro and Row impls. We should certainly reuse it (instead of duplicating).
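   
   e.g. (sketch, assuming `reorder(Dataset<Row>, int)` gets hoisted into the shared `SpatialCurveSortPartitionerBase` suggested above):
   
   ```
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
     // delegate to the shared base-class implementation instead of duplicating it here
     return reorder(records, outputPartitions);
   }
   ```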



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str
       switch (layoutOptStrategy) {
         case ZORDER:
         case HILBERT:
-          return new RDDSpatialCurveSortPartitioner(
+          return isRowPartitioner
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
+              : new RDDSpatialCurveSortPartitioner(
               (HoodieSparkEngineContext) getEngineContext(),
               orderByColumns,
               layoutOptStrategy,
               getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
               HoodieAvroUtils.addMetadataFields(schema));
         case LINEAR:
-          return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
+          return isRowPartitioner
+              ? new RowCustomColumnsSortPartitioner(orderByColumns)
+              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
               getWriteConfig().isConsistentLogicalTimestampEnabled());

Review Comment:
   Same as above



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str
       switch (layoutOptStrategy) {
         case ZORDER:
         case HILBERT:
-          return new RDDSpatialCurveSortPartitioner(
+          return isRowPartitioner
+              ? new RowSpatialCurveSortPartitioner(getWriteConfig())
+              : new RDDSpatialCurveSortPartitioner(
               (HoodieSparkEngineContext) getEngineContext(),

Review Comment:
   Please fix alignment



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -52,6 +55,27 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table,
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords, int numOutputGroups,

Review Comment:
   Let's create a Jira to implement `HoodieData` wrapping around `DataFrame` (you can assign it to me).
   
   In that case we won't need to duplicate these methods and will simply be able to parameterize them.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -273,6 +330,60 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
         .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
+   */
+  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
+                                                   HoodieClusteringGroup clusteringGroup,
+                                                   String instantTime) {
+    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
+        .map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+    SQLContext sqlContext = new SQLContext(jsc.sc());
+
+    String[] baseFilePaths = clusteringOps
+        .stream()
+        .map(op -> {
+          ArrayList<String> readPaths = new ArrayList<>();
+          if (op.getBootstrapFilePath() != null) {
+            readPaths.add(op.getBootstrapFilePath());
+          }
+          if (op.getDataFilePath() != null) {
+            readPaths.add(op.getDataFilePath());
+          }
+          return readPaths;
+        })
+        .flatMap(Collection::stream)
+        .filter(path -> !path.isEmpty())
+        .toArray(String[]::new);
+    String[] deltaPaths = clusteringOps
+        .stream()
+        .filter(op -> !op.getDeltaFilePaths().isEmpty())
+        .flatMap(op -> op.getDeltaFilePaths().stream())
+        .toArray(String[]::new);
+
+    Dataset<Row> inputRecords;
+    if (hasLogFiles) {
+      String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+          .orElse("0.75");
+      String[] paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
+      inputRecords = sqlContext.read()

Review Comment:
   Same feedback as with the write-path: we can't use the Spark DataSource in here for mostly the same reasons -- it violates layering and could lead to subtle bugs.
   
   Instead, let's extract the following portion of the `createRelation` method and reuse it directly here:
   
   https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala#L125
   
   Like the following:
   ```
   val relation = DefaultSource.createRelation(...)
   val df = sparkSession.baseRelationToDataFrame(relation)
   ```



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -273,6 +330,60 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
         .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
+   */
+  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
+                                                   HoodieClusteringGroup clusteringGroup,
+                                                   String instantTime) {
+    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
+        .map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+    SQLContext sqlContext = new SQLContext(jsc.sc());
+
+    String[] baseFilePaths = clusteringOps
+        .stream()
+        .map(op -> {
+          ArrayList<String> readPaths = new ArrayList<>();
+          if (op.getBootstrapFilePath() != null) {
+            readPaths.add(op.getBootstrapFilePath());
+          }
+          if (op.getDataFilePath() != null) {
+            readPaths.add(op.getDataFilePath());
+          }
+          return readPaths;
+        })
+        .flatMap(Collection::stream)
+        .filter(path -> !path.isEmpty())
+        .toArray(String[]::new);
+    String[] deltaPaths = clusteringOps
+        .stream()
+        .filter(op -> !op.getDeltaFilePaths().isEmpty())
+        .flatMap(op -> op.getDeltaFilePaths().stream())
+        .toArray(String[]::new);
+
+    Dataset<Row> inputRecords;
+    if (hasLogFiles) {
+      String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))

Review Comment:
   The only part that differs b/w these branches is the options composition. Let's extract the common part and only keep the options composition under the conditional.
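   
   Roughly this shape (sketch; `baseReadOptions`/`logScanOptions` are hypothetical helpers for the two option maps, everything else is already in this hunk):
   
   ```
   Map<String, String> readOpts = new HashMap<>(baseReadOptions());   // hypothetical helper
   String[] readPaths = baseFilePaths;
   if (hasLogFiles) {
     // only the option composition (and the combined path list) differs for the log-file case
     readOpts.putAll(logScanOptions(compactionFractor));              // hypothetical helper
     readPaths = CollectionUtils.combine(baseFilePaths, deltaPaths);
   }
   // ...single shared read using readOpts/readPaths follows (however that read ends up being implemented)
   ```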



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner {
+
+  private final String[] orderByColumns;
+  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
+  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
+
+  public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
+    super(config);
+    this.layoutOptStrategy = config.getLayoutOptimizationStrategy();
+    if (config.getClusteringSortColumns() != null) {
+      this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      this.orderByColumns = getSortColumnNames();
+    }
+    this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod();
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
+    return reorder(records, outputPartitions);

Review Comment:
   SG



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -113,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     return writeMetadata;
   }
 
+  /**
+   * Execute clustering to write inputRecords into new files based on strategyParams.
+   * Different from {@link performClusteringWithRecordsRDD}, this method take {@link Dataset<Row>}
+   * as inputs.
+   */
+  public abstract HoodieData<WriteStatus> performClusteringWithRecordsRow(final Dataset<Row> inputRecords, final int numOutputGroups, final String instantTime,

Review Comment:
   Stacked one for comparison:
   
   <img width="710" alt="Screen Shot 2022-09-12 at 5 50 34 PM" 
src="https://user-images.githubusercontent.com/428277/189783700-35472b54-224c-4bd1-8f39-42952d1b5bf4.png";>
   
   


