[jira] [Created] (HADOOP-17935) Spark job stuck in S3A StagingCommitter::setupJob

2021-09-23 Thread Brandon (Jira)
Brandon created HADOOP-17935:


 Summary: Spark job stuck in S3A StagingCommitter::setupJob
 Key: HADOOP-17935
 URL: https://issues.apache.org/jira/browse/HADOOP-17935
 Project: Hadoop Common
  Issue Type: Bug
  Components: fs/s3
Affects Versions: 3.2.1
 Environment: Spark 2.4.4
Hadoop 3.2.1
"spark.hadoop.fs.s3a.committer.name": "directory"
Reporter: Brandon


This is using the S3A directory staging committer. The Spark driver gets stuck in a retry loop inside setupJob. Here's a stack trace:


{noformat}
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$1753/2105635903.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
org.apache.spark.sql.DataFrameWriter$$Lambda$1752/114484787.apply(Unknown Source)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:85)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
 = holding Monitor(org.apache.spark.sql.execution.QueryExecution@705144571)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1574/1384254911.apply(Unknown Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1573/696771575.apply(Unknown Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
 = holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@539925125)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:170)
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter.setupJob(DirectoryStagingCommitter.java:65)
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.setupJob(StagingCommitter.java:458)
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:355)
org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2062)
org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2129)
org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2808)
org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2833)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.Invoker$$Lambda$232/695085082.execute(Unknown Source)
org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1932/855044548.execute(Unknown Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2835)
org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1589)
org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:2751)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:2785)
org.apache.hadoop.fs.s3a.S3AFileSystem.removeKeys(S3AFileSystem.java:1717)
org.apache.hadoop.fs.s3a.S3AFileSystem.deleteObjects(S3AFileSystem.java:1457)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
org.apache.hadoop.fs.s3a.S3AFileSystem$$Lambda$1933/1245120662.execute(Unknown Source)
org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$deleteObjects$8(S3AFileSystem.java:1461)
{noformat}
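
For context, the committer is selected through the standard S3A committer settings; a minimal sketch is below. Only {{spark.hadoop.fs.s3a.committer.name=directory}} is confirmed by the environment above; the factory binding is shown only as the usual way the S3A committers are wired up and may differ in the actual cluster configuration.
{noformat}
spark.hadoop.fs.s3a.committer.name=directory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
{noformat}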

[jira] [Created] (HADOOP-17377) ABFS: Frequent HTTP429 exceptions with MSI token provider

2020-11-12 Thread Brandon (Jira)
Brandon created HADOOP-17377:


 Summary: ABFS: Frequent HTTP429 exceptions with MSI token provider
 Key: HADOOP-17377
 URL: https://issues.apache.org/jira/browse/HADOOP-17377
 Project: Hadoop Common
  Issue Type: Bug
  Components: fs/azure
Affects Versions: 3.2.1
Reporter: Brandon


*Summary*
The MSI token provider fetches auth tokens from the local instance metadata service. The instance metadata service documentation states a limit of 5 requests per second, which is fairly low: [https://docs.microsoft.com/en-us/azure/virtual-machines/windows/instance-metadata-service#error-and-debugging]
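
For reference, this is the kind of ABFS configuration in play (a minimal sketch; only the use of the MSI token provider itself is stated above, and the tenant/client IDs are placeholders):
{noformat}
fs.azure.account.auth.type=OAuth
fs.azure.account.oauth.provider.type=org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
# optional, to pin a user-assigned identity (placeholder values)
fs.azure.account.oauth2.msi.tenant=<tenant-id>
fs.azure.account.oauth2.client.id=<client-id-of-user-assigned-identity>
{noformat}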

When using ABFS with the MSI token provider, especially from multiple threads, ABFS frequently throws HTTP 429 (throttled) exceptions. The implementation for fetching a token from MSI uses ExponentialRetryPolicy; however, from my read of the code, ExponentialRetryPolicy does not retry on status code 429.

So an initial idea is that ExponentialRetryPolicy could retry HTTP 429 errors.
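
To make that concrete, here is a rough sketch of a retry-eligibility check that also covers 429. It is written as a standalone illustration, not a patch to the real ExponentialRetryPolicy; the method shape and constants mirror my reading of the 3.2.1 code but may not match it exactly.
{code:java}
import java.net.HttpURLConnection;

// Illustrative only: a retry predicate that also treats HTTP 429
// (Too Many Requests) as retriable. Not the actual Hadoop class.
public class RetryOn429Sketch {
  // HttpURLConnection has no constant for 429, so define one here.
  private static final int HTTP_TOO_MANY_REQUESTS = 429;

  private final int maxRetryCount;

  public RetryOn429Sketch(int maxRetryCount) {
    this.maxRetryCount = maxRetryCount;
  }

  public boolean shouldRetry(int retryCount, int statusCode) {
    return retryCount < maxRetryCount
        && (statusCode == -1                                         // connection-level failure
            || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT   // 408
            || statusCode == HTTP_TOO_MANY_REQUESTS                  // 429: the proposed addition
            || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR  // 5xx, except 501 and 505
                && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
                && statusCode != HttpURLConnection.HTTP_VERSION));
  }
}
{code}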

Another potential enhancement, though more complicated, is to use a static 
cache for the MSI tokens. The cache would be shared by all threads in the JVM.
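
As a rough illustration of that second idea (not a proposed implementation), a JVM-wide cache could look something like the sketch below; the {{Token}} type and the {{fetchTokenFromMsi}} supplier are hypothetical stand-ins for whatever actually calls the instance metadata endpoint.
{code:java}
import java.time.Instant;
import java.util.function.Supplier;

// Illustration of a process-wide MSI token cache shared by all threads.
public final class MsiTokenCache {
  public static final class Token {
    final String accessToken;
    final Instant expiresOn;
    Token(String accessToken, Instant expiresOn) {
      this.accessToken = accessToken;
      this.expiresOn = expiresOn;
    }
  }

  private static volatile Token cached;

  // Returns the cached token, refreshing it (under a lock) only when it is
  // missing or within two minutes of expiry. All threads in the JVM share the
  // same token, so one metadata-service call is made per refresh window
  // instead of one per thread.
  public static Token get(Supplier<Token> fetchTokenFromMsi) {
    Token t = cached;
    if (t == null || t.expiresOn.isBefore(Instant.now().plusSeconds(120))) {
      synchronized (MsiTokenCache.class) {
        t = cached;
        if (t == null || t.expiresOn.isBefore(Instant.now().plusSeconds(120))) {
          t = fetchTokenFromMsi.get();   // single call to the metadata service
          cached = t;
        }
      }
    }
    return t;
  }

  private MsiTokenCache() { }
}
{code}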

*Environment*
This is in the context of Spark clusters running on Azure Virtual Machine Scale Sets. The Virtual Machine Scale Set is configured with a user-assigned identity. The Spark cluster is configured to download application JARs from an `abfs://` path and authenticates to the storage account with the MSI token provider. The Spark version is 2.4.4; Hadoop libraries are version 3.2.1. More details on the Spark configuration: each VM runs 3 executor processes, and each executor process uses 5 cores, so I expect a maximum of 15 concurrent requests to MSI when the application is starting up and fetching its JAR.

*Impact*
In my particular use case, the download operation itself is wrapped with 3 additional retries, and I have never seen all of those retries be exhausted and the download fail. In the end, the throttling mostly contributes noise and slowness from the retries. However, handling HTTP 429 robustly in the ABFS implementation would help application developers write cleaner code without wrapping individual ABFS operations in retries.

*Example*
Here's an example error message and stack trace; it's always the same trace. It appears in my logs anywhere from a few hundred to a few thousand times a day.
{noformat}
AADToken: HTTP connection failed for getting token from AzureAD. Http response: 429 null
Content-Type: application/json; charset=utf-8 Content-Length: 90 Request ID:  Proxies: none
First 1K of Body: {"error":"invalid_request","error_description":"Temporarily throttled, too many requests"}
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.executeHttpOperation(AbfsRestOperation.java:190)
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:125)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:506)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getAclStatus(AbfsClient.java:489)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getIsNamespaceEnabled(AzureBlobFileSystemStore.java:208)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFileStatus(AzureBlobFileSystemStore.java:473)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.getFileStatus(AzureBlobFileSystem.java:437)
at org.apache.hadoop.fs.FileSystem.isFile(FileSystem.java:1717)
at org.apache.spark.util.Utils$.fetchHcfsFile(Utils.scala:747)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:724)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:496)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:812)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:803)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:792)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:791)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:803)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at