This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3eb6de6d00b7f71faf74d37ce55f79c3b4e25d60
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Mon Aug 28 07:17:45 2023 -0400

    [HUDI-4631] Adding retries to spark datasource writes on conflict failures (#6854)
    
    Added retry functionality to Spark datasource writes so that they are automatically retried in case of conflict failures.
    The user experience with multi-writers is improved by these automatic retries.
    
    ---------
    
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../org/apache/hudi/config/HoodieLockConfig.java   | 16 ++++--
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  6 ++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 40 +++++++++++--
 .../apache/hudi/functional/TestCOWDataSource.scala | 66 +++++++++++++++++++++-
 4 files changed, 116 insertions(+), 12 deletions(-)
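A minimal usage sketch (not part of the commit) of the new option: a Spark datasource
writer opts in to automatic retries by setting hoodie.write.num.retries.on.conflict.failures
alongside optimistic concurrency control. The DataFrame df and the target path below are
hypothetical; the option keys and the InProcessLockProvider mirror the test added in this change.

    import org.apache.spark.sql.SaveMode

    // Sketch only: retry a conflicting write up to 2 more times before surfacing the failure.
    // Retries take effect only with optimistic concurrency control and a configured lock provider.
    df.write.format("hudi")
      .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
      .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
      .option("hoodie.write.num.retries.on.conflict.failures", "2") // new config; default 0 disables retries
      .mode(SaveMode.Append)
      .save("/tmp/hudi_table")                                      // hypothetical table path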

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 1d5b09629e4..b24aecf46c1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -217,16 +217,24 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key();
-  /** @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.defaultValue();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS_NAME.key();
-  /** @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead */
+  /**
+   * @deprecated Use {@link #LOCK_PROVIDER_CLASS_NAME} and its methods instead
+   */
   @Deprecated
   public static final String DEFAULT_LOCK_PROVIDER_CLASS = LOCK_PROVIDER_CLASS_NAME.defaultValue();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ba94d80d674..01b8fa55948 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -558,6 +558,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
       .withDocumentation(WriteConcurrencyMode.class);
 
+  public static final ConfigProperty<Integer> NUM_RETRIES_ON_CONFLICT_FAILURES = ConfigProperty
+      .key("hoodie.write.num.retries.on.conflict.failures")
+      .defaultValue(0)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Maximum number of times to retry a batch on conflict failure.");
+
   public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e98d72d8284..57baba29c92 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.AutoRecordKeyGenerationUtils.{isAutoGenerateRecordKeys, mayBeValidateParamsForAutoGenerationOfRecordKeys}
+import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
 import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
@@ -48,17 +48,15 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME}
 import org.apache.hudi.config.HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieInternalConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
+import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
-import org.apache.hudi.index.HoodieIndex
-import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName
-import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenUtils, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.keygen.{BaseKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
@@ -122,6 +120,38 @@ object HoodieSparkSqlWriter {
             sourceDf: DataFrame,
             streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
+    var succeeded = false
+    var counter = 0
+    val maxRetry: Integer = Integer.parseInt(optParams.getOrElse(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.defaultValue().toString))
+    var toReturn: (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = null
+
+    while (counter <= maxRetry && !succeeded) {
+      try {
+        toReturn = writeInternal(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+        log.warn(s"Succeeded with attempt no $counter")
+        succeeded = true
+      } catch {
+        case e: HoodieWriteConflictException =>
+          val writeConcurrencyMode = optParams.getOrElse(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), HoodieWriteConfig.WRITE_CONCURRENCY_MODE.defaultValue())
+          if (writeConcurrencyMode.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()) && counter < maxRetry) {
+            counter += 1
+            log.warn(s"Conflict found. Retrying again for attempt no $counter")
+          } else {
+            throw e
+          }
+      }
+    }
+    toReturn
+  }
+
+  def writeInternal(sqlContext: SQLContext,
+                    mode: SaveMode,
+                    optParams: Map[String, String],
+                    sourceDf: DataFrame,
+                    streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+                    hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
   (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
 
     assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bb36b9cdd27..104996d5c4f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -23,11 +23,11 @@ import org.apache.hudi.DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, TimelineUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -59,6 +59,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -555,11 +556,70 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(snapshotDF2.count(), 80)
   }
 
+  /**
+   * Test retries on conflict failures.
+   */
+  @ParameterizedTest
+  @ValueSource(ints = Array(0, 2))
+  def testCopyOnWriteConcurrentUpdates(numRetries: Integer): Unit = {
+    initTestDataGenerator()
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 1000)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.write.concurrency.mode", 
"optimistic_concurrency_control")
+      .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+      .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(1000, snapshotDF1.count())
+
+    val countDownLatch = new CountDownLatch(2)
+    for (x <- 1 to 2) {
+      val thread = new Thread(new UpdateThread(dataGen, spark, commonOpts, basePath, x + "00", countDownLatch, numRetries))
+      thread.setName((x + "00_THREAD").toString())
+      thread.start()
+    }
+    countDownLatch.await(1, TimeUnit.MINUTES)
+
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    if (numRetries > 0) {
+      assertEquals(snapshotDF2.count(), 3000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 3)
+    } else {
+      // only one among two threads will succeed and hence 2000
+      assertEquals(snapshotDF2.count(), 2000)
+      assertEquals(HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size(), 2)
+    }
+  }
+
+  class UpdateThread(dataGen: HoodieTestDataGenerator, spark: SparkSession, commonOpts: Map[String, String], basePath: String,
+                     instantTime: String, countDownLatch: CountDownLatch, numRetries: Integer = 0) extends Runnable {
+    override def run() {
+      val updateRecs = recordsToStrings(dataGen.generateUniqueUpdates(instantTime, 500)).toList
+      val insertRecs = recordsToStrings(dataGen.generateInserts(instantTime, 1000)).toList
+      val updateDf = spark.read.json(spark.sparkContext.parallelize(updateRecs, 2))
+      val insertDf = spark.read.json(spark.sparkContext.parallelize(insertRecs, 2))
+      updateDf.union(insertDf).write.format("org.apache.hudi")
+        .options(commonOpts)
+        .option("hoodie.write.concurrency.mode", 
"optimistic_concurrency_control")
+        .option("hoodie.cleaner.policy.failed.writes", "LAZY")
+        .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider")
+        .option(HoodieWriteConfig.NUM_RETRIES_ON_CONFLICT_FAILURES.key(), 
numRetries.toString)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      countDownLatch.countDown()
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK"))
   def testOverWriteModeUseReplaceAction(recordType: HoodieRecordType): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts(recordType)
-
     val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
     val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
