This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ccd70a7  [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert (#1149)
ccd70a7 is described below

commit ccd70a7e486850ca6ce4d7fcc806e396ff0b6aa0
Author:     Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Fri Jul 31 06:52:42 2020 -0700

    [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert (#1149)

    * [HUDI-472] Introduce the configuration and new modes of record sorting
      for bulk_insert (#1149). Three sorting modes are implemented: global sort
      ("global_sort"), local sort inside each RDD partition ("partition_sort"),
      and no sort ("none")
---
 .../org/apache/hudi/client/HoodieWriteClient.java  |  14 +--
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  16 +++
 .../hudi/execution/CopyOnWriteInsertHandler.java   | 116 ++++++++++++++++++++
 .../apache/hudi/execution/LazyInsertIterable.java  |  72 +++---------
 .../BulkInsertInternalPartitionerFactory.java      |  48 ++++++++
 .../{ => bulkinsert}/BulkInsertMapFunction.java    |  14 ++-
 .../bulkinsert/GlobalSortPartitioner.java          |  57 ++++++++++
 .../bulkinsert/NonSortPartitioner.java}            |  24 +++-
 .../bulkinsert/RDDPartitionSortPartitioner.java    |  69 ++++++++++++
 ...Partitioner.java => BulkInsertPartitioner.java} |  14 ++-
 .../apache/hudi/table/HoodieCopyOnWriteTable.java  |   8 +-
 .../apache/hudi/table/HoodieMergeOnReadTable.java  |   8 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   4 +-
 .../commit/BulkInsertCommitActionExecutor.java     |  10 +-
 .../hudi/table/action/commit/BulkInsertHelper.java |  26 ++---
 .../BulkInsertPreppedCommitActionExecutor.java     |  10 +-
 .../BulkInsertDeltaCommitActionExecutor.java       |   6 +-
 ...BulkInsertPreppedDeltaCommitActionExecutor.java |   6 +-
 .../deltacommit/DeltaCommitActionExecutor.java     |   8 +-
 .../TestBulkInsertInternalPartitioner.java         | 121 +++++++++++++++++++++
 .../commit/TestCopyOnWriteActionExecutor.java      |  72 +++++++++---
 .../main/java/org/apache/hudi/DataSourceUtils.java |   8 +-
 .../apache/hudi/testutils/DataSourceTestUtils.java |   9 +-
 23 files changed, 598 insertions(+), 142 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 2486d91..b2ad315 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -48,7 +48,7 @@ import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.MarkerFiles; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.action.savepoint.SavepointHelpers; @@ -247,21 +247,21 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See - {@link UserDefinedBulkInsertPartitioner}. + {@link BulkInsertPartitioner}. 
* * @param records HoodieRecords to insert * @param instantTime Instant time of the commit - * @param bulkInsertPartitioner If specified then it will be used to partition input records before they are inserted + * @param userDefinedBulkInsertPartitioner If specified then it will be used to partition input records before they are inserted * into hoodie. * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String instantTime, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT); this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); - HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, bulkInsertPartitioner); + HoodieWriteMetadata result = table.bulkInsert(jsc,instantTime, records, userDefinedBulkInsertPartitioner); return postWrite(result, instantTime, table); } @@ -273,7 +273,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control * the numbers of files with less memory compared to the {@link HoodieWriteClient#insert(JavaRDD, String)}. Optionally * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See - * {@link UserDefinedBulkInsertPartitioner}. + * {@link BulkInsertPartitioner}. * * @param preppedRecords HoodieRecords to insert * @param instantTime Instant time of the commit @@ -282,7 +282,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String instantTime, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> bulkInsertPartitioner) { HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED); table.validateInsertSchema(); setOperationType(WriteOperationType.BULK_INSERT_PREPPED); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index d51832d..9aecdf7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; @@ -87,6 +88,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); public static final String FINALIZE_WRITE_PARALLELISM = 
"hoodie.finalize.write.parallelism"; public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; + public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; + public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT + .toString(); public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; @@ -251,6 +255,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP)); } + public BulkInsertSortMode getBulkInsertSortMode() { + String sortMode = props.getProperty(BULKINSERT_SORT_MODE); + return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); + } + /** * compaction properties. */ @@ -826,6 +835,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withBulkInsertSortMode(String mode) { + props.setProperty(BULKINSERT_SORT_MODE, mode); + return this; + } + public Builder withAllowMultiWriteOnSameInstant(boolean allow) { props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow)); return this; @@ -871,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP), FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED); setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE); + setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE), + BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java new file mode 100644 index 0000000..ae61d80 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution; + +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.table.HoodieTable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles. + */ +public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload> + extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> { + + private HoodieWriteConfig config; + private String instantTime; + private boolean areRecordsSorted; + private HoodieTable<T> hoodieTable; + private String idPrefix; + private SparkTaskContextSupplier sparkTaskContextSupplier; + private WriteHandleFactory<T> writeHandleFactory; + + private final List<WriteStatus> statuses = new ArrayList<>(); + // Stores the open HoodieWriteHandle for each table partition path + // If the records are consumed in order, there should be only one open handle in this mapping. + // Otherwise, there may be multiple handles. + private Map<String, HoodieWriteHandle> handles = new HashMap<>(); + + public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime, + boolean areRecordsSorted, HoodieTable<T> hoodieTable, String idPrefix, + SparkTaskContextSupplier sparkTaskContextSupplier, + WriteHandleFactory<T> writeHandleFactory) { + this.config = config; + this.instantTime = instantTime; + this.areRecordsSorted = areRecordsSorted; + this.hoodieTable = hoodieTable; + this.idPrefix = idPrefix; + this.sparkTaskContextSupplier = sparkTaskContextSupplier; + this.writeHandleFactory = writeHandleFactory; + } + + @Override + public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) { + final HoodieRecord insertPayload = payload.record; + String partitionPath = insertPayload.getPartitionPath(); + HoodieWriteHandle handle = handles.get(partitionPath); + if (handle == null) { + // If the records are sorted, this means that we encounter a new partition path + // and the records for the previous partition path are all written, + // so we can safely close the existing open handle to reduce memory footprint. + if (areRecordsSorted) { + closeOpenHandles(); + } + // Lazily initialize the handle, for the first time + handle = writeHandleFactory.create(config, instantTime, hoodieTable, + insertPayload.getPartitionPath(), idPrefix, sparkTaskContextSupplier); + handles.put(partitionPath, handle); + } + + if (!handle.canWrite(payload.record)) { + // Handle is full. 
Close the handle and add the WriteStatus + statuses.add(handle.close()); + // Open new handle + handle = writeHandleFactory.create(config, instantTime, hoodieTable, + insertPayload.getPartitionPath(), idPrefix, sparkTaskContextSupplier); + handles.put(partitionPath, handle); + } + handle.write(insertPayload, payload.insertValue, payload.exception); + } + + @Override + public void finish() { + closeOpenHandles(); + assert statuses.size() > 0; + } + + @Override + public List<WriteStatus> getResult() { + return statuses; + } + + private void closeOpenHandles() { + for (HoodieWriteHandle handle : handles.values()) { + statuses.add(handle.close()); + } + handles.clear(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java index fe0d5c4..572956d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/LazyInsertIterable.java @@ -25,18 +25,15 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; -import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.CreateHandleFactory; import org.apache.hudi.io.WriteHandleFactory; -import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.function.Function; @@ -49,6 +46,7 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> protected final HoodieWriteConfig hoodieConfig; protected final String instantTime; + protected boolean areRecordsSorted; protected final HoodieTable<T> hoodieTable; protected final String idPrefix; protected SparkTaskContextSupplier sparkTaskContextSupplier; @@ -57,15 +55,22 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, String idPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) { - this(sortedRecordItr, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier, + this(sortedRecordItr, true, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier); + } + + public LazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted, + HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, + String idPrefix, SparkTaskContextSupplier sparkTaskContextSupplier) { + this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, sparkTaskContextSupplier, new CreateHandleFactory<>()); } - public LazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config, - String instantTime, HoodieTable<T> hoodieTable, String idPrefix, - SparkTaskContextSupplier sparkTaskContextSupplier, + public LazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean areRecordsSorted, + HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, + String idPrefix, SparkTaskContextSupplier sparkTaskContextSupplier, WriteHandleFactory<T> writeHandleFactory) { - 
super(sortedRecordItr); + super(recordItr); + this.areRecordsSorted = areRecordsSorted; this.hoodieConfig = config; this.instantTime = instantTime; this.hoodieTable = hoodieTable; @@ -75,7 +80,7 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> } // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread. - static class HoodieInsertValueGenResult<T extends HoodieRecord> { + public static class HoodieInsertValueGenResult<T extends HoodieRecord> { public T record; public Option<IndexedRecord> insertValue; // It caches the exception seen while fetching insert value. @@ -128,52 +133,7 @@ public class LazyInsertIterable<T extends HoodieRecordPayload> protected void end() {} protected CopyOnWriteInsertHandler getInsertHandler() { - return new CopyOnWriteInsertHandler(); - } - - /** - * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles. - */ - protected class CopyOnWriteInsertHandler - extends BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> { - - protected final List<WriteStatus> statuses = new ArrayList<>(); - protected HoodieWriteHandle handle; - - @Override - protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) { - final HoodieRecord insertPayload = payload.record; - // lazily initialize the handle, for the first time - if (handle == null) { - handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - idPrefix, sparkTaskContextSupplier); - } - - if (handle.canWrite(payload.record)) { - // write the payload, if the handle has capacity - handle.write(insertPayload, payload.insertValue, payload.exception); - } else { - // handle is full. - statuses.add(handle.close()); - // Need to handle the rejected payload & open new handle - handle = writeHandleFactory.create(hoodieConfig, instantTime, hoodieTable, insertPayload.getPartitionPath(), - idPrefix, sparkTaskContextSupplier); - handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload. - } - } - - @Override - protected void finish() { - if (handle != null) { - statuses.add(handle.close()); - } - handle = null; - assert statuses.size() > 0; - } - - @Override - protected List<WriteStatus> getResult() { - return statuses; - } + return new CopyOnWriteInsertHandler(hoodieConfig, instantTime, areRecordsSorted, hoodieTable, idPrefix, + sparkTaskContextSupplier, writeHandleFactory); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java new file mode 100644 index 0000000..ef4ffb6 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.table.BulkInsertPartitioner; + +/** + * A factory to generate built-in partitioner to repartition input records into at least + * expected number of output spark partitions for bulk insert operation. + */ +public abstract class BulkInsertInternalPartitionerFactory { + + public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) { + switch (sortMode) { + case NONE: + return new NonSortPartitioner(); + case GLOBAL_SORT: + return new GlobalSortPartitioner(); + case PARTITION_SORT: + return new RDDPartitionSortPartitioner(); + default: + throw new HoodieException("The bulk insert mode \"" + sortMode.name() + "\" is not supported."); + } + } + + public enum BulkInsertSortMode { + NONE, + GLOBAL_SORT, + PARTITION_SORT + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java similarity index 77% rename from hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java rename to hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index 67c1d75..71c10ed 100644 --- a/hudi-client/src/main/java/org/apache/hudi/execution/BulkInsertMapFunction.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -16,12 +16,13 @@ * limitations under the License. */ -package org.apache.hudi.execution; +package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.LazyInsertIterable; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.function.Function2; @@ -30,27 +31,30 @@ import java.util.Iterator; import java.util.List; /** - * Map function that handles a sorted stream of HoodieRecords. + * Map function that handles a stream of HoodieRecords. 
*/ public class BulkInsertMapFunction<T extends HoodieRecordPayload> implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>> { private String instantTime; + private boolean areRecordsSorted; private HoodieWriteConfig config; private HoodieTable<T> hoodieTable; private List<String> fileIDPrefixes; - public BulkInsertMapFunction(String instantTime, HoodieWriteConfig config, HoodieTable<T> hoodieTable, + public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, + HoodieWriteConfig config, HoodieTable<T> hoodieTable, List<String> fileIDPrefixes) { this.instantTime = instantTime; + this.areRecordsSorted = areRecordsSorted; this.config = config; this.hoodieTable = hoodieTable; this.fileIDPrefixes = fileIDPrefixes; } @Override - public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) { - return new LazyInsertIterable<>(sortedRecordItr, config, instantTime, hoodieTable, + public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) { + return new LazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, fileIDPrefixes.get(partition), hoodieTable.getSparkTaskContextSupplier()); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java new file mode 100644 index 0000000..3f4077a --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.api.java.JavaRDD; + +/** + * A built-in partitioner that does global sorting for the input records across partitions + * after repartition for bulk insert operation, corresponding to the + * {@code BulkInsertSortMode.GLOBAL_SORT} mode. + * + * @param <T> HoodieRecordPayload type + */ +public class GlobalSortPartitioner<T extends HoodieRecordPayload> + implements BulkInsertPartitioner<T> { + + @Override + public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, + int outputSparkPartitions) { + // Now, sort the records and line them up nicely for loading. + return records.sortBy(record -> { + // Let's use "partitionPath + key" as the sort key. 
Spark will ensure + // the records split evenly across RDD partitions, such that small partitions fit + // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions + return new StringBuilder() + .append(record.getPartitionPath()) + .append("+") + .append(record.getRecordKey()) + .toString(); + }, true, outputSparkPartitions); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java similarity index 57% copy from hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java copy to hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 411918a..571b8aa 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java +++ b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -16,19 +16,31 @@ * limitations under the License. */ -package org.apache.hudi.table; +package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; /** - * Repartition input records into at least expected number of output spark partitions. It should give below guarantees - - * Output spark partition will have records from only one hoodie partition. - Average records per output spark - * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. + * A built-in partitioner that only does coalesce for input records for bulk insert operation, + * corresponding to the {@code BulkInsertSortMode.NONE} mode. + * + * @param <T> HoodieRecordPayload type */ -public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> { +public class NonSortPartitioner<T extends HoodieRecordPayload> + implements BulkInsertPartitioner<T> { + + @Override + public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, + int outputSparkPartitions) { + return records.coalesce(outputSparkPartitions); + } - JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions); + @Override + public boolean arePartitionRecordsSorted() { + return false; + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java new file mode 100644 index 0000000..9fc91a4 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import scala.Tuple2; + +/** + * A built-in partitioner that does local sorting for each RDD partition + * after coalesce for bulk insert operation, corresponding to the + * {@code BulkInsertSortMode.PARTITION_SORT} mode. + * + * @param <T> HoodieRecordPayload type + */ +public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload> + implements BulkInsertPartitioner<T> { + + @Override + public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, + int outputSparkPartitions) { + return records.coalesce(outputSparkPartitions) + .mapToPair(record -> + new Tuple2<>( + new StringBuilder() + .append(record.getPartitionPath()) + .append("+") + .append(record.getRecordKey()) + .toString(), record)) + .mapPartitions(partition -> { + // Sort locally in partition + List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>(); + while (partition.hasNext()) { + recordList.add(partition.next()); + } + Collections.sort(recordList, (o1, o2) -> o1._1.compareTo(o2._1)); + return recordList.stream().map(e -> e._2).iterator(); + }); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java b/hudi-client/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java similarity index 74% rename from hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java rename to hudi-client/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 411918a..53aee2f 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -28,7 +28,19 @@ import org.apache.spark.api.java.JavaRDD; * Output spark partition will have records from only one hoodie partition. - Average records per output spark * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. */ -public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> { +public interface BulkInsertPartitioner<T extends HoodieRecordPayload> { + /** + * Repartitions the input records into at least expected number of output spark partitions. + * + * @param records Input Hoodie records in RDD + * @param outputSparkPartitions Expected number of output RDD partitions + * @return Repartitioned records in a JavaRDD + */ JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions); + + /** + * @return {@code true} if the records within an RDD partition are sorted; {@code false} otherwise. 
+ */ + boolean arePartitionRecordsSorted(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java index 9f3bb82..849673e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java @@ -96,9 +96,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi @Override public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { return new BulkInsertCommitActionExecutor<>(jsc, config, - this, instantTime, records, bulkInsertPartitioner).execute(); + this, instantTime, records, userDefinedBulkInsertPartitioner).execute(); } @Override @@ -120,9 +120,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi @Override public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime, - JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { return new BulkInsertPreppedCommitActionExecutor<>(jsc, config, - this, instantTime, preppedRecords, bulkInsertPartitioner).execute(); + this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute(); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index 8496ea4..785efa5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -83,9 +83,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi @Override public HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, JavaRDD<HoodieRecord<T>> records, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { return new BulkInsertDeltaCommitActionExecutor<>(jsc, config, - this, instantTime, records, bulkInsertPartitioner).execute(); + this, instantTime, records, userDefinedBulkInsertPartitioner).execute(); } @Override @@ -107,9 +107,9 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi @Override public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime, - JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { return new BulkInsertPreppedDeltaCommitActionExecutor<>(jsc, config, - this, instantTime, preppedRecords, bulkInsertPartitioner).execute(); + this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute(); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 14dd168..748091e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -159,7 +159,7 @@ public abstract 
class HoodieTable<T extends HoodieRecordPayload> implements Seri * @return HoodieWriteMetadata */ public abstract HoodieWriteMetadata bulkInsert(JavaSparkContext jsc, String instantTime, - JavaRDD<HoodieRecord<T>> records, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner); + JavaRDD<HoodieRecord<T>> records, Option<BulkInsertPartitioner> bulkInsertPartitioner); /** * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be @@ -207,7 +207,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri * @return HoodieWriteMetadata */ public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime, - JavaRDD<HoodieRecord<T>> preppedRecords, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner); + JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner); public HoodieWriteConfig getConfig() { return config; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java index 9f5468e..ee93f06 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertCommitActionExecutor.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; @@ -35,22 +35,22 @@ public class BulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends CommitActionExecutor<T> { private final JavaRDD<HoodieRecord<T>> inputRecordsRDD; - private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner; + private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner; public BulkInsertCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT); this.inputRecordsRDD = inputRecordsRDD; - this.bulkInsertPartitioner = bulkInsertPartitioner; + this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner; } @Override public HoodieWriteMetadata execute() { try { return BulkInsertHelper.bulkInsert(inputRecordsRDD, instantTime, (HoodieTable<T>) table, config, - this, true, bulkInsertPartitioner); + this, true, userDefinedBulkInsertPartitioner); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java index 782b9aa..29bedd7 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java @@ -26,10 +26,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.execution.BulkInsertMapFunction; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; +import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; - +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; @@ -43,7 +43,7 @@ public class BulkInsertHelper<T extends HoodieRecordPayload<T>> { JavaRDD<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable<T> table, HoodieWriteConfig config, CommitActionExecutor<T> executor, boolean performDedupe, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { HoodieWriteMetadata result = new HoodieWriteMetadata(); // De-dupe/merge if needed @@ -56,17 +56,10 @@ public class BulkInsertHelper<T extends HoodieRecordPayload<T>> { final JavaRDD<HoodieRecord<T>> repartitionedRecords; final int parallelism = config.getBulkInsertShuffleParallelism(); - if (bulkInsertPartitioner.isPresent()) { - repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism); - } else { - // Now, sort the records and line them up nicely for loading. - repartitionedRecords = dedupedRecords.sortBy(record -> { - // Let's use "partitionPath + key" as the sort key. Spark, will ensure - // the records split evenly across RDD partitions, such that small partitions fit - // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions - return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()); - }, true, parallelism); - } + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() + ? 
userDefinedBulkInsertPartitioner.get() + : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); + repartitionedRecords = partitioner.repartitionRecords(dedupedRecords, parallelism); // generate new file ID prefixes for each output partition final List<String> fileIDPrefixes = @@ -77,7 +70,8 @@ public class BulkInsertHelper<T extends HoodieRecordPayload<T>> { config.shouldAllowMultiWriteOnSameInstant()); JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords - .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true) + .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime, + partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes), true) .flatMap(List::iterator); executor.updateIndexAndCommitIfNeeded(writeStatusRDD, result); diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java index 3d80a07..0f78481 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertPreppedCommitActionExecutor.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; @@ -35,22 +35,22 @@ public class BulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload extends CommitActionExecutor<T> { private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd; - private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner; + private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner; public BulkInsertPreppedCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) { super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT); this.preppedInputRecordRdd = preppedInputRecordRdd; - this.bulkInsertPartitioner = bulkInsertPartitioner; + this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner; } @Override public HoodieWriteMetadata execute() { try { return BulkInsertHelper.bulkInsert(preppedInputRecordRdd, instantTime, (HoodieTable<T>) table, config, - this, false, bulkInsertPartitioner); + this, false, userDefinedBulkInsertPartitioner); } catch (Throwable e) { if (e instanceof HoodieInsertException) { throw e; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java index 5e4b915..61eb612 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertDeltaCommitActionExecutor.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import 
org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.commit.BulkInsertHelper; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -36,12 +36,12 @@ public class BulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T extends DeltaCommitActionExecutor<T> { private final JavaRDD<HoodieRecord<T>> inputRecordsRDD; - private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner; + private final Option<BulkInsertPartitioner> bulkInsertPartitioner; public BulkInsertDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> bulkInsertPartitioner) { super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT); this.inputRecordsRDD = inputRecordsRDD; this.bulkInsertPartitioner = bulkInsertPartitioner; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java index 5a3fe7a..7ebd432 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/BulkInsertPreppedDeltaCommitActionExecutor.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.action.commit.BulkInsertHelper; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -36,12 +36,12 @@ public class BulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPa extends DeltaCommitActionExecutor<T> { private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd; - private final Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner; + private final Option<BulkInsertPartitioner> bulkInsertPartitioner; public BulkInsertPreppedDeltaCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd, - Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) { + Option<BulkInsertPartitioner> bulkInsertPartitioner) { super(jsc, config, table, instantTime, WriteOperationType.BULK_INSERT); this.preppedInputRecordRdd = preppedInputRecordRdd; this.bulkInsertPartitioner = bulkInsertPartitioner; diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java index 8c24afd..9429069 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/deltacommit/DeltaCommitActionExecutor.java @@ -25,12 +25,12 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; 
import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.LazyInsertIterable; -import org.apache.hudi.io.HoodieAppendHandle; import org.apache.hudi.io.AppendHandleFactory; +import org.apache.hudi.io.HoodieAppendHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.WorkloadProfile; - import org.apache.hudi.table.action.commit.CommitActionExecutor; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; @@ -85,8 +85,8 @@ public abstract class DeltaCommitActionExecutor<T extends HoodieRecordPayload<T> throws Exception { // If canIndexLogFiles, write inserts to log files else write inserts to base files if (table.getIndex().canIndexLogFiles()) { - return new LazyInsertIterable<>(recordItr, config, instantTime, (HoodieTable<T>)table, idPfx, - sparkTaskContextSupplier, new AppendHandleFactory<>()); + return new LazyInsertIterable<>(recordItr, true, config, instantTime, (HoodieTable<T>) table, + idPfx, sparkTaskContextSupplier, new AppendHandleFactory<>()); } else { return super.handleInsert(idPfx, recordItr); } diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java new file mode 100644 index 0000000..3b9df5f --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { + + public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc) { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + // RDD partition 1 + List<HoodieRecord> records1 = dataGenerator.generateInserts("0", 100); + // RDD partition 2 + List<HoodieRecord> records2 = dataGenerator.generateInserts("0", 150); + return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1)); + } + + public static Map<String, Long> generateExpectedPartitionNumRecords(JavaRDD<HoodieRecord> records) { + return records.map(record -> record.getPartitionPath()).countByValue(); + } + + private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc) + throws Exception { + return generateTestRecordsForBulkInsert(jsc).union(generateTestRecordsForBulkInsert(jsc)) + .union(generateTestRecordsForBulkInsert(jsc)); + } + + private static Stream<Arguments> configParams() { + Object[][] data = new Object[][] { + {BulkInsertInternalPartitionerFactory.BulkInsertSortMode.GLOBAL_SORT, true, true}, + {BulkInsertInternalPartitionerFactory.BulkInsertSortMode.PARTITION_SORT, false, true}, + {BulkInsertInternalPartitionerFactory.BulkInsertSortMode.NONE, false, false} + }; + return Stream.of(data).map(Arguments::of); + } + + private void verifyRecordAscendingOrder(List<HoodieRecord> records) { + List<HoodieRecord> expectedRecords = new ArrayList<>(records); + Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()))); + assertEquals(expectedRecords, records); + } + + private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, + JavaRDD<HoodieRecord> records, + boolean isGloballySorted, boolean isLocallySorted, + Map<String, Long> expectedPartitionNumRecords) { + int numPartitions = 2; + JavaRDD<HoodieRecord> actualRecords = partitioner.repartitionRecords(records, numPartitions); + assertEquals(numPartitions, actualRecords.getNumPartitions()); + List<HoodieRecord> collectedActualRecords = actualRecords.collect(); + if (isGloballySorted) { + // Verify global order + verifyRecordAscendingOrder(collectedActualRecords); + } else if (isLocallySorted) { + // Verify local order + actualRecords.mapPartitions(partition -> { + List<HoodieRecord> partitionRecords = new ArrayList<>(); + partition.forEachRemaining(partitionRecords::add); + verifyRecordAscendingOrder(partitionRecords); + return Collections.emptyList().iterator(); + }).collect(); + } + + // Verify number of records per partition path + Map<String, Long> actualPartitionNumRecords = new HashMap<>(); + for (HoodieRecord record : collectedActualRecords) { + 
String partitionPath = record.getPartitionPath(); + actualPartitionNumRecords.put(partitionPath, + actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + 1); + } + assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); + } + + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("configParams") + public void testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.BulkInsertSortMode sortMode, + boolean isGloballySorted, boolean isLocallySorted) + throws Exception { + JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); + JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); + testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode), + records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); + testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode), + records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 4d16e57..7ab1470 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; @@ -52,16 +53,23 @@ import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; import org.apache.spark.TaskContext; +import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords; +import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -290,6 +298,21 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { assertEquals("6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000")); } + private void verifyStatusResult(List<WriteStatus> statuses, Map<String, Long> expectedPartitionNumRecords) { + Map<String, Long> actualPartitionNumRecords = new HashMap<>(); + + for (int i = 0; i < statuses.size(); i++) { + WriteStatus writeStatus = statuses.get(i); + String partitionPath = writeStatus.getPartitionPath(); + actualPartitionNumRecords.put( + partitionPath, + 
actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + writeStatus.getTotalRecords()); + assertEquals(0, writeStatus.getFailedRecords().size()); + } + + assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); + } + @Test public void testInsertRecords() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); @@ -312,12 +335,10 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { // TODO: check the actual files and make sure 11 records, total were written. assertEquals(2, returnedStatuses.size()); - assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath()); - assertEquals(0, returnedStatuses.get(0).getFailedRecords().size()); - assertEquals(10, returnedStatuses.get(0).getTotalRecords()); - assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath()); - assertEquals(0, returnedStatuses.get(0).getFailedRecords().size()); - assertEquals(1, returnedStatuses.get(1).getTotalRecords()); + Map<String, Long> expectedPartitionNumRecords = new HashMap<>(); + expectedPartitionNumRecords.put("2016/01/31", 10L); + expectedPartitionNumRecords.put("2016/02/01", 1L); + verifyStatusResult(returnedStatuses, expectedPartitionNumRecords); // Case 2: // 1 record for partition 1, 5 record for partition 2, 1 records for partition 3. @@ -334,14 +355,11 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect(); assertEquals(3, returnedStatuses.size()); - assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath()); - assertEquals(1, returnedStatuses.get(0).getTotalRecords()); - - assertEquals("2016/02/01", returnedStatuses.get(1).getPartitionPath()); - assertEquals(5, returnedStatuses.get(1).getTotalRecords()); - - assertEquals("2016/02/02", returnedStatuses.get(2).getPartitionPath()); - assertEquals(1, returnedStatuses.get(2).getTotalRecords()); + expectedPartitionNumRecords.clear(); + expectedPartitionNumRecords.put("2016/01/31", 1L); + expectedPartitionNumRecords.put("2016/02/01", 5L); + expectedPartitionNumRecords.put("2016/02/02", 1L); + verifyStatusResult(returnedStatuses, expectedPartitionNumRecords); } @Test @@ -399,7 +417,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { metaClient.getFs().create(new Path(Paths.get(basePath, ".hoodie", "000.commit").toString())).close(); final List<HoodieRecord> updates = dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts); - String partitionPath = updates.get(0).getPartitionPath(); + String partitionPath = writeStatus.getPartitionPath(); long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count(); CommitActionExecutor newActionExecutor = new UpsertCommitActionExecutor(jsc, config, table, instantTime, jsc.parallelize(updates)); @@ -408,4 +426,28 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect(); assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords()); } + + public void testBulkInsertRecords(String bulkInsertMode) throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) + .withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build(); + String instantTime = HoodieTestUtils.makeNewCommitTime(); + HoodieWriteClient writeClient = 
getHoodieWriteClient(config); + writeClient.startCommitWithTime(instantTime); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); + + // Insert new records + final JavaRDD<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(jsc); + BulkInsertCommitActionExecutor bulkInsertExecutor = new BulkInsertCommitActionExecutor( + jsc, config, table, instantTime, inputRecords, Option.empty()); + List<WriteStatus> returnedStatuses = bulkInsertExecutor.execute().getWriteStatuses().collect(); + verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords)); + } + + @ParameterizedTest(name = "[{index}] {0}") + @ValueSource(strings = {"global_sort", "partition_sort", "none"}) + public void testBulkInsertRecordsWithGlobalSort(String bulkInsertMode) throws Exception { + testBulkInsertRecords(bulkInsertMode); + } } diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 36212d0..a4e7472 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -42,7 +42,7 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.parser.HoodieDateTimeParser; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -200,13 +200,13 @@ public class DataSourceUtils { * if the class name of UserDefinedBulkInsertPartitioner is configured through the HoodieWriteConfig. * @see HoodieWriteConfig#getUserDefinedBulkInsertPartitionerClass() */ - private static Option<UserDefinedBulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config) + private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitioner(HoodieWriteConfig config) throws HoodieException { String bulkInsertPartitionerClass = config.getUserDefinedBulkInsertPartitionerClass(); try { return StringUtils.isNullOrEmpty(bulkInsertPartitionerClass) ? 
Option.empty() : - Option.of((UserDefinedBulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass)); + Option.of((BulkInsertPartitioner) ReflectionUtils.loadClass(bulkInsertPartitionerClass)); } catch (Throwable e) { throw new HoodieException("Could not create UserDefinedBulkInsertPartitioner class " + bulkInsertPartitionerClass, e); } @@ -258,7 +258,7 @@ public class DataSourceUtils { public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords, String instantTime, String operation) throws HoodieException { if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) { - Option<UserDefinedBulkInsertPartitioner> userDefinedBulkInsertPartitioner = + Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner = createUserDefinedBulkInsertPartitioner(client.getConfig()); return client.bulkInsert(hoodieRecords, instantTime, userDefinedBulkInsertPartitioner); } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) { diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java index 14e66a5..c687352 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java +++ b/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.Option; -import org.apache.hudi.table.UserDefinedBulkInsertPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; @@ -60,12 +60,17 @@ public class DataSourceTestUtils { } public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload> - implements UserDefinedBulkInsertPartitioner<T> { + implements BulkInsertPartitioner<T> { @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { return records; } + + @Override + public boolean arePartitionRecordsSorted() { + return false; + } } }
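
Taken together, the changes above introduce three bulk_insert sorting modes -- "global_sort", "partition_sort", and "none" -- selected through the new HoodieWriteConfig option exercised by testBulkInsertRecords. A minimal usage sketch, assuming the jsc, basePath, and TRIP_EXAMPLE_SCHEMA fixtures from the tests in this commit and some JavaRDD<HoodieRecord> named records:

    // Sketch: choosing a bulk_insert sort mode through the new config option.
    // jsc, basePath, TRIP_EXAMPLE_SCHEMA and records are assumed from the
    // surrounding test harness; they are not defined here.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(TRIP_EXAMPLE_SCHEMA)
        .withBulkInsertParallelism(2)
        .withBulkInsertSortMode("partition_sort")  // or "global_sort" / "none"
        .build();
    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
    String instantTime = HoodieTestUtils.makeNewCommitTime();
    writeClient.startCommitWithTime(instantTime);
    JavaRDD<WriteStatus> statuses = writeClient.bulkInsert(records, instantTime);

A user-defined partitioner can still override the built-in modes: as the DataSourceUtils change shows, a configured BulkInsertPartitioner implementation is loaded reflectively and handed to HoodieWriteClient#bulkInsert as an Option.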
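
The GlobalSortPartitioner source is elided from this excerpt. Below is a minimal sketch of the "global_sort" mode, consistent with the (partitionPath + "+" + recordKey) ordering that verifyRecordAscendingOrder asserts; the committed class may differ in detail:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.BulkInsertPartitioner;

    import org.apache.spark.api.java.JavaRDD;

    // Sketch of "global_sort": impose a total order on the whole input RDD so
    // that records belonging to the same Hudi partition land in the same (or
    // adjacent) output Spark partitions, keeping the number of files written low.
    public class GlobalSortPartitionerSketch<T extends HoodieRecordPayload>
        implements BulkInsertPartitioner<T> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                         int outputSparkPartitions) {
        // JavaRDD#sortBy samples the keys to build range boundaries, then
        // shuffles into outputSparkPartitions sorted partitions.
        return records.sortBy(
            record -> record.getPartitionPath() + "+" + record.getRecordKey(),
            true, outputSparkPartitions);
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return true;
      }
    }

Returning true from arePartitionRecordsSorted() is what lets the write path roll over to a new file handle whenever the partition path changes, rather than keeping one handle open per Hudi partition.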
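
"partition_sort" keeps the same ordering key but applies it within each output Spark partition only, avoiding the global shuffle; "none" (the new NonSortPartitioner) leaves records as-is, much like the NoOpBulkInsertPartitioner test stub above. A sketch of the local-sort variant follows; note that coalesce only ever reduces the partition count, a simplification the committed RDDPartitionSortPartitioner may handle differently:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.table.BulkInsertPartitioner;

    import org.apache.spark.api.java.JavaRDD;

    // Sketch of "partition_sort": sort records inside each Spark partition by
    // (partitionPath, recordKey); no total order across partitions is guaranteed.
    public class PartitionSortPartitionerSketch<T extends HoodieRecordPayload>
        implements BulkInsertPartitioner<T> {

      @Override
      public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                         int outputSparkPartitions) {
        return records
            .coalesce(outputSparkPartitions)
            .mapPartitions(iterator -> {
              // Buffer the partition and sort it in memory; acceptable as long
              // as a single Spark partition fits in executor memory.
              List<HoodieRecord<T>> buffer = new ArrayList<>();
              iterator.forEachRemaining(buffer::add);
              buffer.sort(Comparator.comparing(
                  record -> record.getPartitionPath() + "+" + record.getRecordKey()));
              return buffer.iterator();
            });
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return true;
      }
    }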