This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.14.1-hotfix
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4d429ac918dae3116fbc9df420e0baccdf79ea0b
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Mon Mar 11 17:25:41 2024 -0700

    [HUDI-7489] Avoid collecting WriteStatus to driver in row writer code path (#10836)
    
    * get rid of collect in row writer clustering
    
    * fix race condition
    
    * add logging
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../index/bucket/ConsistentBucketIndexUtils.java   | 11 ++-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 89 +++++++++++-----------
 2 files changed, 55 insertions(+), 45 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index f8befee9bf9..1e8abe09ecc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -208,7 +208,16 @@ public class ConsistentBucketIndexUtils {
     if (fs.exists(fullPath)) {
       return;
     }
-    FileIOUtils.createFileInPath(fs, fullPath, Option.of(StringUtils.EMPTY_STRING.getBytes()));
+    // Prevent failures caused by a race condition: we are ok with the file being created in another thread, so we
+    // check for the marker after catching the exception and don't need to fail if the file already exists.
+    try {
+      FileIOUtils.createFileInPath(fs, fullPath, Option.of(getUTF8Bytes(StringUtils.EMPTY_STRING)));
+    } catch (HoodieIOException e) {
+      if (!fs.exists(fullPath)) {
+        throw e;
+      }
+      LOG.warn("Failed to create marker but " + fullPath + " exists", e);
+    }
   }
 
   /***
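
The hunk above applies a create-then-verify pattern: attempt to create the marker file, and if creation throws, treat the failure as benign as long as the file now exists, since another writer winning the race leaves the filesystem in the desired state. Below is a minimal, self-contained sketch of that pattern against the Hadoop FileSystem API; the class, method, and path names are illustrative stand-ins, not the Hudi helper itself.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MarkerFileRaceExample {

      // Create an empty marker file, tolerating a concurrent writer creating it first.
      // The exception is only rethrown if the file still does not exist afterwards.
      static void createMarkerIgnoringRace(FileSystem fs, Path fullPath) throws IOException {
        if (fs.exists(fullPath)) {
          return; // already created by an earlier attempt or another writer
        }
        try {
          // overwrite=false makes the call fail if the file appears concurrently
          fs.create(fullPath, false).close();
        } catch (IOException e) {
          if (!fs.exists(fullPath)) {
            throw e; // genuine failure: the marker is still missing
          }
          // Another writer created the marker between the exists() check and create();
          // the end state is what we wanted, so log and continue.
          System.out.println("Failed to create marker but " + fullPath + " exists: " + e);
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        createMarkerIgnoringRace(fs, new Path("/tmp/hudi-race-example/00000001.marker"));
      }
    }

The key design point is that the exists() re-check happens inside the catch block, so only failures that leave no marker behind are propagated.
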
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 9623d4d1c09..95302e9ab37 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
@@ -149,53 +150,53 @@ object HoodieDatasetBulkInsertHelper
                  arePartitionRecordsSorted: Boolean,
                 shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
     val schema = dataset.schema
-    val writeStatuses = injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
-      val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
-      val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
-      val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
-      val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
+    HoodieJavaRDD.of(
+      injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
+        val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
+        val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
+        val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
+        val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get
 
-      val writer = writeConfig.getIndexType match {
-        case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
-          == BucketIndexEngineType.CONSISTENT_HASHING =>
-          new ConsistentBucketBulkInsertDataInternalWriterHelper(
-            table,
-            writeConfig,
-            instantTime,
-            taskPartitionId,
-            taskId,
-            taskEpochId,
-            schema,
-            writeConfig.populateMetaFields,
-            arePartitionRecordsSorted,
-            shouldPreserveHoodieMetadata)
-        case _ =>
-          new BulkInsertDataInternalWriterHelper(
-            table,
-            writeConfig,
-            instantTime,
-            taskPartitionId,
-            taskId,
-            taskEpochId,
-            schema,
-            writeConfig.populateMetaFields,
-            arePartitionRecordsSorted,
-            shouldPreserveHoodieMetadata)
-      }
+        val writer = writeConfig.getIndexType match {
+          case HoodieIndex.IndexType.BUCKET if writeConfig.getBucketIndexEngineType
+            == BucketIndexEngineType.CONSISTENT_HASHING =>
+            new ConsistentBucketBulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
+          case _ =>
+            new BulkInsertDataInternalWriterHelper(
+              table,
+              writeConfig,
+              instantTime,
+              taskPartitionId,
+              taskId,
+              taskEpochId,
+              schema,
+              writeConfig.populateMetaFields,
+              arePartitionRecordsSorted,
+              shouldPreserveHoodieMetadata)
+        }
 
-      try {
-        iter.foreach(writer.write)
-      } catch {
-        case t: Throwable =>
-          writer.abort()
-          throw t
-      } finally {
-        writer.close()
-      }
+        try {
+          iter.foreach(writer.write)
+        } catch {
+          case t: Throwable =>
+            writer.abort()
+            throw t
+        } finally {
+          writer.close()
+        }
 
-      writer.getWriteStatuses.asScala.iterator
-    }), SQLConf.get).collect()
-    table.getContext.parallelize(writeStatuses.toList.asJava)
+        writer.getWriteStatuses.asScala.iterator
+      }), SQLConf.get).toJavaRDD())
   }
 
   private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
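
The Scala change above removes a driver-side collect(): instead of materializing every WriteStatus on the driver and re-parallelizing the list, the mapPartitions result is wrapped as a distributed RDD and handed back directly. The sketch below contrasts the two shapes using Spark's Java API, with plain strings standing in for write statuses; the class name and the writePartition helper are hypothetical, not Hudi code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class AvoidDriverCollectExample {

      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("avoid-collect-sketch")
            .master("local[2]")
            .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<Integer> rows = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

        // Old shape: compute per-partition results on the executors, pull every
        // result back to the driver, then re-distribute them as a new RDD.
        List<String> collected = rows
            .mapPartitions(AvoidDriverCollectExample::writePartition)
            .collect();                                  // must fit in driver memory
        JavaRDD<String> reparallelized = jsc.parallelize(collected);

        // New shape: keep the per-partition results as an RDD so downstream stages
        // consume them without a round trip through the driver.
        JavaRDD<String> statuses = rows
            .mapPartitions(AvoidDriverCollectExample::writePartition);

        System.out.println("collect + parallelize: " + reparallelized.count());
        System.out.println("kept distributed:      " + statuses.count());
        spark.stop();
      }

      // Hypothetical stand-in for the per-partition writer: one status string per row.
      private static Iterator<String> writePartition(Iterator<Integer> rows) {
        List<String> statuses = new ArrayList<>();
        rows.forEachRemaining(r -> statuses.add("wrote-row-" + r));
        return statuses.iterator();
      }
    }

Both shapes yield the same statuses; the difference is that the collect() version caps scalability at driver memory, which is what HUDI-7489 avoids for the row-writer path.
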
