ChiehFu opened a new issue #3782:
URL: https://github.com/apache/hudi/issues/3782


   Hello,
   
   We are running jobs on AWS EMR to compact tables stored in S3 and 
maintaining Athena tables through Hudi Hive sync. 
   
   Recently we started exploring Hudi multi writer and we were experiencing 
some issues when running concurrent Hudi upsert jobs with Hudi OCC where each 
job save data into a distinct partition. 
   
   The errors seem happen pretty randomly and get more frequently as the number 
of concurrent jobs increases.
   
   
   **Environment Description**
   
   * Hudi version : 0.8.0
   
   * Spark version : 2.4.7
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.10.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   * AWS EMR: 5.33.0
   
   ---
   
   **Errors**
   Type 1 error: 
   
   `FileAlreadyExistsException`
   ```
   User class threw exception: org.apache.hudi.exception.HoodieIOException: 
Failed to create file 
s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
   at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:526)
   at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:487)
   at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstant(HoodieActiveTimeline.java:147)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.startCommit(AbstractHoodieWriteClient.java:714)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:700)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:691)
   at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
   at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
 
   ...
   Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already 
exists:s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
   at 
com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
 
   ```
   
   Type 2 error:
   
   `IllegalArgumentException`
   ```
   java.lang.IllegalArgumentException
        at 
org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
        at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
        at 
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
        at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   ```
   
   
   Type 3 error:
   
   `HoodieRollbackException`
   ```
    User class threw exception: 
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback 
s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs commits 
20211011064933
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:593)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:808)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:797)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.lambda$clean$1cda88ee$1(AbstractHoodieWriteClient.java:648)
   at 
org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:135)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:647)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:630)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:661)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.autoCleanOnCommit(AbstractHoodieWriteClient.java:494)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.postCommit(AbstractHoodieWriteClient.java:431)
   at 
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:186)
   at 
org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
   at 
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
   at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
   at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) 
   ...
    Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete 
instant [==>20211011064933__commit__REQUESTED]
   at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:196)
   at 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:173)
   at 
org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:225)
   at 
   ```
   
   Type 4 error:
   
   Athena table wasn't updated by Hudi to include new partition properly.
   
   Our understanding is that with `hoodie.datasource.hive_sync.enable` set to 
true, Hudi is suppose to keep the corresponding Athena table up-to-date for any 
schema change and and new partitions. 
   However, we found that sometime the partition is missing in Athena table 
after a job completed successfully.
   
   We were able to query the data from the missing partition by in a Hudi 
snapshot table, which suggests that the data is there it's just that for some 
reason Athena table didn't pick up that partition.
   
   
   ---
   
   **Error Reproduction**
   To reproduce the errors, I built a Scala jar that contains the logic to 
generate a testing data and save with `optimistic_concurrency_control` enabled, 
and ran 8 spark steps concurrently.
   
   ```Scala
   package com.hudioccdebug
   
   import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
   import org.apache.hadoop.hive.conf.HiveConf
   import org.apache.spark.sql.functions.{current_timestamp, lit, udf}
   
   object HudiOccDebug {
     val spark: SparkSession = 
SparkSession.builder.enableHiveSupport().getOrCreate()
     val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
     import sqlContext.implicits._
   
     def main(args: Array[String]): Unit = {
       val dbName = "test_db"
       val tableName = args(0)
       val rowCount = args(1).toInt
       val batchNum = args(2).toInt
       val lockProvider = args(3)
   
       val s3Location = "s3://hudi_debug"
       val r = scala.util.Random
       val n = 60
   
       for (i <- 1 to batchNum) {
         println(s"Starting batch $i")
   
         val m = (r.nextInt % n + n) % n
         println(s"sleep for $m seconds before upsert")
         Thread.sleep(m * 1000)
   
         val df = generateTestingDF(spark, rowCount, 
spark.sparkContext.applicationId)
         saveDf(s3Location, dbName, tableName, df, getHudiOptions(dbName, 
tableName, lockProvider))
       }
     }
   
     def generateTestingDF(spark: SparkSession, rowCount: Int = 1000000, 
partitionKey: String): DataFrame = {
       val uuid = udf(() => java.util.UUID.randomUUID().toString)
       def randomStringGen(length: Int) = 
scala.util.Random.alphanumeric.take(length).mkString
   
   
       val df = spark.sparkContext.parallelize(
         Seq.fill(rowCount){(randomStringGen(4), randomStringGen(4), 
randomStringGen(6))}, 10
       ).toDF("col_1", "col_2", "col_3").withColumn("partition", 
lit(partitionKey)).withColumn("uuid", uuid()).withColumn("ts", 
current_timestamp())
       df
     }
   
   
     def getHudiOptions(dbName:String, tableName: String, lockProvider: String) 
: Map[String, String] = {
       def getHiveServerURI: String = {
           val hiveMetastoreURIs = new HiveConf().get("hive.metastore.uris")
           val parsedMetastoreURI = if (hiveMetastoreURIs != null) 
hiveMetastoreURIs.replaceAll("thrift://", "") else ":"
           val hiveServer2URI = parsedMetastoreURI.substring(0, 
parsedMetastoreURI.lastIndexOf(":"))
           hiveServer2URI
       }
       var m = Map[String, String](
           "hoodie.table.name" -> tableName,
           "hoodie.datasource.write.table.name" -> tableName,
           "hoodie.consistency.check.enabled" -> "true",
           "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
           "hoodie.datasource.write.recordkey.field" -> "uuid",
           "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.ComplexKeyGenerator",
           "hoodie.datasource.write.partitionpath.field" -> "partition",
           "hoodie.datasource.write.precombine.field" -> "ts",
           "hoodie.parquet.max.file.size" -> String.valueOf(500 * 1024 * 1024),
           "hoodie.datasource.hive_sync.enable" -> "true",
           "hoodie.datasource.hive_sync.jdbcurl" -> 
s"jdbc:hive2://${getHiveServerURI}:10000",
           "hoodie.datasource.hive_sync.database" -> dbName,
           "hoodie.datasource.hive_sync.table" -> tableName,
           "hoodie.datasource.hive_sync.partition_fields" -> "partition",
           "hoodie.datasource.hive_sync.partition_extractor_class" -> 
"org.apache.hudi.hive.MultiPartKeysValueExtractor",
           "hoodie.datasource.write.operation" -> "upsert",
           "hoodie.fail.on.timeline.archiving" -> "false"
       )
   
       if (lockProvider.equals("ZookeeperBasedLockProvider")) {
         print("Using ZookeeperBasedLockProvider")
         m += (
           "hoodie.write.lock.zookeeper.url" -> "ip-10-0-227-209.ec2.internal",
           "hoodie.write.lock.zookeeper.port" -> "2181",
           "hoodie.write.lock.zookeeper.lock_key" -> "test_table",
           "hoodie.write.lock.zookeeper.base_path" -> "/test",
           "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
           "hoodie.cleaner.policy.failed.writes" -> "LAZY",
           "hoodie.write.lock.provider" -> 
"org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
           "hoodie.write.lock.client.num_retries" -> "20",
           "hoodie.write.lock.wait_time_ms" -> "20000",
           "hoodie.write.lock.wait_time_ms_between_retry" -> "5000"
         )
       }
   
       m
     }
   
   
     def saveDf(
           s3Location:String,
           dbName:String,
           tableName:String,
           inputDf: DataFrame,
           options: Map[String, String]
       ): Unit = {
       inputDf.write
         .format("org.apache.hudi")
         .options(options)
         .mode(SaveMode.Append)
         .save("%s/%s/%s".format(s3Location, dbName, tableName))
     }
   
   }
   
   ```
   
   ```
   spark-submit \
   --deploy-mode cluster \
   --executor-memory 43g \ 
   --driver-memory 43g \ 
   --executor-cores 6 \
   --class com.hudioccdebug.HudiOccDebug \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --jars 
/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar 
\ 
   s3://hudi_debug/jars/hudi_occ_debug.jar 
"hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs" "500000" "1" 
"ZookeeperBasedLockProvider"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to