danielfordfc opened a new issue, #9341:
URL: https://github.com/apache/hudi/issues/9341

   **Describe the problem you faced**
   
   I'm running Apache Hudi's deltastreamer utility on EMR, with YARN as the scheduler. I've noticed a rather peculiar behaviour that has started causing sporadic errors in my jobs.
   
   When an EMR job is cancelled either manually by the user or programmatically 
(in my case, through Airflow), it seems that occasionally YARN doesn't receive 
the termination message and continues running the job in the background. This 
behavior is quite misleading, as the EMR Console indicates the job has been 
stopped, while in reality, it is still running on the cluster.
   
   The issue is further complicated when a cancelled job is rerun. It appears 
that in these scenarios, two instances of the deltastreamer job run 
concurrently, without awareness of each other. Considering the nature of 
deltastreamer, which is designed to run as a single process for a given Hudi 
table, this parallel execution leads to several inconsistencies and problems.
   
   **This is a known problem on the AWS side, and we have previously opened a support ticket with them. Their main reply can be found in the additional context section below.**
   
   **Symptoms**
   
   The manifestation of this issue is quite unpredictable, but some symptoms 
include:
   
   - Failure to clean up files that have already been marked for deletion
   - Concurrency errors during Glue sync operations
   - Generic failures when attempting to roll back commits
   - Incorrect rollback attempts, where the utility tries to roll back commits that don't require rolling back
   
   These symptoms have led to sporadic failures of the jobs and have made our 
process quite unstable.
   
   For instance, compare the commit timeline of a single healthy deltastreamer ingestion with the timeline from when we hit this issue. Jobs spuriously fail anywhere from minutes to hours after starting up when running the deltastreamer in `--continuous` mode.
   
   ![Commit timeline of a single deltastreamer ingestion](https://files.slack.com/files-pri/T4D7BR6T1-F05JM0V84LW/image.png)
   ![Commit timeline when the issue occurs](https://files.slack.com/files-pri/T4D7BR6T1-F05JM0W23P0/image.png)
   
   
   
   **Reproduction Steps**
   
   - Submit an EMR job running Apache Hudi's deltastreamer. An example command is included in the additional context section below.
   - Cancel the job through the EMR Console or programmatically via Airflow.
   - Monitor the YARN scheduler to confirm whether it has actually stopped the job, or whether it is still running (see the sketch after this list).
   - Submit another EMR job and observe the possible concurrent execution of 
the two jobs.
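
   A quick way to confirm the lingering run (a sketch, assuming shell access to the EMR master node; the `grep` filter is a placeholder for however your application is named):

   ```bash
   # List YARN applications still in RUNNING state. If a "cancelled" step's
   # application still shows up here, YARN never actually terminated it.
   yarn application -list -appStates RUNNING | grep -i deltastreamer
   ```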
   
   
   
   **Expected behavior**
   
   Ideally, the deltastreamer would somehow be aware of the duplicate runs going on. I'm not even 100% certain that that IS what's happening, though.
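
   For what it's worth, I believe Hudi does ship optimistic concurrency control for multi-writer scenarios. A minimal sketch of the extra args (assuming the DynamoDB-based lock provider is available in this Hudi build; the lock table, partition key, and region values below are made-up placeholders, and we have not validated this):

   ```bash
   # Hypothetical multi-writer guard, appended to the deltastreamer args below:
   --hoodie-conf hoodie.write.concurrency.mode=optimistic_concurrency_control
   --hoodie-conf hoodie.cleaner.policy.failed.writes=LAZY
   --hoodie-conf hoodie.write.lock.provider=org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
   --hoodie-conf hoodie.write.lock.dynamodb.table=hudi-locks
   --hoodie-conf hoodie.write.lock.dynamodb.partition_key=table_name
   --hoodie-conf hoodie.write.lock.dynamodb.region=eu-west-1
   ```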
   
   **Environment Description**
   
   - EMR release: emr-6.8.0
   - Storage: S3
   - Catalog sync: Hive metadata synced to the AWS Glue Data Catalog through the Hudi Deltastreamer
   
   **Additional context**
   
   **Original Slack thread about the issue:**
   
   https://apache-hudi.slack.com/archives/C4D716NPQ/p1690212730633249
   
   
   **AWS Support ticket response, after a couple of calls and some back-and-forth with them:**
   
   _"As discussed, I understood that you have cancelled running steps in EMR 
Cluster from EMR console and the steps are still running in the background.
   
   I checked further at my end and I can see that multiple customers have 
reported this issue that upon cancelling any Running EMR step, the step shows 
CANCELLED on the console even though the step/related application is running at 
background. AWS EMR service team has become aware of this issue and confirmed 
this is a bug. They started working on it actively to fix it soon. But 
unfortunately, there is no ETA available on this yet.
   
   Hence currently as given here:[1] the best way to stop the running 
step/application would be to kill it using the application ID (for YARN steps) 
which you're already aware of."_
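
   In practice that workaround looks like this (standard YARN CLI; the application ID shown is a placeholder, taken from `yarn application -list`):

   ```bash
   # Kill the lingering job directly via its YARN application ID.
   yarn application -kill application_1689932389111_0001
   ```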
   
   **Sample Deltastreamer spark-submit command (formatted for readability)**
   
       "name": [
           {
               "Name": "name",
               "ActionOnFailure": "CONTINUE",
               "HadoopJarStep": {
                   "Jar": "command-runner.jar",
                   "Args": [
                       "spark-submit",
                       "--master", "yarn",
                       "--deploy-mode", "cluster",
                       "--executor-memory", "1g",
                       "--driver-memory", "2g",
                       "--num-executors", "2",
                       "--executor-cores","2",
                       "--class", 
"org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
                       "--conf", "spark.dynamicAllocation.enabled=false",
                       "--conf", 
"spark.serializer=org.apache.spark.serializer.KryoSerializer",
                       "--conf", "spark.sql.catalogImplementation=hive",
                       "--conf", "spark.sql.hive.convertMetastoreParquet=false",
                       "--conf", 
"spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
                       "--conf", 
"spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
                       "--conf", 
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                       "--conf", 
"spark.streaming.kafka.allowNonConsecutiveOffsets=true",
                       "--conf", 
"spark.hadoop.parquet.avro.write-old-list-structure=false",
                       # IMPORTANT: hudi-utilities-bundle must be declared 
immediately before any Hudi spark commands
                       "/usr/lib/hudi/hudi-utilities-bundle.jar",
                       "--source-class", 
"org.apache.hudi.utilities.sources.AvroKafkaSource",
                       "--table-type", "COPY_ON_WRITE",
                       "--op", "INSERT",
                       "--enable-sync",
                       "--continuous",
                       # Hudi write config
                       "--target-base-path", f"s3://{bucket}/raw/table_name",
                       "--target-table", "table_name",
                       "--hoodie-conf", 
"hoodie.merge.allow.duplicate.on.inserts=true",
                       "--hoodie-conf", "hoodie.database.name=table_name",
                       "--hoodie-conf", "hoodie.table.name=table_name",
                       "--hoodie-conf", 
"hoodie.datasource.write.recordkey.field=uuid",
                       "--hoodie-conf", 
"hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
                       "--hoodie-conf", 
"hoodie.datasource.write.partitionpath.field=updated_at",
                       "--hoodie-conf", 
"hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
                       "--hoodie-conf", 
"hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
                       "--hoodie-conf", 
"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true",
                       "--hoodie-conf", 
"hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
                       # Below config to avoid error discussed in 
https://jira.fundingcircle.com/browse/GDP-1265
                       "--hoodie-conf", "hoodie.write.markers.type=DIRECT",
                       # AWS Glue Data Catalog config
                       "--hoodie-conf", 
"hoodie.datasource.hive_sync.enable=true",
                       "--hoodie-conf", 
"hoodie.datasource.hive_sync.database=glue_db_name",
                       "--hoodie-conf", 
"hoodie.datasource.hive_sync.table=table_name",
                       "--hoodie-conf", 
"hoodie.datasource.hive_sync.partition_fields=_event_date",
                       # Hudi Metrics
                       "--hoodie-conf", "hoodie.metrics.on=true",
                       "--hoodie-conf", 
"hoodie.metrics.reporter.type=CLOUDWATCH",
                        # ... plus a bunch of Kafka and Schema Registry connectivity args ...
                   ]
               }
            }
        ]
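
   For reference, a step definition like the above is typically submitted along these lines (a sketch using the AWS CLI; the cluster ID and file name are placeholders, and in our setup Airflow's EMR operators do the equivalent):

   ```bash
   # Submit the step definition above (saved as steps.json) to a running cluster.
   aws emr add-steps --cluster-id j-XXXXXXXXXXXXX --steps file://steps.json
   ```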
   
   
   **Stacktrace**
   
   Some of the stack traces we've seen that are highly likely due to this issue:
   
   
   1. It attempts to operate on a file that has already been deleted:
   ```bash
   Caused by: org.apache.hudi.exception.HoodieException: 
java.io.FileNotFoundException: No such file or directory 
's3://path-1/raw/table_name/2023/07/21/59c7616d-e78a-4d65-830d-6080ccba3098-0_0-88-78_20230721082240373.parquet'
       at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
       at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
       at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
       at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
       ... 29 more
   ```
   
   
   2. Failure to roll back a commit:
   ```bash
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: Failed to rollback 
s3://path1/raw/table_name commits 20230721073522498
       at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
       at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
       at 
org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
       at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:190)
       ... 8 more
   ```

