[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18012491#comment-18012491
 ] 

ASF GitHub Bot commented on MAPREDUCE-7508:
-------------------------------------------

liangyu-1 opened a new pull request, #7859:
URL: https://github.com/apache/hadoop/pull/7859

   …on because of some concurrent execution.
   
   
   ### Description of PR
   As described in
[MAPREDUCE-7508](https://issues.apache.org/jira/browse/MAPREDUCE-7508).
   
   When Spark scans for files, it uses the FileInputFormat.getSplits() method
to split them. The first step in getSplits() is to retrieve the file's
length. If the length is not zero, the next step is to get the block
locations array for that file. However, if two upstream programs rapidly
create and write to the same file (i.e., the file is overwritten or appended
to almost simultaneously), a race condition may occur:
   
   - the file's length is already non-zero,
   - but getFileBlockLocations() returns an empty array because the file is
being overwritten or is not yet fully written.
   
   When this happens, subsequent logic in getSplits() (such as accessing the
last element of the block locations array) throws an
ArrayIndexOutOfBoundsException because that array is unexpectedly empty.
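   To make the race concrete, here is a minimal sketch (it condenses the
relevant calls rather than reproducing the real getSplits() body) of the two
lookups whose results can disagree when a second writer re-creates the file
in between:
   
   ```java
   import java.io.IOException;
   
   import org.apache.hadoop.fs.BlockLocation;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   // Minimal sketch, NOT the actual FileInputFormat source: the length check
   // and the block-location lookup are two separate NameNode calls, so a
   // concurrent re-create of the same file can leave them inconsistent.
   public class SplitRaceSketch {
     static void illustrate(FileSystem fs, Path path) throws IOException {
       FileStatus file = fs.getFileStatus(path);
       long length = file.getLen();                  // (1) already non-zero
       if (length != 0) {
         // (2) a second writer may have re-created the file in between, so
         //     the block list can come back empty
         BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
         // Indexing into the unexpectedly empty array is what surfaces as
         // ArrayIndexOutOfBoundsException inside getSplits().
         String[] hosts = blkLocations[blkLocations.length - 1].getHosts();
       }
     }
   }
   ```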
   
   ### How was this patch tested?
   
   I rebuilt the project and ran it on our cluster; Spark did not throw exceptions.
   
   ### For code changes:
   
   If the array `blkLocations` is empty, the loop continues to the next
iteration, so getSplits() never tries to access the last block location of an
empty array (see the sketch below).
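   A condensed sketch of where that guard sits (simplified; the real change
lives in the per-file loop of FileInputFormat.getSplits(), not in a
standalone method like this one):
   
   ```java
   import java.io.IOException;
   
   import org.apache.hadoop.fs.BlockLocation;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   
   // Condensed sketch of the guard described above, not the real getSplits().
   public class EmptyBlockLocationGuardSketch {
     static void sketch(FileSystem fs, Iterable<FileStatus> files) throws IOException {
       for (FileStatus file : files) {
         long length = file.getLen();
         if (length == 0) {
           continue;                                  // no data, no splits
         }
         BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
         if (blkLocations == null || blkLocations.length == 0) {
           // The file is being rewritten by a concurrent writer: skip it
           // instead of indexing into the empty array (the
           // ArrayIndexOutOfBoundsException this PR addresses).
           continue;
         }
         // ... existing split computation that indexes into blkLocations ...
       }
     }
   }
   ```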
   
   - [ ] Does the title or this PR starts with the corresponding JIRA issue id 
(e.g. 'HADOOP-17799. Your PR title ...')?
   - [ ] Object storage: have the integration tests been executed and the 
endpoint declared according to the connector-specific documentation?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, 
`NOTICE-binary` files?
   
   




> FileInputFormat can throw ArrayIndexOutofBoundsException because of some 
> concurrent execution.
> ----------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7508
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7508
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mapreduce-client
>            Reporter: liang yu
>            Priority: Major
>
> Using Spark Streaming (version 2.4.0), Hadoop Mapreduce-client (version 2.6.5)
> *Scenario:*
> I am using Spark Streaming to process files stored in HDFS. In my setup, the 
> upstream system sometimes starts two identical tasks that attempt to create 
> and write to the same HDFS file simultaneously. This can lead to conflicts 
> where a file is created and written to twice in quick succession.
> *Problem:*
> When Spark scans for files, it uses the FileInputFormat.getSplits() method to 
> split the file. The first step in getSplits is to retrieve the file's length. 
> If the file length is not zero, the next step is to get the block locations 
> array for that file. However, if the two upstream programs rapidly create and 
> write to the same file (i.e., the file is overwritten or appended to almost 
> simultaneously), a race condition may occur:
> The file's length is already non-zero,
> but calling getFileBlockLocations() returns an empty array because the file 
> is being overwritten or is not yet fully written.
> When this happens, subsequent logic in getSplits (such as accessing the last 
> element of the block locations array) will throw an 
> ArrayIndexOutOfBoundsException because the block locations array is 
> unexpectedly empty.
> *Summary:*
> This issue can occur when multiple upstream writers operate on the same HDFS 
> file nearly simultaneously. As a result, Spark jobs may intermittently fail 
> due to an unhandled empty block locations array in 
> FileInputFormat.getSplits() when processing files that are in the process of 
> being overwritten or not yet fully written.
>  
> *Exception Stacktrace:*
> {code:java}
> [ERROR] 2025-08-06 15:22:02 org.apache.spark.deploy.yarn.ApplicationMaster:91 
> - User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 0 
> java.lang.ArrayIndexOutOfBoundsException: 0 at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:449)
>  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130) 
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at 
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at 
> scala.Option.getOrElse(Option.scala:121) at 
> org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at 
> org.apache.spark.streaming.dstream.MyFileInputDStream.org$apache$spark$streaming$dstream$MyFileInputDStream$$filesToRDD(MyFileInputDStream.scala:350)
>  at 
> org.apache.spark.streaming.dstream.MyFileInputDStream.compute(MyFileInputDStream.scala:155)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>  at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>  at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>  at scala.Option.orElse(Option.scala:289) at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331) at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>  at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>  at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121) 
> at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>  at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) [WARN] 
> 2025-08-06 15:22:32 org.apache.hadoop.util.ShutdownHookManager:128 - 
> ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException 
> java.util.concurrent.TimeoutException at 
> java.util.concurrent.FutureTask.get(FutureTask.java:205) at 
> org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
>  at 
> org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95) 
> [ERROR] 2025-08-06 15:22:32 org.apache.spark.util.Utils:91 - Uncaught 
> exception in thread shutdown-hook-0 java.lang.InterruptedException at 
> java.lang.Object.wait(Native Method) at 
> java.lang.Thread.join(Thread.java:1252) at 
> java.lang.Thread.join(Thread.java:1326) at 
> org.apache.spark.streaming.util.RecurringTimer.stop(RecurringTimer.scala:86) 
> at 
> org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:137)
>  at 
> org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123)
>  at 
> org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681)
>  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340) at 
> org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680) 
> at 
> org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714)
>  at 
> org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599)
>  at 
> org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216) at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945) at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>  at scala.util.Try$.apply(Try.scala:192) at 
> org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
>  at 
> org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748){code}
>  


