ChiehFu opened a new issue #3782: URL: https://github.com/apache/hudi/issues/3782
Hello,

We are running jobs on AWS EMR to compact tables stored in S3 and maintaining Athena tables through Hudi Hive sync. Recently we started exploring Hudi multi-writer support, and we have been seeing issues when running concurrent Hudi upsert jobs with Hudi OCC, where each job saves data into a distinct partition. The errors seem to happen fairly randomly and become more frequent as the number of concurrent jobs increases.

**Environment Description**

* Hudi version : 0.8.0
* Spark version : 2.4.7
* Hive version : 2.3.7
* Hadoop version : 2.10.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
* AWS EMR : 5.33.0

---

**Errors**

Type 1 error: `FileAlreadyExistsException`

```
User class threw exception: org.apache.hudi.exception.HoodieIOException: Failed to create file s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:526)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createFileInMetaPath(HoodieActiveTimeline.java:487)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createNewInstant(HoodieActiveTimeline.java:147)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommit(AbstractHoodieWriteClient.java:714)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:700)
    at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:691)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:185)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    ...
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists: s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs/.hoodie/20211011064521.commit.requested
    at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
```

Type 2 error: `IllegalArgumentException`

```
java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:396)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:377)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:154)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:212)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:185)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
```

Type 3 error: `HoodieRollbackException`

```
User class threw exception: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback s3://hudi_debug/test_db/hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs commits 20211011064933
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:593)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:808)
    at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:797)
    at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$clean$1cda88ee$1(AbstractHoodieWriteClient.java:648)
    at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:135)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:647)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:630)
    at org.apache.hudi.client.AbstractHoodieWriteClient.clean(AbstractHoodieWriteClient.java:661)
    at org.apache.hudi.client.AbstractHoodieWriteClient.autoCleanOnCommit(AbstractHoodieWriteClient.java:494)
    at org.apache.hudi.client.AbstractHoodieWriteClient.postCommit(AbstractHoodieWriteClient.java:431)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:186)
    at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:121)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:476)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:222)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
    ...
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20211011064933__commit__REQUESTED]
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:196)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:173)
    at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:225)
    at ...
```

Type 4 error: the Athena table wasn't updated by Hudi to include a new partition properly. Our understanding is that with `hoodie.datasource.hive_sync.enable` set to true, Hudi is supposed to keep the corresponding Athena table up to date with any schema changes and new partitions.
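Since Athena only surfaces partitions that are registered in the metastore, the symptom can be made concrete by diffing the partition values Hudi wrote (in our setup, Spark application IDs) against what `SHOW PARTITIONS` returns. A minimal, self-contained sketch of such a check; the object and helper names are ours, not Hudi's, and the `partition=<value>` row format assumes the single-partition-column layout used in the repro below:

```scala
object PartitionSyncCheck {
  // SHOW PARTITIONS on a single-column partitioned table returns rows like
  // "partition=application_1633939200000_0001"; keep only the value part.
  def parseShowPartitions(rows: Seq[String]): Set[String] =
    rows.map(_.split("=", 2).last).toSet

  // Partition values present in the Hudi table on S3 but not registered in
  // the metastore -- these are invisible to Athena (the Type 4 symptom).
  def missingInMetastore(written: Set[String], synced: Set[String]): Set[String] =
    written diff synced

  def main(args: Array[String]): Unit = {
    // Hypothetical values: two partitions written, only one synced.
    val written = Set("application_A", "application_B")
    val synced  = parseShowPartitions(Seq("partition=application_A"))
    println(missingInMetastore(written, synced)) // the partition Athena can't see
  }
}
```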
However, we found that sometimes the partition is missing from the Athena table even after a job completed successfully. We were able to query the data from the missing partition in a Hudi snapshot table, which suggests that the data is there; for some reason the Athena table just didn't pick up that partition.

---

**Error Reproduction**

To reproduce the errors, I built a Scala jar that generates testing data and saves it with `optimistic_concurrency_control` enabled, and ran 8 Spark steps concurrently.

```Scala
package com.hudioccdebug

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.sql.functions.{current_timestamp, lit, udf}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object HudiOccDebug {

  val spark: SparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
  val sqlContext = new org.apache.spark.sql.SQLContext(spark.sparkContext)
  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    val dbName = "test_db"
    val tableName = args(0)
    val rowCount = args(1).toInt
    val batchNum = args(2).toInt
    val lockProvider = args(3)
    val s3Location = "s3://hudi_debug"
    val r = scala.util.Random
    val n = 60

    for (i <- 1 to batchNum) {
      println(s"Starting batch $i")
      // Stagger the concurrent upserts by a random 0-59 second delay.
      val m = (r.nextInt % n + n) % n
      println(s"sleep for $m seconds before upsert")
      Thread.sleep(m * 1000)
      val df = generateTestingDF(spark, rowCount, spark.sparkContext.applicationId)
      saveDf(s3Location, dbName, tableName, df, getHudiOptions(dbName, tableName, lockProvider))
    }
  }

  def generateTestingDF(spark: SparkSession, rowCount: Int = 1000000, partitionKey: String): DataFrame = {
    val uuid = udf(() => java.util.UUID.randomUUID().toString)
    def randomStringGen(length: Int) = scala.util.Random.alphanumeric.take(length).mkString
    val df = spark.sparkContext.parallelize(
        Seq.fill(rowCount) { (randomStringGen(4), randomStringGen(4), randomStringGen(6)) }, 10
      ).toDF("col_1", "col_2", "col_3")
      .withColumn("partition", lit(partitionKey))
      .withColumn("uuid", uuid())
      .withColumn("ts", current_timestamp())
    df
  }

  def getHudiOptions(dbName: String, tableName: String, lockProvider: String): Map[String, String] = {
    def getHiveServerURI: String = {
      val hiveMetastoreURIs = new HiveConf().get("hive.metastore.uris")
      val parsedMetastoreURI =
        if (hiveMetastoreURIs != null) hiveMetastoreURIs.replaceAll("thrift://", "") else ":"
      val hiveServer2URI = parsedMetastoreURI.substring(0, parsedMetastoreURI.lastIndexOf(":"))
      hiveServer2URI
    }

    var m = Map[String, String](
      "hoodie.table.name" -> tableName,
      "hoodie.datasource.write.table.name" -> tableName,
      "hoodie.consistency.check.enabled" -> "true",
      "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
      "hoodie.datasource.write.recordkey.field" -> "uuid",
      "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
      "hoodie.datasource.write.partitionpath.field" -> "partition",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.parquet.max.file.size" -> String.valueOf(500 * 1024 * 1024),
      "hoodie.datasource.hive_sync.enable" -> "true",
      "hoodie.datasource.hive_sync.jdbcurl" -> s"jdbc:hive2://${getHiveServerURI}:10000",
      "hoodie.datasource.hive_sync.database" -> dbName,
      "hoodie.datasource.hive_sync.table" -> tableName,
      "hoodie.datasource.hive_sync.partition_fields" -> "partition",
      "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.fail.on.timeline.archiving" -> "false"
    )

    if (lockProvider.equals("ZookeeperBasedLockProvider")) {
      print("Using ZookeeperBasedLockProvider")
      m += (
        "hoodie.write.lock.zookeeper.url" -> "ip-10-0-227-209.ec2.internal",
        "hoodie.write.lock.zookeeper.port" -> "2181",
        "hoodie.write.lock.zookeeper.lock_key" -> "test_table",
        "hoodie.write.lock.zookeeper.base_path" -> "/test",
        "hoodie.write.concurrency.mode" -> "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes" -> "LAZY",
        "hoodie.write.lock.provider" -> "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
        "hoodie.write.lock.client.num_retries" -> "20",
        "hoodie.write.lock.wait_time_ms" -> "20000",
        "hoodie.write.lock.wait_time_ms_between_retry" -> "5000"
      )
    }
    m
  }

  def saveDf(
      s3Location: String,
      dbName: String,
      tableName: String,
      inputDf: DataFrame,
      options: Map[String, String]
  ): Unit = {
    inputDf.write
      .format("org.apache.hudi")
      .options(options)
      .mode(SaveMode.Append)
      .save("%s/%s/%s".format(s3Location, dbName, tableName))
  }
}
```

```
spark-submit \
  --deploy-mode cluster \
  --executor-memory 43g \
  --driver-memory 43g \
  --executor-cores 6 \
  --class com.hudioccdebug.HudiOccDebug \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  --conf "spark.sql.hive.convertMetastoreParquet=false" \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  s3://hudi_debug/jars/hudi_occ_debug.jar "hudi_occ_debug_zoo_1_batch_v2_8_concurrent_jobs" "500000" "1" "ZookeeperBasedLockProvider"
```