This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.14.1-hotfix
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4d429ac918dae3116fbc9df420e0baccdf79ea0b
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Mon Mar 11 17:25:41 2024 -0700

    [HUDI-7489] Avoid collecting WriteStatus to driver in row writer code path (#10836)

    * get rid of collect in row writer clustering

    * fix race condition

    * add logging

    ---------

    Co-authored-by: Jonathan Vexler <=>
---
 .../index/bucket/ConsistentBucketIndexUtils.java   | 11 ++-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 89 +++++++++++-----------
 2 files changed, 55 insertions(+), 45 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index f8befee9bf9..1e8abe09ecc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -208,7 +208,16 @@ public class ConsistentBucketIndexUtils {
     if (fs.exists(fullPath)) {
       return;
     }
-    FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
+    // Prevent exception from race condition. We are ok with the file being created in another thread, so we
+    // check for the marker after catching the exception; we don't need to fail if the file exists.
+    try {
+      FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+    } catch (HoodieIOException e) {
+      if (!fs.exists(fullPath)) {
+        throw e;
+      }
+      LOG.warn("Failed to create marker but " + fullPath + " exists", e);
+    }
   }
 
   /***
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 9623d4d1c09..95302e9ab37 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
@@ -149,53 +150,53 @@ object HoodieDatasetBulkInsertHelper
                               arePartitionRecordsSorted: Boolean,
                               shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
     val schema = dataset.schema
-    val writeStatuses = injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
-      val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
-      val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
-      val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
-      val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
+    HoodieJavaRDD.of(
+      injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
+        val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
+        val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
+        val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
+        val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
 
-      val writer = writeConfig.getIndexType match {
-        case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
-          == BucketIndexEngineType.CONSISTENT_HASHING =>
-          new ConsistentBucketBulkInsertDataInternalWriterHelper(
-            table,
-            writeConfig,
-            instantTime,
-            taskPartitionId,
-            taskId,
-            taskEpochId,
-            schema,
-            writeConfig.populateMetaFields,
-            arePartitionRecordsSorted,
-            shouldPreserveHoodieMetadata)
-        case _ =>
-          new BulkInsertDataInternalWriterHelper(
-            table,
-            writeConfig,
-            instantTime,
-            taskPartitionId,
-            taskId,
-            taskEpochId,
-            schema,
-            writeConfig.populateMetaFields,
-            arePartitionRecordsSorted,
-            shouldPreserveHoodieMetadata)
-      }
+        val writer = writeConfig.getIndexType match {
+          case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
+            == BucketIndexEngineType.CONSISTENT_HASHING =>
+            new ConsistentBucketBulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
+          case _ =>
+            new BulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
+        }
 
-      try {
-        iter.foreach(writer.write)
-      } catch {
-        case t: Throwable =>
-          writer.abort()
-          throw t
-      } finally {
-        writer.close()
-      }
+        try {
+          iter.foreach(writer.write)
+        } catch {
+          case t: Throwable =>
+            writer.abort()
+            throw t
+        } finally {
+          writer.close()
+        }
 
-      writer.getWriteStatuses.asScala.iterator
-    }), SQLConf.get).collect()
-    table.getContext.parallelize(writeStatuses.toList.asJava)
+        writer.getWriteStatuses.asScala.iterator
+      }), SQLConf.get).toJavaRDD())
   }
 
   private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
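For readers skimming the first hunk: the race-condition fix wraps marker creation in a try/catch and treats "file already exists" as success. Below is a minimal sketch of that create-then-verify pattern against the plain Hadoop FileSystem API. The object and method names are illustrative, not Hudi's, and fs.create stands in for Hudi's FileIOUtils.createFileInPath.

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object MarkerCreationSketch {
  // Create an empty marker file, tolerating a concurrent creator: if the
  // create fails but the file now exists, another task won the race and
  // the marker is in place, which is all that matters here.
  def createMarkerIgnoringRace(fs: FileSystem, fullPath: Path): Unit = {
    if (fs.exists(fullPath)) {
      return // already created earlier
    }
    try {
      fs.create(fullPath, false).close() // overwrite = false: racing creators throw
    } catch {
      case e: IOException =>
        if (!fs.exists(fullPath)) {
          throw e // genuine failure, not a race
        }
        // else: lost the race; the marker exists, so swallow the exception
    }
  }
}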
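The second hunk's point is that the per-partition WriteStatus results no longer make a round trip through driver memory (collect() followed by parallelize()); the RDD produced by mapPartitions is wrapped and returned as-is. Here is a self-contained Spark sketch of the same shape, with a hypothetical WriteStatusLike case class standing in for Hudi's WriteStatus:

import org.apache.spark.rdd.RDD

final case class WriteStatusLike(partitionId: Int, recordsWritten: Long)

// Each partition "writes" its rows and emits one status object. Returning
// the RDD directly keeps the statuses distributed; the pre-fix shape was
// statuses.collect() on the driver followed by sparkContext.parallelize(...).
def writeAndReport(rows: RDD[String]): RDD[WriteStatusLike] =
  rows.mapPartitionsWithIndex { (pid, iter) =>
    var count = 0L
    iter.foreach(_ => count += 1) // stand-in for the real per-row write
    Iterator.single(WriteStatusLike(pid, count))
  }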