[ https://issues.apache.org/jira/browse/HUDI-739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Catalin Alexandru Zamfir updated HUDI-739:
------------------------------------------
    Description: 
We are evaluating Hudi for our near real-time ingestion needs, compared against other solutions (Delta/Iceberg). We picked Hudi because it comes pre-installed with Amazon EMR on AWS. However, adoption is blocked on this issue with concurrent small-batch write jobs (256 files each) to the same S3 path.

Using Livy we trigger Spark jobs that write Hudi tables to S3, on EMR with EMRFS enabled; paths use the "s3://" prefix. We write Spark SQL Datasets promoted up from RDDs. "hoodie.consistency.check.enabled" is set to true, the Spark serializer is Kryo, and the Hudi version is 0.5.0-incubating.
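For context, a minimal sketch of the session setup assumed above (the application name is a placeholder and the Livy/EMR plumbing is omitted; the consistency-check flag is a Hudi write config rather than a Spark conf, so it would be passed with the write options rather than here):
{code:java}
import org.apache.spark.sql.SparkSession;

// Minimal sketch: only the Kryo serializer mentioned above is shown here.
// The app name is a placeholder; the jobs themselves are submitted through Livy on EMR.
SparkSession spark = SparkSession.builder ()
    .appName ("hudi-nrt-ingest")
    .config ("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate ();
{code}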

On both COW and MOR tables, some of the submitted jobs fail with the exception below:
{code:java}
org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
        at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
        at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
        at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
        at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
        at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
        at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
        at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
        at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
{code}
The jobs are submitted in concurrent batches of 256 files each, against the same S3 path; in total that is roughly 8k files covering 6 hours of our data.
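Each of these jobs is triggered through Livy; a rough sketch of one submission, assuming Livy's /batches REST endpoint (host, jar path, class name and args are placeholders for illustration, not our real values):
{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Rough sketch of one Livy batch submission (POST /batches on the default port 8998).
// Host, jar path, class name and args are placeholders; many of these run concurrently.
String payload = "{"
    + "\"file\": \"s3://some-bucket/jobs/ingest-job.jar\","
    + "\"className\": \"com.example.IngestJob\","
    + "\"args\": [\"<one batch of 256 files>\"]"
    + "}";

HttpURLConnection conn = (HttpURLConnection) new URL ("http://livy-host:8998/batches").openConnection ();
conn.setRequestMethod ("POST");
conn.setRequestProperty ("Content-Type", "application/json");
conn.setDoOutput (true);
try (OutputStream out = conn.getOutputStream ()) {
    out.write (payload.getBytes (StandardCharsets.UTF_8));
}
System.out.println ("Livy responded with HTTP " + conn.getResponseCode ());
{code}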

Writing happens with the following code (basePath is an S3 bucket):
{code:java}
// Imports assumed by this snippet (Hudi 0.5.0-incubating, hudi-spark and hudi-hive modules)
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.spark.sql.SaveMode;

// Configs (edited)
String databaseName = "nrt";
String assumeYmdPartitions = "false";
String extractorClass = MultiPartKeysValueExtractor.class.getName ();
String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
String basePath = "s3a://some_path_to_hudi";
String avroSchemaAsString = avroSchema.toString ();
String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");

// eventsDataset is a Dataset<Row> promoted up from an RDD; avroSchema is the Avro schema of the events
eventsDataset.write ()
    .format ("org.apache.hudi")
    .option (HoodieWriteConfig.TABLE_NAME, tableName)
    .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
    .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
    .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path")
    .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
    .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
    .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
    .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
    .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day")
    .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
    .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions)
    .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass)
    .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
    .mode (SaveMode.Append)
    .save (String.format ("%s/%s", basePath, tableName));
{code}


> HoodieIOException: Could not delete in-flight instant
> -----------------------------------------------------
>
>                 Key: HUDI-739
>                 URL: https://issues.apache.org/jira/browse/HUDI-739
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Common Core
>    Affects Versions: 0.5.0
>            Reporter: Catalin Alexandru Zamfir
>            Priority: Blocker
>              Labels: AWS, S3
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
