[ https://issues.apache.org/jira/browse/SPARK-27614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-27614:
---------------------------------
Description:
Most of the tasks in a stage complete, but a few individual tasks show an unusually long duration and make no progress at all. The corresponding executor hits a connection timeout, and its thread dump shows it hanging in ShuffleBlockFetcherIterator.next. The relevant code is:
{code}
while (!isZombie && result == null) {
  val startFetchWait = System.nanoTime()
  result = results.take()
  val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
  shuffleMetrics.incFetchWaitTime(fetchWaitTime)
}
{code}
LinkedBlockingQueue's take method blocks indefinitely: if the remote fetch never delivers a result, the task hangs forever. We can use poll with a timeout instead, failing the task with a SparkException once the timeout expires. The modified code is as follows:
{code}
currentResult =
  if (!this.blockManager.conf.getBoolean("spark.shuffle.fetch.timeout.enable", true)) {
    results.take()
  } else {
    logInfo("set spark.shuffle.fetch.timeout.enable=true.")
    val GB = 1L << 30
    val MB = 1L << 20
    // Scale the timeout with the amount of data currently in flight.
    val (waitTime, unit) =
      if (bytesInFlight >= 2 * GB) (2, TimeUnit.HOURS)
      else if (bytesInFlight >= GB) (1, TimeUnit.HOURS)
      else if (bytesInFlight >= 512 * MB) (45, TimeUnit.MINUTES)
      else if (bytesInFlight >= 200 * MB) (30, TimeUnit.MINUTES)
      else if (bytesInFlight >= 100 * MB) (20, TimeUnit.MINUTES)
      else if (bytesInFlight >= 10 * MB) (15, TimeUnit.MINUTES)
      else (10, TimeUnit.MINUTES)
    val r = results.poll(waitTime, unit)
    if (r == null) {
      val cost = "cost " + waitTime + " " + unit.toString + " to wait for a shuffle block, give up!"
      logError(cost)
      throw new SparkException(cost)
    }
    r
  }
{code}
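The difference between the blocking take() and the bounded poll() can be sketched outside Spark with a plain LinkedBlockingQueue (the 100 ms timeout here is an arbitrary demo value; the patch above derives it from bytesInFlight):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();

        // take() would block forever on an empty queue if no producer
        // ever delivers a block -- exactly the hang described above.
        // poll(timeout, unit) instead returns null once the wait expires,
        // letting the caller fail the task with an exception.
        String r = results.poll(100, TimeUnit.MILLISECONDS);
        if (r == null) {
            System.out.println("timed out waiting for a shuffle block");
        }

        // When a result does arrive in time, poll() behaves like take().
        results.put("shuffle-block-0");
        String ok = results.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("got: " + ok);
    }
}
```

Either way the task thread is never parked unconditionally, so a lost fetch surfaces as a retriable task failure instead of a hung executor.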
> Executor shuffle fetch hang
> ---------------------------
>
>                 Key: SPARK-27614
>                 URL: https://issues.apache.org/jira/browse/SPARK-27614
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.1, 2.4.0
>            Reporter: weDataSphere
>            Priority: Major
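The escalating timeout proposed in the patch can be isolated as a pure function of bytesInFlight (a sketch; the tier boundaries are copied verbatim from the patch, the method name is hypothetical):

```java
import java.util.concurrent.TimeUnit;

public class FetchTimeout {
    static final long MB = 1L << 20;
    static final long GB = 1L << 30;

    // Mirrors the tier table in the patch: larger in-flight fetches
    // are given proportionally longer to complete before giving up.
    static long timeoutMillis(long bytesInFlight) {
        if (bytesInFlight >= 2 * GB) return TimeUnit.HOURS.toMillis(2);
        if (bytesInFlight >= GB) return TimeUnit.HOURS.toMillis(1);
        if (bytesInFlight >= 512 * MB) return TimeUnit.MINUTES.toMillis(45);
        if (bytesInFlight >= 200 * MB) return TimeUnit.MINUTES.toMillis(30);
        if (bytesInFlight >= 100 * MB) return TimeUnit.MINUTES.toMillis(20);
        if (bytesInFlight >= 10 * MB) return TimeUnit.MINUTES.toMillis(15);
        return TimeUnit.MINUTES.toMillis(10);
    }

    public static void main(String[] args) {
        System.out.println(timeoutMillis(5 * MB));  // small fetch: 10 minutes
        System.out.println(timeoutMillis(3 * GB));  // huge fetch: 2 hours
    }
}
```

Keeping the mapping in one place would also make it easy to replace the hard-coded tiers with configurable thresholds later.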
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)