alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r951930671


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -98,10 +110,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
         clusteringPlan.getInputGroups().stream()
-        .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-            clusteringPlan.getStrategy().getStrategyParams(),
-            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-            instantTime))
+            .map(inputGroup -> {
+              if (Boolean.parseBoolean(getWriteConfig().getString(HoodieClusteringConfig.CLUSTERING_AS_ROW))) {

Review Comment:
   Let's abstract this as a method in `WriteConfig`
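   
   For illustration only, a minimal sketch of what such an accessor could look like (the method name `isClusteringAsRowEnabled` is hypothetical, not an existing `HoodieWriteConfig` API; it assumes the `HoodieConfig#getBooleanOrDefault` helper):
   
   ```java
   // Sketch of a HoodieWriteConfig accessor wrapping the raw config lookup (hypothetical name)
   public boolean isClusteringAsRowEnabled() {
     return getBooleanOrDefault(HoodieClusteringConfig.CLUSTERING_AS_ROW);
   }
   ```
   
   The call site above would then reduce to `if (getWriteConfig().isClusteringAsRowEnabled()) { ... }`.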



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner {

Review Comment:
   Why do we inherit from `RowCustomColumnsSortPartitioner`?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -273,6 +398,62 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
         .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
+   */
+  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
+                                                   HoodieClusteringGroup clusteringGroup,
+                                                   String instantTime) {
+    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
+        .map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+    SQLContext sqlContext = new SQLContext(jsc.sc());
+
+    String[] baseFilePaths = clusteringOps
+        .stream()
+        .map(op -> {
+          ArrayList<String> pairs = new ArrayList<>();
+          if (op.getBootstrapFilePath() != null) {
+            pairs.add(op.getBootstrapFilePath());
+          }
+          if (op.getDataFilePath() != null) {
+            pairs.add(op.getDataFilePath());
+          }
+          return pairs;
+        })
+        .flatMap(Collection::stream)
+        .filter(path -> !path.isEmpty())
+        .toArray(String[]::new);
+    String[] deltaPaths = clusteringOps
+        .stream()
+        .filter(op -> !op.getDeltaFilePaths().isEmpty())
+        .flatMap(op -> op.getDeltaFilePaths().stream())
+        .toArray(String[]::new);
+
+    Dataset<Row> inputRecords;
+    if (hasLogFiles) {
+      String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+          .orElse("0.75");
+      String[] paths = new String[baseFilePaths.length + deltaPaths.length];
+      System.arraycopy(baseFilePaths, 0, paths, 0, baseFilePaths.length);
+      System.arraycopy(deltaPaths, 0, paths, baseFilePaths.length, deltaPaths.length);

Review Comment:
   You can use `CollectionUtils.combine`
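   
   For example (assuming `org.apache.hudi.common.util.CollectionUtils` exposes an array `combine` overload; worth double-checking the exact signature), the two `System.arraycopy` calls above could collapse to:
   
   ```java
   // Sketch: merge the base-file and log-file path arrays via the existing helper
   String[] paths = CollectionUtils.combine(baseFilePaths, deltaPaths);
   ```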



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatusCoordinator.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HoodieInternalWriteStatusCoordinator {

Review Comment:
   I appreciate the intent, but this component doesn't really make sense (it's essentially a global buffer used to pass along data flows we can't organize otherwise)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner {
+
+  private final String[] orderByColumns;
+  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
+  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
+
+  public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
+    super(config);
+    this.layoutOptStrategy = config.getLayoutOptimizationStrategy();
+    if (config.getClusteringSortColumns() != null) {
+      this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      this.orderByColumns = getSortColumnNames();
+    }
+    this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod();
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
+    return reorder(records, outputPartitions);

Review Comment:
   We need to separate out the handling of partitioned tables: for a partitioned table there's no point in ordering the records _across_ partitions; we should be ordering only w/in the respective partitions (take a look at `RowCustomColumnsSortPartitioner` for an example)
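   
   A rough sketch of the suggested direction, using only the Spark `Dataset` API (the sort columns are placeholders, whether to shuffle by `_hoodie_partition_path` should be confirmed against `RowCustomColumnsSortPartitioner`, and the space-curve composition itself would still be delegated to `SpaceCurveSortingHelper`):
   
   ```java
   // Sketch: co-locate rows of each table partition, then order only within it,
   // instead of imposing one global ordering across the whole dataset.
   Dataset<Row> reordered = records
       .repartition(outputPartitions, functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
       .sortWithinPartitions(
           functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD),
           functions.col("sortCol1"),   // placeholder ordering columns
           functions.col("sortCol2"));
   ```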



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -153,13 +157,22 @@ private void writeRow(InternalRow row) {
       //          over again)
       UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
       UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
-      // This is the only meta-field that is generated dynamically, hence conversion b/w
-      // [[String]] and [[UTF8String]] is unavoidable
-      UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
-      InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
-          partitionPath, fileName, row, true);
 
+      InternalRow updatedRow;
+      if (preserveMetadata) {
+        updatedRow = new HoodieInternalRow(row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD),

Review Comment:
   You can reduce the conditional to only the portion that differs (seqNo, commitTime)
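   
   Roughly along these lines (a sketch against the fields visible in this hunk; the sequence-number ordinal constant and the exact `HoodieInternalRow` constructor arguments are assumptions to be checked against the surrounding code):
   
   ```java
   // Only commit time and sequence id depend on preserveMetadata; compute those first...
   UTF8String commitTimeValue = preserveMetadata
       ? row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD)
       : commitTime;
   UTF8String seqId = preserveMetadata
       ? row.getUTF8String(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD)   // assumed ordinal constant
       : UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
   
   // ...then build the updated row once, outside the conditional
   InternalRow updatedRow = new HoodieInternalRow(commitTimeValue, seqId, recordKey,
       partitionPath, fileName, row, true);
   ```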



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -131,6 +161,53 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final Ho
                                                                         final Map<String, String> strategyParams, final Schema schema,
                                                                         final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
 
+  protected HoodieData<WriteStatus> performRowWrite(Dataset<Row> inputRecords, Map<String, String> parameters) {
+    String uuid = UUID.randomUUID().toString();
+    parameters.put(HoodieWriteConfig.BULKINSERT_ROW_IDENTIFY_ID.key(), uuid);
+    try {
+      inputRecords.write()
+          .format("hudi")
+          .options(JavaConverters.mapAsScalaMapConverter(parameters).asScala())
+          .mode(SaveMode.Append)
+          .save(getWriteConfig().getBasePath());

Review Comment:
   We shouldn't be using the DataSource for writing in this case:
   
   First of all, we'd be violating the layering of the integration: the Strategy is an internal component of the Spark DataSource integration, and as such should not be referencing the component that encompasses it (the DataSource). The rule of thumb is that a component should only interface with other components w/in the same or adjacent layers.
   
   On top of that, it's actually not strictly necessary: since we're trying to bulk-insert back (while reshaping its layout) only data that was already persisted, and NOT new data, we can bypass all of the handling that occurs in the Spark DataSource and write the data directly using `HoodieBulkInsertDataInternalWriter`. If you take a look at the task that actually does the writing on the Spark side ([WriteToDataSourceV2Exec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L418)), it's very simple, and most of the complexity stems from the need to commit the results of the operation, which isn't relevant to us in this case.
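   
   To make that direction a bit more concrete, a rough sketch of the per-partition loop that `WriteToDataSourceV2Exec` effectively runs (here `createInternalWriter(...)` is a hypothetical stand-in for constructing `HoodieBulkInsertDataInternalWriter` the way its `DataWriterFactory` does; the constructor arguments differ across Spark/Hudi versions, so they're omitted):
   
   ```java
   // Sketch only: feed the already-persisted rows straight into the internal bulk-insert writer,
   // bypassing the DataSource plumbing; the commit coordination done by WriteToDataSourceV2Exec
   // is not needed for this rewrite-in-place use case.
   inputRecords.queryExecution().toRdd().toJavaRDD().foreachPartition(rows -> {
     DataWriter<InternalRow> writer = createInternalWriter(TaskContext.getPartitionId(), instantTime);
     try {
       while (rows.hasNext()) {
         writer.write(rows.next());
       }
       writer.commit();
     } catch (Exception e) {
       writer.abort();
       throw e;
     }
   });
   ```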



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -138,6 +138,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Config to control frequency of async clustering");
 
+  public static final ConfigProperty<Boolean> CLUSTERING_AS_ROW = ConfigProperty

Review Comment:
   I think we should reuse the existing config `hoodie.datasource.write.row.writer.enable` to control whether we go down the row-writing path, to avoid user confusion (since the original config is generic)
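   
   For illustration, a hedged sketch of gating on the existing key instead of introducing `CLUSTERING_AS_ROW` (the accessor name is hypothetical; the key literal is the existing `hoodie.datasource.write.row.writer.enable`, and the default should follow whatever `DataSourceWriteOptions` defines):
   
   ```java
   // Sketch of a write-config accessor reusing the existing row-writer flag (hypothetical method name)
   public boolean isRowWriterEnabled() {
     return Boolean.parseBoolean(getProps().getProperty("hoodie.datasource.write.row.writer.enable"));
   }
   ```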



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
