[
https://issues.apache.org/jira/browse/MAPREDUCE-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18046458#comment-18046458
]
ASF GitHub Bot commented on MAPREDUCE-7508:
-------------------------------------------
github-actions[bot] commented on PR #7859:
URL: https://github.com/apache/hadoop/pull/7859#issuecomment-3672853383
We're closing this stale PR because it has been open for 100 days with no
activity. This isn't a judgement on the merit of the PR in any way. It's just a
way of keeping the PR queue manageable.
If you feel like this was a mistake, or you would like to continue working
on it, please feel free to re-open it and ask for a committer to remove the
stale tag and review again.
Thanks all for your contribution.
> FileInputFormat can throw ArrayIndexOutOfBoundsException because of concurrent
> execution.
> ----------------------------------------------------------------------------------------------
>
> Key: MAPREDUCE-7508
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-7508
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: mapreduce-client
> Reporter: liang yu
> Priority: Major
> Labels: pull-request-available
>
> Environment: Spark Streaming (version 2.4.0), Hadoop MapReduce client (version 2.6.5)
> {*}Scenario{*}:
> I am using Spark Streaming to process files stored in HDFS. In my setup, the
> upstream system sometimes starts two identical tasks that attempt to create
> and write to the same HDFS file simultaneously. This can lead to conflicts
> where a file is created and written to twice in quick succession.
> {*}Problem{*}:
> When Spark scans for files, it calls FileInputFormat.getSplits() to compute the
> input splits for each file. The first step in getSplits() is to retrieve the file's length.
> If the file length is not zero, the next step is to get the block locations
> array for that file. However, if the two upstream programs rapidly create and
> write to the same file (i.e., the file is overwritten or appended to almost
> simultaneously), a race condition may occur:
> The file's length is already non-zero,
> but calling getFileBlockLocations() returns an empty array because the file
> is being overwritten or is not yet fully written.
> When this happens, subsequent logic in getSplits (such as accessing the last
> element of the block locations array) will throw an
> ArrayIndexOutOfBoundsException because the block locations array is
> unexpectedly empty.
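>
> To make this concrete, the following standalone snippet (an illustration only, not
> the Hadoop source itself) reproduces the same access pattern: it reads the file
> length first, then indexes the block-locations array without checking that it is
> non-empty. The class name EmptyBlockLocationsDemo and the command-line argument
> are hypothetical.
> {code:java}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.BlockLocation;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> // Hypothetical demo class; mirrors the order of operations described above.
> public class EmptyBlockLocationsDemo {
>   public static void main(String[] args) throws Exception {
>     FileSystem fs = FileSystem.get(new Configuration());
>     Path path = new Path(args[0]);            // a file that is being rewritten concurrently
>     FileStatus file = fs.getFileStatus(path);
>
>     long length = file.getLen();              // step 1: the length is already non-zero
>     if (length != 0) {
>       // step 2: the block list can still be empty while the file is being
>       // overwritten or is not yet fully written
>       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>
>       // Indexing without a size check reproduces the failure mode seen in
>       // getSplits(): java.lang.ArrayIndexOutOfBoundsException: 0
>       String[] hosts = blkLocations[0].getHosts();
>       System.out.println("first block hosts: " + String.join(",", hosts));
>     }
>   }
> }
> {code}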
> {*}Summary{*}:
> This issue can occur when multiple upstream writers operate on the same HDFS
> file nearly simultaneously. As a result, Spark jobs may intermittently fail
> due to an unhandled empty block locations array in
> FileInputFormat.getSplits() when processing files that are in the process of
> being overwritten or not yet fully written.
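>
> One possible mitigation (a sketch only, and not necessarily what the attached pull
> request implements) is to guard against an empty block-locations array inside
> getSplits() and fall back to the same handling used for zero-length files, instead
> of indexing into the array. The snippet below assumes the local variables of
> getSplits() (splits, path, length, blkLocations) are in scope:
> {code:java}
> // Sketch of a defensive guard inside FileInputFormat.getSplits();
> // variable names follow the surrounding method and are assumptions here.
> if (blkLocations == null || blkLocations.length == 0) {
>   // The file is still being written or overwritten: create a split with no
>   // host hints (or skip the file for this scan) rather than throwing AIOOBE.
>   splits.add(makeSplit(path, 0, length, new String[0]));
> } else {
>   splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
> }
> {code}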
>
> *Exception Stacktrace:*
> {code:java}
> [ERROR] 2025-08-06 15:22:02 org.apache.spark.deploy.yarn.ApplicationMaster:91 - User class threw exception: java.lang.ArrayIndexOutOfBoundsException: 0
> java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:449)
>     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
>     at org.apache.spark.streaming.dstream.MyFileInputDStream.org$apache$spark$streaming$dstream$MyFileInputDStream$$filesToRDD(MyFileInputDStream.scala:350)
>     at org.apache.spark.streaming.dstream.MyFileInputDStream.compute(MyFileInputDStream.scala:155)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:36)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> [WARN] 2025-08-06 15:22:32 org.apache.hadoop.util.ShutdownHookManager:128 - ShutdownHook '$anon$2' timeout, java.util.concurrent.TimeoutException
> java.util.concurrent.TimeoutException
>     at java.util.concurrent.FutureTask.get(FutureTask.java:205)
>     at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
>     at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
> [ERROR] 2025-08-06 15:22:32 org.apache.spark.util.Utils:91 - Uncaught exception in thread shutdown-hook-0
> java.lang.InterruptedException
>     at java.lang.Object.wait(Native Method)
>     at java.lang.Thread.join(Thread.java:1252)
>     at java.lang.Thread.join(Thread.java:1326)
>     at org.apache.spark.streaming.util.RecurringTimer.stop(RecurringTimer.scala:86)
>     at org.apache.spark.streaming.scheduler.JobGenerator.stop(JobGenerator.scala:137)
>     at org.apache.spark.streaming.scheduler.JobScheduler.stop(JobScheduler.scala:123)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$stop$1.apply$mcV$sp(StreamingContext.scala:681)
>     at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
>     at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:680)
>     at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:714)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:599)
>     at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
>     at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>