alexeykudinkin commented on code in PR #6046: URL: https://github.com/apache/hudi/pull/6046#discussion_r969043938
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -113,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood return writeMetadata; } + /** + * Execute clustering to write inputRecords into new files based on strategyParams. + * Different from {@link performClusteringWithRecordsRDD}, this method take {@link Dataset<Row>} + * as inputs. + */ + public abstract HoodieData<WriteStatus> performClusteringWithRecordsRow(final Dataset<Row> inputRecords, final int numOutputGroups, final String instantTime, Review Comment: This style of wrapping (while acceptable under recognized code style-guide) makes it quite hard to read on laptop screen: <img width="822" alt="Screen Shot 2022-09-12 at 5 48 53 PM" src="https://user-images.githubusercontent.com/428277/189783507-51e4138e-e9fa-48ba-8b54-a798a31c200c.png"> ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.sort.SpaceCurveSortingHelper; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.Arrays; +import java.util.List; + +public class RowSpatialCurveSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> { + + private final String[] orderByColumns; Review Comment: Let's extract common base class `SpatialCurveSortPartitionerBase` so that we can reuse as much code as possible and avoid duplication ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -98,10 +106,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood // execute clustering for each group async and collect WriteStatus Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf( clusteringPlan.getInputGroups().stream() - .map(inputGroup -> runClusteringForGroupAsync(inputGroup, - clusteringPlan.getStrategy().getStrategyParams(), - Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), - instantTime)) + .map(inputGroup -> { + if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) { + return runClusteringForGroupAsyncWithRow(inputGroup, + clusteringPlan.getStrategy().getStrategyParams(), + Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), Review Comment: Please extract common expression (to `shouldPreserveMetadata`) ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ########## @@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() { } public boolean isEmbeddedTimelineServerReuseEnabled() { - return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED)); + return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED); Review Comment: We need to do `getBooleanOrDefault`, otherwise it might NPE (due to unboxing) ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertRowWriter.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.util.Iterator; +import java.util.List; + +public class SparkBulkInsertRowWriter { + + /** + * Perform bulk insert for {@link Dataset<Row>}, will not change timeline/index, return + * information about write files. + */ + public static HoodieData<WriteStatus> bulkInsert(Dataset<Row> dataset, + String instantTime, + HoodieTable table, + HoodieWriteConfig writeConfig, + BulkInsertPartitioner<Dataset<Row>> partitioner, + int parallelism, + boolean preserveHoodieMetadata) { + Dataset<Row> repartitionedDataset = partitioner.repartitionRecords(dataset, parallelism); + + boolean arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted(); + StructType schema = dataset.schema(); + List<WriteStatus> writeStatuses = repartitionedDataset.queryExecution().toRdd().toJavaRDD().mapPartitions( + (FlatMapFunction<Iterator<InternalRow>, WriteStatus>) rowIterator -> { + TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier(); + int taskPartitionId = taskContextSupplier.getPartitionIdSupplier().get(); + long taskId = taskContextSupplier.getStageIdSupplier().get(); + long taskEpochId = taskContextSupplier.getAttemptIdSupplier().get(); + + final BulkInsertDataInternalWriterHelper writer = + new BulkInsertDataInternalWriterHelper(table, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, + schema, writeConfig.populateMetaFields(), arePartitionRecordsSorted, preserveHoodieMetadata); + while (rowIterator.hasNext()) { + writer.write(rowIterator.next()); + } + return writer.getWriteStatuses() + .stream() + .map(internalWriteStatus -> { + WriteStatus status = new WriteStatus( + internalWriteStatus.isTrackSuccessRecords(), internalWriteStatus.getFailureFraction()); + status.setFileId(internalWriteStatus.getFileId()); + status.setTotalRecords(internalWriteStatus.getTotalRecords()); + status.setPartitionPath(internalWriteStatus.getPartitionPath()); + status.setStat(internalWriteStatus.getStat()); + return status; + }).iterator(); + }).collect(); Review Comment: @nsivabalan FYI: this is what we've talked about last week -- we dereference RDD/Dataframe at a well-defined point in the workflow and the convert to a list of `WriteStatus`s ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java: ########## @@ -52,6 +55,27 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table, super(table, engineContext, writeConfig); } + @Override + public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords, int numOutputGroups, + String instantTime, Map<String, String> strategyParams, Schema schema, + List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata) { + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() + .withBulkInsertParallelism(numOutputGroups) + 
.withProps(getWriteConfig().getProps()).build(); + + boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata; Review Comment: Why do we need to override this here? ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java: ########## @@ -54,6 +57,35 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table, super(table, engineContext, writeConfig); } + @Override + public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords, + int numOutputGroups, + String instantTime, + Map<String, String> strategyParams, + Schema schema, + List<HoodieFileGroupId> fileGroupIdList, + boolean preserveHoodieMetadata) { + if (numOutputGroups != 1 || fileGroupIdList.size() != 1) { + throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName()); + } + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() + .withBulkInsertParallelism(numOutputGroups) + .withProps(getWriteConfig().getProps()).build(); + + boolean shouldPreserveHoodieMetadata = preserveHoodieMetadata; + if (!newConfig.populateMetaFields() && preserveHoodieMetadata) { + LOG.warn("Will setting preserveHoodieMetadata to false as populateMetaFields is false"); + shouldPreserveHoodieMetadata = false; + } + + newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE)); Review Comment: Let's duplicate the comment from the original method as well ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -98,10 +106,18 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood // execute clustering for each group async and collect WriteStatus Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf( clusteringPlan.getInputGroups().stream() - .map(inputGroup -> runClusteringForGroupAsync(inputGroup, - clusteringPlan.getStrategy().getStrategyParams(), - Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), - instantTime)) + .map(inputGroup -> { + if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) { Review Comment: Let's abstract this as a method in WriteConfig ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertRowWriter.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; + +import java.util.Iterator; +import java.util.List; + +public class SparkBulkInsertRowWriter { Review Comment: Let's consolidate this one w/ `HoodieDatasetBulkInsertHelper` ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java: ########## @@ -118,7 +126,8 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo private Option<BuiltinKeyGenerator> getKeyGenerator(Properties properties) { TypedProperties typedProperties = new TypedProperties(); typedProperties.putAll(properties); - if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()).equals(NonpartitionedKeyGenerator.class.getName())) { + if (Option.ofNullable(properties.get(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key())) Review Comment: Why are we not instantiating Non-partitioned KG? ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java: ########## @@ -70,6 +70,8 @@ public class HoodieRowCreateHandle implements Serializable { private final UTF8String commitTime; private final Function<Long, String> seqIdGenerator; + private final boolean preserveHoodieMetadata; Review Comment: nit: `shouldPreserveHoodieMetadata` ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str switch (layoutOptStrategy) { case ZORDER: case HILBERT: - return new RDDSpatialCurveSortPartitioner( + return isRowPartitioner + ? new RowSpatialCurveSortPartitioner(getWriteConfig()) + : new RDDSpatialCurveSortPartitioner( (HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy, getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema)); case LINEAR: - return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), + return isRowPartitioner + ? new RowCustomColumnsSortPartitioner(orderByColumns) + : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()); default: throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy)); } - }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode())); + }).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig().getBulkInsertSortMode()) : + BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode())); } /** - * Submit job to execute clustering for the group. + * Submit job to execute clustering for the group with RDD APIs. 
*/ - private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, - boolean preserveHoodieMetadata, String instantTime) { + private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncWithRDD(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams, Review Comment: Let's stay consistent with how the rest of the codebase identifies Row vs. Avro workflows: Avro/HoodieRecord methods keep their existing names, and the Row counterparts get an `AsRow` suffix. In this case I suggest going w/ `runClusteringForGroupAsync` for the existing one and `runClusteringForGroupAsyncAsRow` for the new one. WDYT? ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.sort.SpaceCurveSortingHelper; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.Arrays; +import java.util.List; + +public class RowSpatialCurveSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> { + + private final String[] orderByColumns; + private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; + private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; + + public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) { + this.layoutOptStrategy = config.getLayoutOptimizationStrategy(); + if (config.getClusteringSortColumns() != null) { + this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(",")) + .map(String::trim).toArray(String[]::new); + } else { + throw new IllegalArgumentException("The config " + + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() + " must be provided"); + } + this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod(); + } + + @Override + public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) { + return reorder(records, outputPartitions); + } + + private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) { Review Comment: This method, for example, is the same for both the Avro and Row impls. We should certainly reuse it (instead of duplicating it).
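A rough sketch of what the suggested `SpatialCurveSortPartitionerBase` could look like — an illustration only, not code from the PR: the generic parameter, the protected visibility, and the abstract `reorder` hook are assumptions on my part, while the constructor simply mirrors the parsing logic that is currently duplicated in the quoted partitioners.

```java
package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.BulkInsertPartitioner;

import java.util.Arrays;

/**
 * Possible shared base class: owns the config parsing, leaves the
 * engine-specific space-curve reordering to subclasses.
 */
public abstract class SpatialCurveSortPartitionerBase<I> implements BulkInsertPartitioner<I> {

  protected final String[] orderByColumns;
  protected final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
  protected final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;

  protected SpatialCurveSortPartitionerBase(HoodieWriteConfig config) {
    this.layoutOptStrategy = config.getLayoutOptimizationStrategy();
    if (config.getClusteringSortColumns() != null) {
      this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(","))
          .map(String::trim).toArray(String[]::new);
    } else {
      throw new IllegalArgumentException("The config "
          + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() + " must be provided");
    }
    this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod();
  }

  @Override
  public I repartitionRecords(I records, int outputPartitions) {
    return reorder(records, outputPartitions);
  }

  // The Dataset<Row> and JavaRDD<HoodieRecord> subclasses supply the actual sorting.
  protected abstract I reorder(I records, int numOutputGroups);
}
```

With a base class along these lines, `RowSpatialCurveSortPartitioner` and `RDDSpatialCurveSortPartitioner` would each keep only their engine-specific `reorder` implementation.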
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str switch (layoutOptStrategy) { case ZORDER: case HILBERT: - return new RDDSpatialCurveSortPartitioner( + return isRowPartitioner + ? new RowSpatialCurveSortPartitioner(getWriteConfig()) + : new RDDSpatialCurveSortPartitioner( (HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy, getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema)); case LINEAR: - return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), + return isRowPartitioner + ? new RowCustomColumnsSortPartitioner(orderByColumns) + : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()); Review Comment: Same as above ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -148,29 +184,34 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<Str switch (layoutOptStrategy) { case ZORDER: case HILBERT: - return new RDDSpatialCurveSortPartitioner( + return isRowPartitioner + ? new RowSpatialCurveSortPartitioner(getWriteConfig()) + : new RDDSpatialCurveSortPartitioner( (HoodieSparkEngineContext) getEngineContext(), Review Comment: Please fix alignment ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java: ########## @@ -52,6 +55,27 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table, super(table, engineContext, writeConfig); } + @Override + public HoodieData<WriteStatus> performClusteringWithRecordsRow(Dataset<Row> inputRecords, int numOutputGroups, Review Comment: Let's create a Jira to implement `HoodieData` wrapping around `DataFrame` (you can assign it to me). In that case we won't need to duplicate these methods and will simply be able to parameterize them. ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -273,6 +330,60 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex .map(record -> transform(record, writeConfig))); } + /** + * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
+ */ + private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, + HoodieClusteringGroup clusteringGroup, + String instantTime) { + List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream() + .map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); + SQLContext sqlContext = new SQLContext(jsc.sc()); + + String[] baseFilePaths = clusteringOps + .stream() + .map(op -> { + ArrayList<String> readPaths = new ArrayList<>(); + if (op.getBootstrapFilePath() != null) { + readPaths.add(op.getBootstrapFilePath()); + } + if (op.getDataFilePath() != null) { + readPaths.add(op.getDataFilePath()); + } + return readPaths; + }) + .flatMap(Collection::stream) + .filter(path -> !path.isEmpty()) + .toArray(String[]::new); + String[] deltaPaths = clusteringOps + .stream() + .filter(op -> !op.getDeltaFilePaths().isEmpty()) + .flatMap(op -> op.getDeltaFilePaths().stream()) + .toArray(String[]::new); + + Dataset<Row> inputRecords; + if (hasLogFiles) { + String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction")) + .orElse("0.75"); + String[] paths = CollectionUtils.combine(baseFilePaths, deltaPaths); + inputRecords = sqlContext.read() Review Comment: Same feedback as with the write-path: we can't use Spark DataSource in here for mostly the same reasons -- it violates layering and could lead to subtle bugs. Instead let's extract following portion of the `createRelation` method and reuse it directly here: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala#L125 Like following: ``` val relation = DefaultSource.createRelation(...) val df = sparkSession.baseRelationToDataFrame(relation) ``` ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -273,6 +330,60 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContex .map(record -> transform(record, writeConfig))); } + /** + * Get dataset of all records for the group. This includes all records from file slice (Apply updates from log files, if any). 
+ */ + private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, + HoodieClusteringGroup clusteringGroup, + String instantTime) { + List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream() + .map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); + SQLContext sqlContext = new SQLContext(jsc.sc()); + + String[] baseFilePaths = clusteringOps + .stream() + .map(op -> { + ArrayList<String> readPaths = new ArrayList<>(); + if (op.getBootstrapFilePath() != null) { + readPaths.add(op.getBootstrapFilePath()); + } + if (op.getDataFilePath() != null) { + readPaths.add(op.getDataFilePath()); + } + return readPaths; + }) + .flatMap(Collection::stream) + .filter(path -> !path.isEmpty()) + .toArray(String[]::new); + String[] deltaPaths = clusteringOps + .stream() + .filter(op -> !op.getDeltaFilePaths().isEmpty()) + .flatMap(op -> op.getDeltaFilePaths().stream()) + .toArray(String[]::new); + + Dataset<Row> inputRecords; + if (hasLogFiles) { + String compactionFractor = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction")) Review Comment: The only part that differs b/w these branches is the options composition. Let's extract the common part and keep only the options composition under the conditional ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.sort.SpaceCurveSortingHelper; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.Arrays; +import java.util.List; + +public class RowSpatialCurveSortPartitioner extends RowCustomColumnsSortPartitioner { + + private final String[] orderByColumns; + private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; + private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; + + public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) { + super(config); + this.layoutOptStrategy = config.getLayoutOptimizationStrategy(); + if (config.getClusteringSortColumns() != null) { + this.orderByColumns = Arrays.stream(config.getClusteringSortColumns().split(",")) + .map(String::trim).toArray(String[]::new); + } else { + this.orderByColumns = getSortColumnNames(); + } + this.curveCompositionStrategyType = config.getLayoutOptimizationCurveBuildMethod(); + } + + @Override + public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) { + return reorder(records, outputPartitions); Review Comment: SG ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java: ########## @@ -113,6 +129,15 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood return writeMetadata; } + /** + * Execute clustering to write inputRecords into new files based on strategyParams. + * Different from {@link performClusteringWithRecordsRDD}, this method take {@link Dataset<Row>} + * as inputs. + */ + public abstract HoodieData<WriteStatus> performClusteringWithRecordsRow(final Dataset<Row> inputRecords, final int numOutputGroups, final String instantTime, Review Comment: Stacked one for comparison: <img width="710" alt="Screen Shot 2022-09-12 at 5 50 34 PM" src="https://user-images.githubusercontent.com/428277/189783700-35472b54-224c-4bd1-8f39-42952d1b5bf4.png"> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
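For context on the `getBooleanOrDefault` comment on `isEmbeddedTimelineServerReuseEnabled()` above: the NPE comes from auto-unboxing, because a `getBoolean`-style accessor returns a boxed `Boolean` that is null when the key is unset. A minimal, self-contained illustration with plain-Java stand-ins (these are not Hudi's actual config classes, and the key name is made up):

```java
import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeDemo {

  // Hypothetical stand-in for a string-backed config store.
  private static final Map<String, String> PROPS = new HashMap<>();

  // getBoolean-style accessor: returns null when the key is unset.
  static Boolean getBoolean(String key) {
    String value = PROPS.get(key);
    return value == null ? null : Boolean.parseBoolean(value);
  }

  // getBooleanOrDefault-style accessor: always yields a primitive, so it cannot NPE.
  static boolean getBooleanOrDefault(String key, boolean defaultValue) {
    String value = PROPS.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    // Safe: falls back to the default when the key is absent.
    boolean safe = getBooleanOrDefault("hoodie.example.flag", false);
    System.out.println("with default: " + safe);

    // Throws NullPointerException: the null Boolean is auto-unboxed into a primitive boolean.
    boolean unsafe = getBoolean("hoodie.example.flag");
    System.out.println("never reached: " + unsafe);
  }
}
```

This is why the review asks for `getBooleanOrDefault` whenever the config key may be absent.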