alexeykudinkin commented on code in PR #6046:
URL: https://github.com/apache/hudi/pull/6046#discussion_r951930671
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -98,10 +110,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
         clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
-                clusteringPlan.getStrategy().getStrategyParams(),
-                Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-                instantTime))
+            .map(inputGroup -> {
+              if (Boolean.parseBoolean(getWriteConfig().getString(HoodieClusteringConfig.CLUSTERING_AS_ROW))) {

Review Comment:
   Let's abstract this as a method in `WriteConfig`

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner {

Review Comment:
   Why do we inherit from `RowCustomColumnsSortPartitioner`?

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -273,6 +398,62 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex
         .map(record -> transform(record, writeConfig)));
   }
 
+  /**
+   * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
+   */
+  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
+                                                HoodieClusteringGroup clusteringGroup,
+                                                String instantTime) {
+    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream()
+        .map(ClusteringOperation::create).collect(Collectors.toList());
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
+    SQLContext sqlContext = new SQLContext(jsc.sc());
+
+    String[] baseFilePaths = clusteringOps
+        .stream()
+        .map(op -> {
+          ArrayList<String> pairs = new ArrayList<>();
+          if (op.getBootstrapFilePath() != null) {
+            pairs.add(op.getBootstrapFilePath());
+          }
+          if (op.getDataFilePath() != null) {
+            pairs.add(op.getDataFilePath());
+          }
+          return pairs;
+        })
+        .flatMap(Collection::stream)
+        .filter(path -> !path.isEmpty())
+        .toArray(String[]::new);
+    String[] deltaPaths = clusteringOps
+        .stream()
+        .filter(op -> !op.getDeltaFilePaths().isEmpty())
+        .flatMap(op -> op.getDeltaFilePaths().stream())
+        .toArray(String[]::new);
+
+    Dataset<Row> inputRecords;
+    if (hasLogFiles) {
+      String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction"))
+          .orElse("0.75");
+      String[] paths = new String[baseFilePaths.length + deltaPaths.length];
+      System.arraycopy(baseFilePaths, 0, paths, 0, baseFilePaths.length);
+      System.arraycopy(deltaPaths, 0, paths, baseFilePaths.length, deltaPaths.length);

Review Comment:
   You can use `CollectionUtils.combine` here.

##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatusCoordinator.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HoodieInternalWriteStatusCoordinator {

Review Comment:
   I appreciate the intent, but this component doesn't really make sense (it's essentially a global buffer that lets us facilitate a data flow we can't organize otherwise).

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sort.SpaceCurveSortingHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner {
+
+  private final String[] orderByColumns;
+  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
+  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
+
+  public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
+    super(config);
+    this.layoutOptStrategy = config.getLayoutOptimizationStrategy();
+    if (config.getClusteringSortColumns() != null) {
+      this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(","))
+          .map(String::trim).toArray(String[]::new);
+    } else {
+      this.orderByColumns = getSortColumnNames();
+    }
+    this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod();
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
+    return reorder(records, outputPartitions);

Review Comment:
   We need to separate out the handling of partitioned tables: for partitioned tables there's no point in ordering the records _across_ partitions; we should only be ordering them w/in their respective partitions (take a look at `RowCustomColumnsSortPartitioner` for an example).

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java:
##########
@@ -153,13 +157,22 @@ private void writeRow(InternalRow row) {
     // over again)
     UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
     UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
-    // This is the only meta-field that is generated dynamically, hence conversion b/w
-    // [[String]] and [[UTF8String]] is unavoidable
-    UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
-
-    InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
-        partitionPath, fileName, row, true);
+    InternalRow updatedRow;
+    if (preserveMetadata) {
+      updatedRow = new HoodieInternalRow(row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD),

Review Comment:
   You can reduce the conditional to only the portion that differs (seqNo, commitTime).

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -131,6 +161,53 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final Ho
       final Map<String, String> strategyParams, final Schema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
 
+  protected HoodieData<WriteStatus> performRowWrite(Dataset<Row> inputRecords, Map<String, String> parameters) {
+    String uuid = UUID.randomUUID().toString();
+    parameters.put(HoodieWriteConfig.BULKINSERT_ROW_IDENTIFY_ID.key(), uuid);
+    try {
+      inputRecords.write()
+          .format("hudi")
+          .options(JavaConverters.mapAsScalaMapConverter(parameters).asScala())
+          .mode(SaveMode.Append)
+          .save(getWriteConfig().getBasePath());

Review Comment:
   We shouldn't be using DataSource for writing in this case.

   First of all, we're violating the layering of the integration: the Strategy is an internal component of the Spark DataSource integration and, as such, should not reference the component that encompasses it (DS). The rule of thumb is usually that a component can interface with other components w/in the same or adjacent layers.
   On top of that, it's actually not strictly necessary: since we're trying to bulk-insert back (while reshaping its layout) only data that was already persisted, and NOT new data, we can bypass all of the handling that occurs in Spark DS and write the data directly using `HoodieBulkInsertDataInternalWriter`. If you take a look at the task that actually does the writing on the Spark side ([WriteToDataSourceV2Exec](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala#L418)), it's actually very simple: most of the complications stem from the need to commit the results of the operation, which isn't relevant to us in this case.

##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -138,6 +138,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Config to control frequency of async clustering");
 
+  public static final ConfigProperty<Boolean> CLUSTERING_AS_ROW = ConfigProperty

Review Comment:
   I think we should reuse the existing config `hoodie.datasource.write.row.writer.enable` to control whether we go down the row-writing path, to avoid confusing users (since the original config is generic).

-- 
This is an automated message from the Apache Git Service.
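Illustrative sketch (not part of the PR): the two config-related suggestions in this review, abstracting the row-path check into a `WriteConfig` method and driving it from the existing `hoodie.datasource.write.row.writer.enable` key, could look roughly like the following. `RowWriterConfigSketch`, `isRowWriterEnabled()` and `chooseClusteringPath()` are hypothetical names, not Hudi APIs; the class is a plain `Map`-backed stand-in for `HoodieWriteConfig`, and the `"false"` fallback is an assumption of this sketch rather than Hudi's actual default for that key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Map-backed stand-in for HoodieWriteConfig; NOT Hudi's actual class.
public class RowWriterConfigSketch {

  // Existing generic key that the review suggests reusing.
  public static final String ROW_WRITER_ENABLE_KEY = "hoodie.datasource.write.row.writer.enable";

  // Fallback used only by this sketch; Hudi defines its own default for this key.
  public static final String ASSUMED_DEFAULT = "false";

  private final Map<String, String> props;

  public RowWriterConfigSketch(Map<String, String> props) {
    // Defensive copy so later changes to the caller's map don't leak in.
    this.props = new HashMap<>(props);
  }

  // Single place that decides whether clustering takes the row-writing path,
  // instead of parsing the raw property at every call site.
  public boolean isRowWriterEnabled() {
    // Boolean.parseBoolean is case-insensitive and returns false for anything but "true".
    return Boolean.parseBoolean(props.getOrDefault(ROW_WRITER_ENABLE_KEY, ASSUMED_DEFAULT));
  }

  // Hypothetical call site mirroring the lambda in performClustering.
  public static String chooseClusteringPath(RowWriterConfigSketch config) {
    return config.isRowWriterEnabled() ? "row" : "record";
  }

  public static void main(String[] args) {
    Map<String, String> enabled = new HashMap<>();
    enabled.put(ROW_WRITER_ENABLE_KEY, "true");
    System.out.println(chooseClusteringPath(new RowWriterConfigSketch(enabled)));         // row
    System.out.println(chooseClusteringPath(new RowWriterConfigSketch(new HashMap<>()))); // record
  }
}
```

Keeping the parsing behind one accessor means that swapping `CLUSTERING_AS_ROW` for the generic row-writer key only touches the config class, not the clustering strategy.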