[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

Hi @tgravescs, I tried it, but found it difficult to produce the KILLED_BY_RESOURCEMANAGER exit status. I followed [YARN-73](https://issues.apache.org/jira/browse/YARN-73) and [YARN-495](https://issues.apache.org/jira/browse/YARN-495), but things didn't go as I expected.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

> it would be interesting to test it further to see if it does.

@tgravescs Yeah, I have the same thought. I'd like to try it, but I cannot guarantee that I'll manage it, because I have never done this kind of test before. I'll try my best.
[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/23223#discussion_r239341126

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -612,11 +612,14 @@ private[yarn] class YarnAllocator( val message = "Container killed by YARN for exceeding physical memory limits. " + s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}." (true, message) + case exit_status if NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(exit_status) => --- End diff --

Updated. But I'm not sure about this part:

> That way values like ContainerExitStatus.SUCCESS from the set would be really used.

@attilapiros
[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/23223#discussion_r239316608 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala --- @@ -417,4 +426,59 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter clock.advance(50 * 1000L) handler.getNumExecutorsFailed should be (0) } + + test("SPARK-26296: YarnAllocator should have same blacklist behaviour with YARN") { +val rmClientSpy = spy(rmClient) +val maxExecutors = 11 + +val handler = createAllocator( + maxExecutors, + rmClientSpy, + Map( +"spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true", +"spark.blacklist.application.maxFailedExecutorsPerNode" -> "0")) +handler.updateResourceRequests() + +val hosts = (0 until maxExecutors).map(i => s"host$i") +val ids = (0 to maxExecutors).map(i => ContainerId.newContainerId(appAttemptId, i)) +val containers = createContainers(hosts, ids) +handler.handleAllocatedContainers(containers.slice(0, 9)) +val cs0 = ContainerStatus.newInstance(containers(0).getId, ContainerState.COMPLETE, + "success", ContainerExitStatus.SUCCESS) +val cs1 = ContainerStatus.newInstance(containers(1).getId, ContainerState.COMPLETE, + "preempted", ContainerExitStatus.PREEMPTED) +val cs2 = ContainerStatus.newInstance(containers(2).getId, ContainerState.COMPLETE, + "killed_exceeded_vmem", ContainerExitStatus.KILLED_EXCEEDED_VMEM) +val cs3 = ContainerStatus.newInstance(containers(3).getId, ContainerState.COMPLETE, + "killed_exceeded_pmem", ContainerExitStatus.KILLED_EXCEEDED_PMEM) +val cs4 = ContainerStatus.newInstance(containers(4).getId, ContainerState.COMPLETE, + "killed_by_resourcemanager", ContainerExitStatus.KILLED_BY_RESOURCEMANAGER) +val cs5 = ContainerStatus.newInstance(containers(5).getId, ContainerState.COMPLETE, + "killed_by_appmaster", ContainerExitStatus.KILLED_BY_APPMASTER) +val cs6 = ContainerStatus.newInstance(containers(6).getId, 
ContainerState.COMPLETE, + "killed_after_app_completion", ContainerExitStatus.KILLED_AFTER_APP_COMPLETION) +val cs7 = ContainerStatus.newInstance(containers(7).getId, ContainerState.COMPLETE, + "aborted", ContainerExitStatus.ABORTED) +val cs8 = ContainerStatus.newInstance(containers(8).getId, ContainerState.COMPLETE, + "disk_failed", ContainerExitStatus.DISKS_FAILED) --- End diff --

Nice suggestion!
[GitHub] spark pull request #23223: [SPARK-26269][YARN]Yarnallocator should have same...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/23223#discussion_r239316424

--- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala --- @@ -114,13 +116,20 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter clock) } - def createContainer(host: String, resource: Resource = containerResource): Container = { -val containerId = ContainerId.newContainerId(appAttemptId, containerNum) + def createContainer( + host: String, + containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum), --- End diff --

Good idea.
[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

> it looks like its only going to blacklist the node for the AM, not other nodes for general containers.

@squito YARN has a blacklist for the AM when `am-scheduling.node-blacklisting-enabled`=true, and has `ContainerFailureTracker` for general containers (I haven't found a config for it).
[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

> Are you seeing actual issues with this blacklisting when it shouldn't?

Unfortunately, no. @tgravescs @squito
[GitHub] spark issue #23223: [SPARK-26269][YARN]Yarnallocator should have same blackl...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

> I mean if node blacklisting in Spark would be perfectly aligned to YARN then it would be just redundant to have it in Spark in the first place.

This change does seem to make node blacklisting in Spark *perfectly* aligned with YARN. However, my original thought was that some exit statuses (e.g. KILLED_BY_RESOURCEMANAGER) should not currently lead to a node being blacklisted. So *perfect* alignment with YARN is not really the goal of this change, and we could also define a custom strategy for Spark.

> Take for example disk failure.

For Spark task-level blacklisting, should it be delegated to **schedulerBlacklist** in YarnAllocatorBlacklistTracker? Also, ContainerExitStatus.DISKS_FAILED in YARN does not seem to be the same thing as a disk failure in a Spark task.
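Here is a minimal Scala sketch of the policy discussed above. It rests on two assumptions. First, exit causes are modeled by a hypothetical `ExitCause` enumeration whose names only mirror YARN's `ContainerExitStatus` constants. Second, the membership of the exemption set is illustrative; it is not necessarily the set this PR uses. Under this model, success and preemption are not failures at all, exempt causes never count against a node, and every other cause does.

```scala
// Hypothetical model of container exit causes. The names mirror YARN's
// ContainerExitStatus constants, but this is not the YARN API.
object ExitCause extends Enumeration {
  val Success, Preempted, KilledExceededVmem, KilledExceededPmem,
    KilledByResourceManager, KilledByAppMaster, KilledAfterAppCompletion,
    Aborted, DisksFailed, AppFailure = Value
}

object NodeBlacklistPolicy {
  import ExitCause._

  // Assumed, illustrative set: causes that are neither the application's fault
  // nor evidence of a bad node, so they should not count against the node.
  val notAppOrNodeFault: Set[ExitCause.Value] = Set(
    KilledByResourceManager, KilledByAppMaster, KilledAfterAppCompletion,
    Aborted, DisksFailed)

  def countsTowardsNodeBlacklist(cause: ExitCause.Value): Boolean = cause match {
    case Success | Preempted                => false // not treated as failures at all
    case c if notAppOrNodeFault.contains(c) => false // exempt from node blacklisting
    case _                                  => true  // e.g. app failures, memory-limit kills
  }
}
```

Keeping the exemption set as data, rather than as branches in the match, makes it easy to diverge from YARN on purpose. That is the "custom strategy for Spark" option mentioned above.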
[GitHub] spark issue #23223: Yarnallocator should have same blacklist behaviour with ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/23223

Ping @attilapiros @vanzin @jerryshao for review, please.
[GitHub] spark pull request #23223: Yarnallocator should have same blacklist behaviou...
GitHub user Ngone51 opened a pull request: https://github.com/apache/spark/pull/23223

Yarnallocator should have same blacklist behaviour with yarn to maximize use of cluster resource

## What changes were proposed in this pull request?

As I mentioned in JIRA [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), this PR tries to make `YarnAllocator` have the same blacklist behaviour as YARN, in order to maximize the use of cluster resources.

## How was this patch tested?

Added a new test in `YarnAllocatorSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ngone51/spark dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23223.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23223

commit 9f88e1c22876e4cdb1a0a6e952930e76f3206e96
Author: wuyi
Date: 2018-12-04T16:17:35Z

YarnAllocator should have same blacklist behaviour with YARN

commit 65a70dcbb7993731104deab2592a5b969a31414e
Author: Ngone51
Date: 2018-12-05T06:11:06Z

fix ut
[GitHub] spark pull request #20269: [SPARK-23029] [DOCS] Specifying default units of ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20269#discussion_r238135789

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -38,10 +38,13 @@ package object config { ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false) private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory") +.doc("Amount of memory to use for the driver process, in MiB unless otherwise specified.") .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead") +.doc("The amount of off-heap memory to be allocated per driver in cluster mode, " + --- End diff --

Hi @ferdonline, can you explain why this is **off-heap** memory?
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/22288

(I'm away on a trip these days, so I have to type this on my mobile phone. Sorry for the formatting.)

> Is this the same as the current pr, but just killing only if idle?

Yes, it's similar. It prevents a TaskSet from waiting indefinitely to be scheduled. So, in case 2, if we do not find an idle executor before the timeout, the TaskSet aborts rather than hangs.

> But in case 2, you'd probably end up requesting a whole bunch more executors very briefly, until there are enough failures on one specific task. or maybe we can ensure that even if there are a huge number of unschedulable tasks, we only ever request one extra executor?

I'm not sure I have fully understood this part. But I realized that our dynamic allocation (DA) strategy is currently based mainly on task status, e.g. pending or speculative. Whether an executor gets blacklisted, however, depends on TaskSet-level status (IIRC). This may introduce a level mismatch when we try to bring DA into TaskSchedulerImpl. (I hope I understood your main point.)
[GitHub] spark issue #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new execut...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/22288

As I mentioned at https://github.com/apache/spark/pull/22288#discussion_r216874530, I'm quite worried about this killing behaviour. I think we should kill an executor iff it is idle. Having looked through the discussion above, here are my thoughts:

* With dynamic allocation: maybe we can add an `onTaskCompletelyBlacklisted()` method to the DA manager's `Listener` and pass it an event, e.g. `TaskCompletelyBlacklistedEvent`. The DA manager would then allocate a new executor for us.
* With static allocation: set `spark.scheduler.unschedulableTaskSetTimeout` for a `TaskSet`. If a task is completely blacklisted, kill some executors iff they are idle. To account for executor allocation time, we may also want to raise the timeout upper bound a little for this `TaskSet`. Then wait until the task is scheduled, or time out and abort.
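The static-allocation proposal above (kill only idle executors, then abort after a timeout) might look roughly like the sketch below. `ExecutorInfo`, `idleExecutorsToKill` and `shouldAbort` are hypothetical names, not Spark APIs. The `extraMs` parameter models the suggested small allowance for executor allocation time.

```scala
// Hypothetical sketch of the static-allocation proposal; none of these names are Spark APIs.
final case class ExecutorInfo(id: String, runningTasks: Int)

object UnschedulableTaskSetPolicy {
  /** Only executors with no running tasks are candidates for killing. */
  def idleExecutorsToKill(executors: Seq[ExecutorInfo]): Seq[String] =
    executors.filter(_.runningTasks == 0).map(_.id)

  /**
   * Abort only after the TaskSet has been unschedulable for `timeoutMs`, plus a small
   * allowance `extraMs` for the time needed to allocate a replacement executor.
   */
  def shouldAbort(unschedulableSinceMs: Long, nowMs: Long,
      timeoutMs: Long, extraMs: Long): Boolean =
    nowMs - unschedulableSinceMs >= timeoutMs + extraMs
}
```

Busy executors are never candidates, so tasks from other TaskSets keep running. This addresses the concern about killing executors that still have work on them.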
[GitHub] spark pull request #22288: [SPARK-22148][SPARK-15815][Scheduler] Acquire new...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r216874530

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,48 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + + // If the taskSet is unschedulable we kill an existing blacklisted executor/s and + // kick off an abortTimer which after waiting will abort the taskSet if we were + // unable to schedule any task from the taskSet. + // Note: We keep a track of schedulability on a per taskSet basis rather than on a + // per task basis. + val executor = hostToExecutors.valuesIterator.next().iterator.next() --- End diff --

I'm wondering whether it is worth killing an executor that has tasks running on it. After all, a task that is blacklisted on all currently allocated executors is not guaranteed to run on a newly allocated executor.
[GitHub] spark pull request #18492: [SPARK-19326] Speculated task attempts do not get...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/18492#discussion_r216597619

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager( // If our target has not changed, do not send a message // to the cluster manager and reset our exponential growth if (delta == 0) { - numExecutorsToAdd = 1 - return 0 + // Check if there is any speculative jobs pending + if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) { +numExecutorsTarget = + math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors) --- End diff --

I'm also confused by the `+1` here. And I think we have already taken `pendingSpeculativeTasks` into account, @advancedxy:

```
def totalPendingTasks(): Int = {
  pendingTasks + pendingSpeculativeTasks
}
```

This check seems redundant. Also, if `numExecutorsTarget` changes (after the `+1`), the new target is not synced to the cluster manager (CM).
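To show why the `+1` looks redundant, here is a simplified model. It is an assumption for illustration only, not the actual `ExecutorAllocationManager` code. The number of executors needed is a ceiling division over running plus pending tasks, where "pending" already includes speculative tasks. `tasksPerExecutor` is a hypothetical parameter.

```scala
// Simplified model for illustration only; not Spark's ExecutorAllocationManager.
object ExecutorTargetSketch {
  def totalPendingTasks(pending: Int, pendingSpeculative: Int): Int =
    pending + pendingSpeculative

  /** Executors needed for all running and pending (incl. speculative) tasks. */
  def maxNumExecutorsNeeded(pending: Int, pendingSpeculative: Int, running: Int,
      tasksPerExecutor: Int): Int = {
    val tasks = totalPendingTasks(pending, pendingSpeculative) + running
    (tasks + tasksPerExecutor - 1) / tasksPerExecutor // ceiling division
  }

  /** Clamp a desired executor count into [minExecutors, maxExecutors]. */
  def clampTarget(needed: Int, minExecutors: Int, maxExecutors: Int): Int =
    math.max(math.min(needed, maxExecutors), minExecutors)
}
```

Take no regular pending tasks, three pending speculative tasks, and one task per executor. `maxNumExecutorsNeeded(0, 3, 0, 1)` is already 3, so the speculative tasks are covered without adding 1.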
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r214720097 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { +// If the taskSet is unschedulable we kill the existing blacklisted executor/s and +// kick off an abortTimer which after waiting will abort the taskSet if we were +// unable to get new executors and couldn't schedule a task from the taskSet. +// Note: We keep a track of schedulability on a per taskSet basis rather than on a +// per task basis. +if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + hostToExecutors.valuesIterator.foreach(executors => executors.foreach({ +executor => + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) + }) + ) + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + abortTimer.schedule(new TimerTask() { +override def run() { + if (unschedulableTaskSetToExpiryTime.contains(taskSet) && +(unschedulableTaskSetToExpiryTime(taskSet) + + UNSCHEDULABLE_TASKSET_TIMEOUT_MS) + <= clock.getTimeMillis() + ) { +logInfo("Cannot schedule any task because of complete blacklisting. " + + "Wait time for scheduling expired. Aborting the application.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex.get) + } else { +this.cancel() + } +} + }, UNSCHEDULABLE_TASKSET_TIMEOUT_MS) +} + } else { +// TODO: try acquiring new executors for static allocation before aborting. 
--- End diff --

How? By waiting for other tasks to finish and release resources?
[GitHub] spark pull request #22288: [SPARK-22148][Scheduler] Acquire new executors to...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22288#discussion_r214719743

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -414,9 +425,54 @@ private[spark] class TaskSchedulerImpl( launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } + if (!launchedAnyTask) { - taskSet.abortIfCompletelyBlacklisted(hostToExecutors) -} + taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors) match { +case taskIndex: Some[Int] => // Returns the taskIndex which was unschedulable + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { +// If the taskSet is unschedulable we kill the existing blacklisted executor/s and +// kick off an abortTimer which after waiting will abort the taskSet if we were +// unable to get new executors and couldn't schedule a task from the taskSet. +// Note: We keep a track of schedulability on a per taskSet basis rather than on a +// per task basis. +if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + hostToExecutors.valuesIterator.foreach(executors => executors.foreach({ +executor => + logDebug("Killing executor because of task unschedulability: " + executor) + blacklistTrackerOpt.foreach(blt => blt.killBlacklistedExecutor(executor)) --- End diff --

Seriously? You kill all the executors? What if other TaskSets' tasks are running on them? BTW, if you want to refresh executors, you also have to enable `spark.blacklist.killBlacklistedExecutors`.
[GitHub] spark issue #22202: [SPARK-25211][Core] speculation and fetch failed result ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/22202

Since `stage 1` is only a `ShuffleMapStage`, why are there no other child stages to be submitted?
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212167438

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) { long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); -Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); -writer.write(writeBuffer, 0, toTransfer); +if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) { --- End diff --

> The numRecordsWritten in DiskBlockObjectWriter is still correct during the process after this PR

The number is correct, but compared with the current behaviour, it no longer reflects what has actually been written at that point. But as you said, we get the correct result in the end, so it may not be a big deal.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212163785

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) { long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); -Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); -writer.write(writeBuffer, 0, toTransfer); +if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) { --- End diff --

Yeah, I agree there's no difference in the final result. But `writeMetrics` in `DiskBlockObjectWriter` would be incorrect during the process. This affects not only `numRecordsWritten` but also `_bytesWritten`, which can only be counted correctly when `writer.write()` is called (see `recordWritten#updateBytesWritten` for details).
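The toy model below illustrates the concern above. `ToyWriter` is hypothetical and is not Spark's `DiskBlockObjectWriter`. In the model, records are counted on every `recordWritten()` call, but bytes are counted only when `write()` runs. If several records are batched into one `write()`, the byte count therefore lags behind the record count until the buffer is flushed.

```scala
// Toy model only (hypothetical, not Spark's DiskBlockObjectWriter): records are
// counted per recordWritten() call, bytes only when write() actually runs.
final class ToyWriter {
  var recordsWritten = 0L
  var bytesWritten = 0L
  def write(len: Int): Unit = bytesWritten += len
  def recordWritten(): Unit = recordsWritten += 1
}

object BatchedWriteDemo {
  /**
   * Buffers records (each assumed to be <= bufferSize) and flushes with one write()
   * when the next record would overflow the buffer. Returns the writer and the
   * (records, bytes) metrics observed after each record.
   */
  def run(sizes: Seq[Int], bufferSize: Int): (ToyWriter, Seq[(Long, Long)]) = {
    val w = new ToyWriter
    var buffered = 0
    val snapshots = sizes.map { size =>
      if (buffered > 0 && buffered + size > bufferSize) {
        w.write(buffered)
        buffered = 0
      }
      buffered += size
      w.recordWritten()
      (w.recordsWritten, w.bytesWritten)
    }
    if (buffered > 0) w.write(buffered) // final flush
    (w, snapshots)
  }
}
```

For records of 10, 10 and 30 bytes with a 32-byte buffer, the intermediate metrics are (1, 0), (2, 0) and (3, 20). The final totals, 3 records and 50 bytes, still come out right.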
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r212160161

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) { long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); -Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); -writer.write(writeBuffer, 0, toTransfer); +if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) { --- End diff --

Oh, I see. If so, I'm afraid you may have to change the behaviour of `writer.recordWritten()`, which currently just counts records one by one.
[GitHub] spark pull request #22163: [SPARK-25166][CORE]Reduce the number of write ope...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22163#discussion_r211954019

--- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java --- @@ -206,14 +211,21 @@ private void writeSortedFile(boolean isLastFile) { long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length while (dataRemaining > 0) { final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining); -Platform.copyMemory( - recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer); -writer.write(writeBuffer, 0, toTransfer); +if (bufferOffset > 0 && bufferOffset + toTransfer > DISK_WRITE_BUFFER_SIZE) { --- End diff --

Not a bad idea, but the code here may not work as you expect. For a record of size `X` < `diskWriteBufferSize` (the same as `DISK_WRITE_BUFFER_SIZE`), we call `writer.write()` only once. For a record of size `Y` >= `diskWriteBufferSize`, we call `writer.write()` (`Y` + `diskWriteBufferSize` - 1) / `diskWriteBufferSize` times. The new code does not change this.
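The write-call count described above can be checked with a small Scala simulation of the copy loop. This is a sketch, and `simulate` and `closedForm` are illustrative names. The loop issues one `write()` per chunk of at most `bufferSize` bytes, so the number of calls is a ceiling division.

```scala
object WriteCallCount {
  /** Mirrors the copy loop: one write() call per chunk of at most bufferSize bytes. */
  def simulate(recordSize: Int, bufferSize: Int): Int = {
    var dataRemaining = recordSize
    var calls = 0
    while (dataRemaining > 0) {
      val toTransfer = math.min(bufferSize, dataRemaining)
      calls += 1 // stands in for writer.write(writeBuffer, 0, toTransfer)
      dataRemaining -= toTransfer
    }
    calls
  }

  /** Closed form from the comment: ceil(recordSize / bufferSize). */
  def closedForm(recordSize: Int, bufferSize: Int): Int =
    (recordSize + bufferSize - 1) / bufferSize
}
```

With a 1024-byte buffer, a 100-byte or 1024-byte record takes one call and a 1025-byte record takes two. The two functions agree for every positive record size.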
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209662081

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -402,6 +422,19 @@ class DAGScheduler( } } + /** + * Check whether the barrier stage requires more slots (to be able to launch all tasks in the + * barrier stage together) than the total number of active slots currently. Fail current check + * if trying to submit a barrier stage that requires more slots than current total number. If + * the check fails consecutively for three times for a job, then fail current job submission. --- End diff --

I can't find the code for `"consecutively for three times"`, only `maxFailureNumTasksCheck`. Am I missing something?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r209658945

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -929,6 +955,28 @@ class DAGScheduler( // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { + case e: Exception if e.getMessage == + DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER => +logWarning("The job requires to run a barrier stage that requires more slots than the " + + "total number of slots in the cluster currently.") +jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0) +val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1 --- End diff --

@kiszk IIUC, there is exactly one thread in `eventLoop`, so the scenario mentioned above will not happen. I even feel there is no need to use a `ConcurrentHashMap` for `jobIdToNumTasksCheckFailures` at all. @jiangxb1987
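Here is a minimal sketch of the argument. A hypothetical `SingleThreadedLoop` stands in for `DAGScheduler`'s event loop. Every event is handled on one thread, so a plain `mutable.HashMap` is enough to track per-job check failures, and the job is given up on once its count reaches an assumed maximum. Events may be posted from any thread, but the handler never runs concurrently with itself.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical stand-in for a single-threaded event loop such as DAGScheduler's.
// All reads and writes of `checkFailures` happen on the one loop thread, so a
// plain mutable.HashMap is enough; no ConcurrentHashMap is needed.
final class SingleThreadedLoop(maxCheckFailures: Int) {
  private val loop = Executors.newSingleThreadExecutor()
  private val checkFailures = mutable.HashMap.empty[Int, Int] // jobId -> failures so far
  @volatile private var failed: List[Int] = Nil

  /** May be called from any thread; the handler itself always runs on the loop thread. */
  def postCheckFailed(jobId: Int): Unit =
    loop.execute(new Runnable { override def run(): Unit = onCheckFailed(jobId) })

  private def onCheckFailed(jobId: Int): Unit = {
    val n = checkFailures.getOrElse(jobId, 0) + 1
    checkFailures(jobId) = n
    if (n == maxCheckFailures) failed = jobId :: failed // give up on this job once
  }

  def failedJobs: List[Int] = failed

  def shutdownAndWait(): Unit = {
    loop.shutdown()
    loop.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```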
[GitHub] spark pull request #21565: [SPARK-24558][Core]wrong Idle Timeout value is us...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21565#discussion_r200019612

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -488,9 +488,16 @@ private[spark] class ExecutorAllocationManager( newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { executorsRemoved.foreach { removedExecutorId => +// If it is cachedBlcok timeout is configured using +// spark.dynamicAllocation.cachedExecutorIdleTimeout +val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) { --- End diff --

We would not maintain another HashMap. Instead, we would change the structure of the existing one, so that we do not need to issue extra RPC calls to `BlockManagerMaster` here. As for the 'API' you mentioned, it only returns after an RPC round trip.
[GitHub] spark pull request #21565: [SPARK-24558][Core]wrong Idle Timeout value is us...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21565#discussion_r195927590

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -488,9 +488,16 @@ private[spark] class ExecutorAllocationManager( newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { executorsRemoved.foreach { removedExecutorId => +// If it is cachedBlcok timeout is configured using +// spark.dynamicAllocation.cachedExecutorIdleTimeout +val idleTimeout = if (blockManagerMaster.hasCachedBlocks(removedExecutorId)) { --- End diff --

How about changing `removeTimes` to `HashMap[String, (Long, Boolean)]`, where the `Boolean` field indicates whether the cached-executor idle timeout applies? Then we would not need to ask `blockManagerMaster` again.
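The suggested `removeTimes` layout might be sketched as follows. `RemoveTimesSketch` and its methods are hypothetical names. The Boolean is recorded when the executor goes idle, so the question "which idle timeout applied?" can later be answered from the map, without another query to `BlockManagerMaster`.

```scala
import scala.collection.mutable

// Hypothetical sketch of the suggestion: remember, alongside each executor's removal
// deadline, whether the cached-executor idle timeout was used.
final class RemoveTimesSketch(idleTimeoutMs: Long, cachedIdleTimeoutMs: Long) {
  // executorId -> (removal deadline in ms, whether cachedExecutorIdleTimeout applied)
  val removeTimes = mutable.HashMap.empty[String, (Long, Boolean)]

  def onExecutorIdle(executorId: String, nowMs: Long, hasCachedBlocks: Boolean): Unit = {
    val timeout = if (hasCachedBlocks) cachedIdleTimeoutMs else idleTimeoutMs
    removeTimes(executorId) = (nowMs + timeout, hasCachedBlocks)
  }

  /** Timeout that applied to this executor, answered locally from removeTimes. */
  def idleTimeoutFor(executorId: String): Option[Long] =
    removeTimes.get(executorId).map { case (_, cached) =>
      if (cached) cachedIdleTimeoutMs else idleTimeoutMs
    }
}
```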
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21486#discussion_r194606075

--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala --- @@ -197,14 +197,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) // Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) --- End diff --

To be more specific, `killAndReplaceExecutor#killExecutors` will block until we get a response from the cluster manager, or until it times out after 120s (the default RPC timeout).
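Because the kill can block for up to the RPC timeout, the diff hands it to a separate thread. Below is a minimal sketch of that pattern using a plain `java.util.concurrent` executor. `AsyncKillSketch` and `blockingKill` are hypothetical stand-ins for `killExecutorThread` and `sc.killAndReplaceExecutor`.

```scala
import java.util.concurrent.{Callable, Executors, Future => JFuture}

object AsyncKillSketch {
  // A dedicated thread, so the caller (e.g. a heartbeat check) is never blocked by the kill.
  private val killExecutorThread = Executors.newSingleThreadExecutor()

  /** `blockingKill` may block until the cluster manager replies or the RPC times out. */
  def killAsync(executorId: String, blockingKill: String => Boolean): JFuture[Boolean] =
    killExecutorThread.submit(new Callable[Boolean] {
      override def call(): Boolean = blockingKill(executorId)
    })

  def shutdown(): Unit = killExecutorThread.shutdown()
}
```

`killAsync` returns as soon as the task is queued. The returned future completes only after the blocking call finishes or times out.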
[GitHub] spark pull request #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate pa...
Github user Ngone51 closed the pull request at: https://github.com/apache/spark/pull/21096
[GitHub] spark pull request #20996: [SPARK-23884][CORE] hasLaunchedTask should be tru...
Github user Ngone51 closed the pull request at: https://github.com/apache/spark/pull/20996
[GitHub] spark issue #21494: [WIP][SPARK-24375][Prototype] Support barrier scheduling
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21494

Hi @jiangxb1987, could you explain in more detail what `barrier scheduling` means in Spark? It would also help to see an example that only works with `barrier scheduling` and could not work under Spark's current scheduling mechanism.
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21486#discussion_r192592142

--- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala --- @@ -207,6 +210,55 @@ class HeartbeatReceiverSuite assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2)) } + test("expired host should not be offered again") { +scheduler = spy(new TaskSchedulerImpl(sc)) +scheduler.setDAGScheduler(sc.dagScheduler) +when(sc.taskScheduler).thenReturn(scheduler) +doReturn(true).when(scheduler).executorHeartbeatReceived(any(), any(), any()) + +// Set up a fake backend and cluster manager to simulate killing executors +val rpcEnv = sc.env.rpcEnv +val fakeClusterManager = new FakeClusterManager(rpcEnv) +val fakeClusterManagerRef = rpcEnv.setupEndpoint("fake-cm", fakeClusterManager) +val fakeSchedulerBackend = new FakeSchedulerBackend(scheduler, rpcEnv, fakeClusterManagerRef) +when(sc.schedulerBackend).thenReturn(fakeSchedulerBackend) + +fakeSchedulerBackend.start() +val dummyExecutorEndpoint1 = new FakeExecutorEndpoint(rpcEnv) +val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) +fakeSchedulerBackend.driverEndpoint.askSync[Boolean]( + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 2, Map.empty)) +heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet) +addExecutorAndVerify(executorId1) +triggerHeartbeat(executorId1, executorShouldReregister = false) + +scheduler.initialize(fakeSchedulerBackend) +sc.requestTotalExecutors(0, 0, Map.empty) --- End diff --

Why request 0?
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21486#discussion_r192592170 --- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala --- @@ -207,6 +210,55 @@ class HeartbeatReceiverSuite assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2)) } + test("expired host should not be offered again") { --- End diff -- Also, it would be better to attach the JIRA number.
[GitHub] spark pull request #21486: [SPARK-24387][Core] Heartbeat-timeout executor is...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21486#discussion_r192591845 --- Diff: core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala --- @@ -207,6 +210,55 @@ class HeartbeatReceiverSuite assert(fakeClusterManager.getExecutorIdsToKill === Set(executorId1, executorId2)) } + test("expired host should not be offered again") { --- End diff -- `host` or `executor`?
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191623277 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimi
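The quoted tests depend on a `LimitedWritableByteChannel` helper, but its definition falls outside the quoted context. The following is a minimal sketch of what it might look like. The field names `acceptNBytes` and `pos` come from the test; everything else is an assumption. In particular, the sketch treats `acceptNBytes` as a byte budget consumed across all `write` calls within one `transferTo`, not as a per-call cap. With a per-call cap, the "past the first chunk" step would presumably overshoot 13, because one `transferTo` can issue several writes.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical sketch of the test helper (not the PR's code). `acceptNBytes`
// is a budget the test refills before each transferTo call; write() consumes
// it and returns fewer bytes than offered once the budget runs out.
class LimitedWritableByteChannel(val length: Int) extends WritableByteChannel {
  val bytes = new Array[Byte](length) // everything received, in order
  var acceptNBytes = 0                // remaining budget for the current transfer
  var pos = 0                         // total bytes received so far

  override def write(src: ByteBuffer): Int = {
    val n = math.min(acceptNBytes, math.min(src.remaining(), length - pos))
    src.get(bytes, pos, n) // copy the accepted bytes and advance src
    acceptNBytes -= n
    pos += n
    n
  }

  override def isOpen: Boolean = true
  override def close(): Unit = ()
}
```

A partial write like this is legal under the `WritableByteChannel.write` contract, which is exactly what lets the test stop a transfer mid-chunk and resume it later.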
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191178697 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191182696 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191176828 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { --- End diff -- nit: generateChunk**ed**ByteBuffer
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175242 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. 
+ private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes + + protected def deallocate: Unit = {} + + override def count(): Long = chunkedByteBuffer.size + + // this is the "start position" of the overall Data in the backing file, not our current position + override def position(): Long = 0 + + override def transferred(): Long = _transferred + + override def transfered(): Long = _transferred + + override def touch(): ChunkedByteBufferFileRegion = this + + override def touch(hint: Object): ChunkedByteBufferFileRegion = this + + override def retain(): FileRegion = { +super.retain() +this + } + + override def retain(increment: Int): FileRegion = { +super.retain(increment) +this + } + + private var currentChunkIdx = 0 + + def transferTo(target: WritableByteChannel, position: Long): Long = { +assert(position == _transferred) +if (position == size) return 0L +var keepGoing = true +var written = 0L +var currentChunk = chunks(currentChunkIdx) +var originalLimit = currentChunk.limit() --- End diff -- `originalLimit` seems to be unused.
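Presumably `originalLimit` would serve to restore a chunk's limit after temporarily capping it at `ioChunkSize`. One way to avoid needing it is to cap a `duplicate()` view instead, so the real chunk's limit is never modified. The following is a minimal, simplified sketch of a resumable chunked transfer under that assumption. `ChunkedTransferSketch` is a hypothetical name, and this is not the PR's implementation.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical, simplified model of a resumable chunked transfer. Each write
// goes through a duplicate() view whose limit is capped at ioChunkSize, so the
// original chunk's limit is never changed and nothing needs to be restored.
class ChunkedTransferSketch(chunks: Array[ByteBuffer], ioChunkSize: Int) {
  require(ioChunkSize > 0, "ioChunkSize must be positive")
  private var idx = 0  // index of the chunk currently being sent
  var transferred = 0L // total bytes accepted by targets so far

  /** Writes until the target accepts less than offered; returns bytes written. */
  def transferTo(target: WritableByteChannel): Long = {
    var written = 0L
    var keepGoing = true
    while (keepGoing && idx < chunks.length) {
      val chunk = chunks(idx)
      val toOffer = math.min(chunk.remaining(), ioChunkSize)
      val view = chunk.duplicate()         // shares content, independent position/limit
      view.limit(view.position() + toOffer)
      val n = target.write(view)
      chunk.position(chunk.position() + n) // advance the real chunk by what was accepted
      written += n
      if (n < toOffer) keepGoing = false   // target is full for now; resume on the next call
      if (!chunk.hasRemaining) idx += 1
    }
    transferred += written
    written
  }
}
```

This sketch advances the caller's buffers in place. The PR's class avoids that by working on the copies returned by `getChunks()`.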
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175890 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191117686 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. 
+ private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes + + protected def deallocate: Unit = {} + + override def count(): Long = chunkedByteBuffer.size --- End diff -- What's the difference between `size` and `count`? Should `count` indicate the size of the remaining data that can be transferred?
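For comparison, Netty's `DefaultFileRegion` returns from `count()` the fixed length passed to its constructor and reports progress separately through `transferred()`. That suggests `count` is meant to be the total size rather than what remains. The following is a minimal sketch of that bookkeeping using a hypothetical `RegionProgress` class, in which the remaining size is derived instead of stored.

```scala
// Hypothetical model of FileRegion-style bookkeeping: `count` is fixed at
// construction (total bytes the region represents) and never shrinks;
// progress lives in `transferred`, and "remaining" is derived from both.
final class RegionProgress(val count: Long) {
  require(count >= 0, "count must be non-negative")
  private var _transferred = 0L

  def transferred: Long = _transferred
  def remaining: Long = count - _transferred

  /** Records `n` more bytes written to the target. */
  def advance(n: Long): Unit = {
    require(n >= 0 && n <= remaining, s"cannot advance by $n with $remaining left")
    _transferred += n
  }
}
```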
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191175960 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191104760 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferFileRegion.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util.io + +import java.nio.channels.WritableByteChannel + +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.internal.Logging + + +/** + * This exposes a ChunkedByteBuffer as a netty FileRegion, just to allow sending > 2gb in one netty + * message. This is because netty cannot send a ByteBuf > 2g, but it can send a large FileRegion, + * even though the data is not backed by a file. + */ +private[io] class ChunkedByteBufferFileRegion( +val chunkedByteBuffer: ChunkedByteBuffer, +val ioChunkSize: Int) extends AbstractReferenceCounted with FileRegion with Logging { + + private var _transferred: Long = 0 + // this duplicates the original chunks, so we're free to modify the position, limit, etc. 
+ private val chunks = chunkedByteBuffer.getChunks() + private val cumLength = chunks.scanLeft(0L) { _ + _.remaining()} + private val size = cumLength.last + // Chunk size in bytes --- End diff -- Should this comment be moved above the last line?
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189939603 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- `destructiveIterator` should just return a destructive iterator (especially for the map buffer), as its function name implies; it is none of `CompletionIterator`'s business. Developers should also be free to define the completion function for the returned destructive iterator, in case we want a different one somewhere else in the future. > Your suggested codes does exactly the same but is less streamlined I don't think this small change has much influence on how streamlined the code is. > and relies on an intermediate value (fortunately it's already a member variable) The current fix leads to this, not me. And even if this variable were not a member variable, we could define a temporary local variable. It's not a big deal.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189894423 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { --- End diff -- I understand what this test wants to do. But it seems the code without this PR could also pass it if everything goes normally. I know it's a little hard to reflect this change in a unit test. Also, I'd prefer to leave some comments in the source code above explaining the potential memory leak.
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189892444 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -267,7 +273,7 @@ class ExternalAppendOnlyMap[K, V, C]( */ def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = { readingIterator = new SpillableIterator(inMemoryIterator) -readingIterator +readingIterator.toCompletionIterator --- End diff -- This changes the original behavior of `destructiveIterator`. I'd prefer to do it like this: ``` CompletionIterator[(K, C), Iterator[(K, C)]]( destructiveIterator(currentMap.iterator), readingIterator.destroy) ``` which keeps compatibility with the current code and does not introduce an unnecessary function.
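The wrapping approach suggested here can be illustrated with a minimal, self-contained sketch. `OnCompletionIterator` is a hypothetical stand-in for Spark's `CompletionIterator`, whose real signature may differ, and `destructiveIterator` is a toy version. The point is that the destructive iterator stays unchanged and the caller chooses what runs on completion.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's CompletionIterator: delegates to `sub`
// and runs `completion` exactly once, the first time `sub` is found exhausted.
class OnCompletionIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion()
    }
    more
  }

  override def next(): A = sub.next()
}

object CompletionSketch {
  // A toy "destructive" iterator: it drains the buffer it reads from and
  // knows nothing about cleanup.
  def destructiveIterator(buffer: mutable.Queue[Int]): Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = buffer.nonEmpty
      override def next(): Int = buffer.dequeue()
    }

  // The caller, not destructiveIterator, decides which cleanup to attach.
  def readAll(buffer: mutable.Queue[Int], onDone: () => Unit): List[Int] =
    new OnCompletionIterator(destructiveIterator(buffer), onDone).toList
}
```

Keeping the cleanup outside `destructiveIterator` lets another call site attach a different completion function without changing the iterator itself.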
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189892547 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty --- End diff -- Why `empty`, not `null`?
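One plausible answer to the `empty` vs `null` question is that `Iterator.empty` keeps later calls such as `hasNext` safe after the data has been released. The sketch below illustrates this with two hypothetical classes, `Reader` and `NullingReader`. Neither is Spark's `SpillableIterator`.

```scala
// Hypothetical reader that drops its upstream iterator when destroyed,
// replacing it with an empty iterator rather than null.
class Reader(initial: Iterator[Int]) {
  private var upstream: Iterator[Int] = initial

  // Releases the reference to the underlying data so it can be GC'd.
  def destroy(): Unit = { upstream = Iterator.empty }

  // Safe to keep polling: after destroy() this simply returns false.
  def hasNext: Boolean = upstream.hasNext
}

// The same reader, but nulling the reference instead.
class NullingReader(initial: Iterator[Int]) {
  private var upstream: Iterator[Int] = initial

  def destroy(): Unit = { upstream = null }

  // After destroy(), this throws a NullPointerException.
  def hasNext: Boolean = upstream.hasNext
}
```

There is a trade-off: `null` fails loudly on a use-after-destroy, whereas an empty iterator quietly reports "no more data".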
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21369 cc @JerryLead
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r189438190 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- Does this change mean we should reassign `upstream` (which eliminates the reference to `currentMap`) **immediately** after the spill, because otherwise we may hit an OOM (e.g. if `readNext()` is never called after the spill)? Is this the real cause of the JIRA issue?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187825561 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param reportHeartbeat the heartbeat reporting function to call. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") --- End diff -- "pass in the name to the constructor" is better (if we do need to do this for the driver).
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187824094 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1753,9 +1766,21 @@ class DAGScheduler( messageScheduler.shutdownNow() eventProcessLoop.stop() taskScheduler.stop() +heartbeater.stop() + } + + /** Reports heartbeat metrics for the driver. */ + private def reportHeartBeat(): Unit = { --- End diff -- > With cluster mode, including YARN, there isn't a local executor, so the metrics for the driver would not be collected. Yes. But the question is whether we can use the `executor`'s `getCurrentExecutorMetrics()` method to collect memory metrics for the `driver`. IIRC, the `driver` does not acquire memory from the execution memory pool, at least.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187823298 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +179,27 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +// log the peak executor metrics for the stage, for each executor +val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() +val executorMap = liveStageExecutorMetrics.remove( + (event.stageInfo.stageId, event.stageInfo.attemptNumber())) +executorMap.foreach { + executorEntry => { +for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- I revisited the code, and I think you're right. My mistake, sorry.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187248156 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -93,6 +94,10 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + // map of live stages, to peak executor metrics for the stage + private val liveStageExecutorMetrics = mutable.HashMap[(Int, Int), --- End diff -- Why should we track each executor's memory metrics per stage?
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187236701 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1753,9 +1766,21 @@ class DAGScheduler( messageScheduler.shutdownNow() eventProcessLoop.stop() taskScheduler.stop() +heartbeater.stop() + } + + /** Reports heartbeat metrics for the driver. */ + private def reportHeartBeat(): Unit = { --- End diff -- Why do we need this for the `driver`? If Spark runs in local mode, there's a local `executor`, which will report heartbeats.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187239219 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +179,27 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +// log the peak executor metrics for the stage, for each executor +val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() +val executorMap = liveStageExecutorMetrics.remove( + (event.stageInfo.stageId, event.stageInfo.attemptNumber())) +executorMap.foreach { + executorEntry => { +for ((executorId, peakExecutorMetrics) <- executorEntry) { --- End diff -- How about `case (executorId, peakExecutorMetrics) =>`? It would be more readable.
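For comparison, here are both styles side by side, using a hypothetical per-stage entry that maps executor id to a single peak value. The real `liveStageExecutorMetrics` values in the PR have different types.

```scala
object PatternMatchSketch {
  // Hypothetical stand-in: executor id -> peak memory in bytes.
  type PeakByExecutor = Map[String, Long]

  // Style in the diff: foreach with a named block parameter, then a
  // for-comprehension over the inner map.
  def formatWithFor(entry: Option[PeakByExecutor]): List[String] = {
    val out = List.newBuilder[String]
    entry.foreach { executorEntry =>
      for ((executorId, peak) <- executorEntry) {
        out += s"$executorId=$peak"
      }
    }
    out.result()
  }

  // Suggested style: destructure each (key, value) pair with a case clause.
  def formatWithCase(entry: Option[PeakByExecutor]): List[String] = {
    val out = List.newBuilder[String]
    entry.foreach(_.foreach { case (executorId, peak) =>
      out += s"$executorId=$peak"
    })
    out.result()
  }
}
```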
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187247534 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. + */ +private[spark] class PeakExecutorMetrics { --- End diff -- Do we really need this class? It seems `ExecutorMetrics` can already do the same work.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187244792 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -169,6 +179,27 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { +// log the peak executor metrics for the stage, for each executor +val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]() +val executorMap = liveStageExecutorMetrics.remove( --- End diff -- Do we always post a `SparkListenerStageCompleted` event for failed stages (I can't remember clearly)? If not, I think we should clean up the other attempts of the same stage here.
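The cleanup suggested here could look something like the sketch below. It assumes the map is keyed by `(stageId, stageAttemptId)` as in the diff, but the value type and the `onStageCompleted` helper are simplified hypotheticals. When one attempt completes, every other attempt of the same stage is dropped as well, so attempts that never receive their own completion event do not leak.

```scala
import scala.collection.mutable

object StageCleanupSketch {
  // Hypothetical: (stageId, stageAttemptId) -> executor id -> peak value.
  val liveStageExecutorMetrics =
    mutable.HashMap[(Int, Int), mutable.HashMap[String, Long]]()

  // Removes and returns the completed attempt's metrics, then drops any other
  // attempts of the same stage that might never get their own completion event.
  def onStageCompleted(stageId: Int, attemptId: Int): Option[mutable.HashMap[String, Long]] = {
    val completed = liveStageExecutorMetrics.remove((stageId, attemptId))
    // Materialize the keys first so we don't mutate the map while iterating it.
    val staleKeys = liveStageExecutorMetrics.keys.filter(_._1 == stageId).toList
    liveStageExecutorMetrics --= staleKeys
    completed
  }
}
```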
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r187238940 --- Diff: core/src/main/scala/org/apache/spark/Heartbeater.scala --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.concurrent.TimeUnit + +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Creates a heartbeat thread which will call the specified reportHeartbeat function at + * intervals of intervalMs. + * + * @param reportHeartbeat the heartbeat reporting function to call. + * @param intervalMs the interval between heartbeats. + */ +private[spark] class Heartbeater(reportHeartbeat: () => Unit, intervalMs: Long) { + // Executor for the heartbeat task + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") --- End diff -- I'm wondering: should the prefix of the heartbeater thread name be `"executor-heartbeater"`?
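One way to sidestep the naming question is to let the caller supply the thread name. The sketch below is self-contained and uses only `java.util.concurrent`. `NamedHeartbeater` is hypothetical: the PR's class calls Spark's `ThreadUtils.newDaemonSingleThreadScheduledExecutor` rather than the hand-rolled thread factory shown here.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical heartbeater whose thread name is chosen by the caller, so one
// class can serve both a "driver-heartbeater" and an "executor-heartbeater".
class NamedHeartbeater(reportHeartbeat: () => Unit, name: String, intervalMs: Long) {
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // do not keep the JVM alive just for heartbeats
        t
      }
    })

  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = reportHeartbeat() }
    executor.scheduleAtFixedRate(task, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```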
[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21209 Thanks!
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r186425765 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: ExecutorAllocationClient => -b.killExecutors(executorIds, replace = false, force = true).nonEmpty +require(executorAllocationManager.isEmpty, --- End diff -- @squito any thoughts?
[GitHub] spark issue #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate parent Sh...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21096 Thanks for your opinions, @squito @markhamstra. Maybe I should leave it for now.
[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21209 ping @jiangxb1987
[GitHub] spark pull request #21212: [SPARK-24143] filter empty blocks when convert ma...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21212#discussion_r186261650 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -267,28 +269,30 @@ final class ShuffleBlockFetcherIterator( // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] -// Tracks total number of blocks (including zero sized blocks) -var totalBlocks = 0 for ((address, blockInfos) <- blocksByAddress) { - totalBlocks += blockInfos.size if (address.executorId == blockManager.blockManagerId.executorId) { -// Filter out zero-sized blocks -localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1) +blockInfos.find(_._2 <= 0) match { + case Some((blockId, size)) if size < 0 => +throw new BlockException(blockId, "Negative block size " + size) + case Some((blockId, size)) if size == 0 => +throw new BlockException(blockId, "Zero-sized blocks should be excluded.") --- End diff -- Is it necessary to throw an exception here? If so, shall we also throw an exception when we detect a 0-sized **remote** block, rather than silently skipping it?
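One consistent policy would apply the same rule to local and remote blocks: reject negative sizes and skip zero-sized blocks, whatever the address. The sketch below is hypothetical. `BlockSizeCheck`, `validateAndFilter`, and the use of `require` (which throws `IllegalArgumentException`) are illustrative only and are not Spark's `BlockException` path.

```scala
object BlockSizeCheck {
  // Hypothetical: (blockId, size) pairs for one address, local or remote.
  def validateAndFilter(blockInfos: Seq[(String, Long)]): Seq[(String, Long)] = {
    // A negative size indicates corrupted metadata, so fail loudly.
    blockInfos.foreach { case (blockId, size) =>
      require(size >= 0, s"Negative block size $size for $blockId")
    }
    // Zero-sized blocks carry no data: skip them instead of fetching.
    blockInfos.filter(_._2 > 0)
  }
}
```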
[GitHub] spark issue #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBac...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21209 ping @squito @vanzin
[GitHub] spark pull request #21209: [SPARK-24141][CORE] Fix bug in CoarseGrainedSched...
GitHub user Ngone51 opened a pull request: https://github.com/apache/spark/pull/21209 [SPARK-24141][CORE] Fix bug in CoarseGrainedSchedulerBackend.killExecutors ## What changes were proposed in this pull request? In the method *CoarseGrainedSchedulerBackend.killExecutors()*, `numPendingExecutors` should be increased by `executorsToKill.size` rather than `knownExecutors.size` if we do not adjust the target number of executors. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ngone51/spark SPARK-24141 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21209.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21209 commit 264f316c178ff32ea632cc3db7e20ab68d555b85 Author: wuyi Date: 2018-05-02T01:50:01Z fix a bug in killExecutors
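The accounting issue described in this PR can be modeled in a simplified, hypothetical way. In the model, `knownExecutors` (requested ids that the backend knows about) can be larger than `executorsToKill`, which also excludes executors already pending removal and, unless `force` is set, busy ones. Adding `knownExecutors.size` therefore over-counts. The names follow the PR description, but the filtering rules and the `State` type are assumptions made for illustration.

```scala
object KillAccountingSketch {
  // Hypothetical, simplified backend state.
  final case class State(pendingToRemove: Set[String], busy: Set[String], numPendingExecutors: Int)

  // Returns the executors actually killed and the updated state. `useFix`
  // selects executorsToKill.size (the fix) vs knownExecutors.size (the bug).
  def killExecutors(
      requested: Seq[String],
      known: Set[String],
      force: Boolean,
      state: State,
      useFix: Boolean): (Seq[String], State) = {
    val knownExecutors = requested.filter(known.contains)
    val executorsToKill = knownExecutors
      .filterNot(state.pendingToRemove.contains)
      .filter(id => force || !state.busy.contains(id))
    val added = if (useFix) executorsToKill.size else knownExecutors.size
    val next = state.copy(
      pendingToRemove = state.pendingToRemove ++ executorsToKill,
      numPendingExecutors = state.numPendingExecutors + added)
    (executorsToKill, next)
  }
}
```

In this model, with `e1` already pending removal and `e2` busy, only `e3` is killed. The fixed accounting adds 1 pending executor, while the buggy one adds 3.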
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r185159109 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: ExecutorAllocationClient => -b.killExecutors(executorIds, replace = false, force = true).nonEmpty +require(executorAllocationManager.isEmpty, --- End diff -- Hi @squito, thanks for your reply. > but only *when* pending tasks increase. `ExecutorAllocationManager` checks pending (or backlogged) tasks periodically, so we do not actually have to wait for an *increase*. As for the `Dynamic Allocation` & `User` case, yeah, that's hard to define. I checked `SchedulerBackendUtils.getInitialTargetExecutorNumber`: it sets `DEFAULT_NUMBER_EXECUTORS` = 2. But this is not consistent with `Master`, which sets `executorLimit` to `Int.MaxValue` if we are not in dynamic allocation mode. Maybe we can just initialize `requestedTotalExecutors` with `Int.MaxValue` (only when we are not in dynamic allocation mode). Or, when we are not in dynamic allocation mode, we could skip calling `doRequestTotalExecutors` from `requestExecutors` and `killExecutors`, and call it only from `requestTotalExecutors`.
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20604 ping @squito
[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21175 cc @kiszk @maropu @cloud-fan @jiangxb1987
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r184607965 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala --- @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") { +val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024))) +chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) +assert(chunkedByteBuffer.size === (80L * 1024L * 1024L)) --- End diff -- Assert on the `ByteArrayWritableChannel`'s size, not the `chunkedByteBuffer`'s size.
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r184597197 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala --- @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { +val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) --- End diff -- nit: add spaces around `*`.
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r184596199 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala --- @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") { +val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024))) +chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)) +assert(chunkedByteBuffer.getChunks().head.position() === 0) --- End diff -- This assert is unnecessary for this PR's change. Please replace it with an assertion on the channel's length.
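The sketch below shows what the new test should verify, using a chunked write loop over a single `ByteBuffer` written against plain `java.nio`. It assumes the fix restores the buffer's original limit after each bounded write. Without that step, `hasRemaining` becomes false after the first `chunkSize` bytes and the loop exits early. `writeFully` and `chunkSize` here are illustrative, not Spark's `ChunkedByteBuffer` code. As suggested above, the check counts the bytes that reached the channel.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object ChunkedWriteSketch {
  // Writes all remaining bytes of `bytes`, at most `chunkSize` per write call.
  def writeFully(bytes: ByteBuffer, channel: WritableByteChannel, chunkSize: Int): Unit = {
    val originalLimit = bytes.limit()
    while (bytes.hasRemaining) {
      val ioSize = math.min(bytes.remaining(), chunkSize)
      bytes.limit(bytes.position() + ioSize) // cap this write at ioSize bytes
      channel.write(bytes)
      bytes.limit(originalLimit) // restore, or hasRemaining turns false too early
    }
  }

  // Returns how many bytes actually reached the channel.
  def bytesWritten(totalSize: Int, chunkSize: Int): Int = {
    val out = new ByteArrayOutputStream()
    writeFully(ByteBuffer.allocate(totalSize), Channels.newChannel(out), chunkSize)
    out.size()
  }
}
```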
[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21175#discussion_r184590989 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala --- @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite { assert(chunkedByteBuffer.getChunks().head.position() === 0) } + test("writeFully() does not affect original buffer's position") { --- End diff -- Hi @manbuyun. You should add a new unit test to cover your own change, for example, "writeFully() can write buffer which is larger than `bufferWriteChunkSize` correctly.", and update the test code accordingly.
[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21131 LGTM, and nice UT.
[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21175 @manbuyun you need to add the unit test to `ChunkedByteBufferSuite.scala` and push a new commit.
[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20930 No wonder I couldn't understand the issue for so long: I thought it happened on Spark 2.3. Now it makes sense. Thanks @jiangxb1987
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20604 ping @squito
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183797532 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- The explanation is quite clear and now I understand. Thank you very much! @squito
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183790463 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition th
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183789814 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- > because they won't be able to get their shuffle input, same as the original fetch failure Why? In `DAGScheduler`, we only unregister one MapStatus of the parent stage, so other running tasks within the failed (child) stage (whose failure was caused by a fetch-failed task) may still get MapOutputs from `MapOutputTrackerMaster` and fetch data from other `Executor`s. So they can succeed normally. Am I missing something?
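The argument above rests on one detail: a fetch failure unregisters only the single map output it names. The following is a minimal toy sketch of that bookkeeping, assuming a tracker that stores one executor location per map partition; `ToyMapOutputTracker` and its methods are hypothetical and are not Spark's `MapOutputTrackerMaster` API.

```scala
// Hypothetical toy model, NOT Spark's MapOutputTrackerMaster: one slot per map partition,
// holding the executor that wrote that partition's shuffle output.
final class ToyMapOutputTracker(numMaps: Int) {
  private val locations = Array.fill[Option[String]](numMaps)(None)

  def register(mapId: Int, executor: String): Unit = locations(mapId) = Some(executor)

  // As described above: a fetch failure unregisters only the single map output it names,
  // not every output of the parent stage.
  def unregisterOnFetchFailure(mapId: Int): Unit = locations(mapId) = None

  // Where a reducer would fetch this map partition's output from, if it is still registered.
  def lookup(mapId: Int): Option[String] = locations(mapId)
}

object FetchFailureDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ToyMapOutputTracker(numMaps = 3)
    tracker.register(0, "exec-A")
    tracker.register(1, "exec-B")
    tracker.register(2, "exec-C")

    // One reducer task fails to fetch map output 1 ...
    tracker.unregisterOnFetchFailure(1)

    // ... but other reducer tasks still find outputs 0 and 2 and can fetch them normally.
    println((0 until 3).map(tracker.lookup))
  }
}
```

In this model, only the reducers that need map output 1 are blocked. That is the situation in which the other still-running tasks of the zombie attempt could plausibly succeed.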
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183701269 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => --- End diff -- For now, the tasks we launch have nothing to do with the running tasks in other `TaskSet`s. But is it possible to take those running tasks into account when launching a new task (in the source code)? For example, first launch the FetchFailed task, or tasks that do not have a running copy in any `TaskSet`. (But it seems our final `TaskSet` will always have running copies in other `TaskSet`s, except for the FetchFailed task, right? It's more like we are not resubmitting a stage, but resubmitting the tasks that do not have running copies in previous `TaskSet`s.)
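The launch-order idea in the question above could look roughly like the following. This is a hypothetical sketch, not existing `TaskSetManager` behavior: `LaunchOrderSketch` and `launchOrder` are invented names. The sketch assumes the caller already knows which partitions have a running copy in some zombie `TaskSet`.

```scala
// Hypothetical helper, not part of TaskSetManager: decides the order in which the latest
// attempt's pending partitions would be offered to executors.
object LaunchOrderSketch {
  /**
   * @param pending          partitions still pending in the latest (non-zombie) TaskSet
   * @param runningElsewhere partitions with a running copy in some other (zombie) TaskSet
   * @return pending partitions, those without any running copy first (order kept otherwise)
   */
  def launchOrder(pending: Seq[Int], runningElsewhere: Set[Int]): Seq[Int] =
    // Ordering[Boolean] puts false before true, and sortBy is a stable sort.
    pending.sortBy(p => runningElsewhere.contains(p))

  def main(args: Array[String]): Unit = {
    // Partition 0 hit the FetchFailed in both zombie attempts; partitions 1..9 still run there.
    println(launchOrder(0 until 10, (1 until 10).toSet).head) // prints 0
  }
}
```

Sorting on a Boolean key keeps the change small: a partition with no running copy anywhere, such as the FetchFailed one, is offered first. The relative order of all other partitions is unchanged.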
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183690133 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala --- @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B taskScheduler.initialize(new FakeSchedulerBackend) } } + + test("Completions in zombie tasksets update status of non-zombie taskset") { +val taskScheduler = setupSchedulerWithMockTaskSetBlacklist() +val valueSer = SparkEnv.get.serializer.newInstance() + +def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = { + val indexInTsm = tsm.partitionToIndex(partition) + val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head + val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq()) + tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result) +} + +// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt, +// two times, so we have three active task sets for one stage. (For this to really happen, +// you'd need the previous stage to also get restarted, and then succeed, in between each +// attempt, but that happens outside what we're mocking here.) 
+val zombieAttempts = (0 until 2).map { stageAttempt => + val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt) + taskScheduler.submitTasks(attempt) + val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get + val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } + taskScheduler.resourceOffers(offers) + assert(tsm.runningTasks === 10) + if (stageAttempt < 2) { +// fail attempt +tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED, + FetchFailed(null, 0, 0, 0, "fetch failed")) +// the attempt is a zombie, but the tasks are still running (this could be true even if +// we actively killed those tasks, as killing is best-effort) +assert(tsm.isZombie) +assert(tsm.runningTasks === 9) + } + tsm +} + +// we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for +// the stage, but this time with insufficient resources so not all tasks are active. + +val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2) +taskScheduler.submitTasks(finalAttempt) +val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get +val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } +val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => + finalAttempt.tasks(task.index).partitionId +}.toSet +assert(finalTsm.runningTasks === 5) +assert(!finalTsm.isZombie) + +// We simulate late completions from our zombie tasksets, corresponding to all the pending +// partitions in our final attempt. This means we're only waiting on the tasks we've already +// launched. +val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions) +finalAttemptPendingPartitions.foreach { partition => + completeTaskSuccessfully(zombieAttempts(0), partition) +} + +// If there is another resource offer, we shouldn't run anything. 
Though our final attempt +// used to have pending tasks, now those tasks have been completed by zombie attempts. The +// remaining tasks to compute are already active in the non-zombie attempt. +assert( + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) + +val allTaskSets = zombieAttempts ++ Seq(finalTsm) +val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) + +// finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be +// marked as zombie. +// for each of the remaining tasks, find the tasksets with an active copy of the task, and +// finish the task. +remainingTasks.foreach { partition => + val tsm = if (partition == 0) { +// we failed this task on both zombie attempts, this one is only present in the latest +// taskset +finalTsm + } else { +// should be active in every taskset. We choose a zombie taskset just to make sure that +// we transition th
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183619646 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple active TaskSetManagers for the stage. --- End diff -- IIRC, there's only one active `TaskSetManager` for a given stage, possibly alongside some zombie `TaskSetManager`s, although those zombie `TaskSetManager`s may still have some running tasks.
[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21131#discussion_r183619704 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl( } } + /** + * Marks the task has completed in all TaskSetManagers for the given stage. + * + * After stage failure and retry, there may be multiple active TaskSetManagers for the stage. + * If an earlier attempt of a stage completes a task, we should ensure that the later attempts + * do not also submit those same tasks. That also means that a task completion from an earlier + * attempt can lead to the entire stage getting marked as successful. + */ + private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = { +taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => --- End diff -- Generally, it seems impossible for an unfinished `TaskSet` to get an empty `Map()` from `taskSetsByStageIdAndAttempt`. But if it does, maybe we can tell the caller that the stage has already finished.
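One possible shape for "telling the caller" is to return a `Boolean` instead of `Unit`. The sketch below is hypothetical: it uses toy classes with the same map shape as `taskSetsByStageIdAndAttempt`, not the real `TaskSchedulerImpl`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a TaskSetManager that only records completed partitions.
final class ToyTaskSetManager {
  val completed: mutable.Set[Int] = mutable.Set.empty[Int]
  def markPartitionCompleted(partitionId: Int): Unit = completed += partitionId
}

final class ToyTaskScheduler {
  // stageId -> (stageAttemptId -> manager), the same shape as taskSetsByStageIdAndAttempt.
  val taskSetsByStageIdAndAttempt: mutable.HashMap[Int, mutable.HashMap[Int, ToyTaskSetManager]] =
    mutable.HashMap.empty

  /**
   * Marks the partition completed in every attempt of the stage. Returns false when no
   * TaskSet is registered for the stage, which the caller may read as "already finished".
   */
  def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int): Boolean =
    taskSetsByStageIdAndAttempt.get(stageId) match {
      case Some(attempts) if attempts.nonEmpty =>
        attempts.values.foreach(_.markPartitionCompleted(partitionId))
        true
      case _ =>
        false
    }
}
```

This design keeps the normal path unchanged. It only surfaces the unexpected empty-map case to the caller, rather than silently doing nothing.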
[GitHub] spark issue #20998: [SPARK-23888][CORE] correct the comment of hasAttemptOnH...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20998 Agreed, and thank you @squito. And thanks to all of you: @felixcheung @mridulm @jiangxb1987 @srowen
[GitHub] spark pull request #20998: [SPARK-23888][CORE] correct the comment of hasAtt...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20998#discussion_r183408481 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -287,7 +287,7 @@ private[spark] class TaskSetManager( None } - /** Check whether a task is currently running an attempt on a given host */ + /** Check whether a task once run an attempt on a given host */ --- End diff -- Yes. Thank you.
[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20930 > because we can get the MapStatus, but get a 'null'. If I'm not mistaken, this also because the ExecutorLost trigger removeOutputsOnExecutor If there's a `null` MapStatus for stage 2, how can it retry 4 times without any tasks? IIUC, a `null` MapStatus leads to a missing partition, which means there will be some tasks to submit. As for stage 3's shuffle id, that's really weird. I hope you can fix it! @xuanyuanking
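The reasoning "a `null` MapStatus means a missing partition, and therefore a task to submit" can be shown with a small model. As far as I recall, Spark finds a shuffle stage's missing partitions by looking for slots with no registered map status. The sketch below is only a simplified, hypothetical model of that idea; `ToyMapStatus` and `MissingPartitionsSketch` are invented names.

```scala
// Hypothetical, simplified stand-in for a registered map output.
final case class ToyMapStatus(location: String)

object MissingPartitionsSketch {
  // A partition is "missing" when no map status is registered for it (a null slot),
  // and every missing partition becomes a task that the resubmitted stage must run.
  def findMissingPartitions(mapStatuses: Array[ToyMapStatus]): Seq[Int] =
    mapStatuses.indices.filter(i => mapStatuses(i) == null)

  def main(args: Array[String]): Unit = {
    val statuses = Array(ToyMapStatus("hostA"), null)
    println(findMissingPartitions(statuses)) // partition 1 would need a task
  }
}
```

Under this model, a stage with a `null` status can never be resubmitted with zero tasks. That is why "retry 4 times without any tasks" looks inconsistent with a `null` MapStatus.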
[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20930 Hi @xuanyuanking, thank you sincerely for your patient explanation. Regarding your latest explanation: > stage 2's shuffleID is 1, but stage 3 failed by missing an output for shuffle '0'! So here the stage 2's skip cause stage 3 got an error shuffleId. However, I don't think skipping stage 2 would cause stage 3 to get a wrong shuffleId, because all `ShuffleDependencies` (each constructed with a fixed id) are already created for the `ShuffleMapStages` before any stage of a job is submitted. After struggling to understand this issue for a while, I finally arrived at my own inference (assume the two ShuffleMapTasks below belong to stage 2, stage 2 has two partitions on the map side, stage 2's parent stage is stage 1, and its child stage is stage 3): 1. ShuffleMapTask 0.0 runs on ExecutorB, writes its map output on ExecutorB, and succeeds normally. At this point, only '1' available map output is registered on `MapOutputTrackerMaster`. 2. ShuffleMapTask 1.0 is running on ExecutorA; it fetches data from ExecutorA and writes its map output on ExecutorA, too. 3. ExecutorA is lost for an unknown reason after sending the `StatusUpdate` message that tells the driver ShuffleMapTask 1.0 succeeded. All map outputs on ExecutorA are lost, including ShuffleMapTask 1.0's map output. 4. The driver launches a speculative ShuffleMapTask 1.1 before it receives the `StatusUpdate` message, and ShuffleMapTask 1.1 gets a FetchFailed immediately. 5. `DAGScheduler` handles the FetchFailed ShuffleMapTask 1.1 first and marks stage 2 and its parent stage 1 as failed. Stage 1 and stage 2 are now waiting to be resubmitted. 6. `DAGScheduler` handles the successful ShuffleMapTask 1.0 before stage 1 and stage 2 are resubmitted, which triggers `MapOutputTrackerMaster.registerMapOutput`. Now '2' available map outputs are registered on `MapOutputTrackerMaster` (even though ShuffleMapTask 1.0's map output on ExecutorA has been lost).
7. Stage 1 is resubmitted and succeeds normally. 8. Stage 2 is resubmitted. Since stage 2 has '2' available map outputs registered on `MapOutputTrackerMaster`, it has no missing partitions, and therefore no missing tasks to submit. 9. Then we submit stage 3. Since stage 2's map output file on ExecutorA was lost, stage 3 must eventually get a FetchFailed. We then resubmit stage 2 and stage 3, and get into a loop until stage 3 aborts. But if the issue is what I described above, we should get a `FetchFailedException` instead of the `MetadataFetchFailedException` shown in the screenshot, so that part doesn't make sense to me. Please feel free to point out where I'm wrong. Anyway, thanks again.
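The inferred race in steps 1 to 9 can be replayed in a few lines. This hypothetical sketch tracks only stage 2's two map outputs and the set of live executors; none of the names are Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical replay of the inferred steps; names are illustrative, not Spark's API.
object StaleOutputRaceSketch {
  final case class Output(executor: String)

  /** Returns (missing partitions of stage 2, registered outputs that live on a lost executor). */
  def replay(): (Seq[Int], Seq[Int]) = {
    val outputs = Array.fill[Output](2)(null) // stage 2 has two map partitions
    val liveExecutors = mutable.Set("ExecutorA", "ExecutorB")

    outputs(0) = Output("ExecutorB") // step 1: task 0.0 succeeds on ExecutorB
    // step 2: task 1.0 runs on ExecutorA; nothing is registered for it yet
    liveExecutors -= "ExecutorA"     // step 3: ExecutorA is lost, along with its map output files
    // steps 4-5: speculative task 1.1 gets FetchFailed and stage 2 is marked failed
    outputs(1) = Output("ExecutorA") // step 6: the late success of task 1.0 is still registered

    // step 8: the resubmitted stage 2 sees no missing partitions, so it launches no tasks
    val missing = outputs.indices.filter(i => outputs(i) == null)
    // step 9: but one registered output points at a lost executor, so stage 3 cannot fetch it
    val unreachable = outputs.indices.filter { i =>
      outputs(i) != null && !liveExecutors.contains(outputs(i).executor)
    }
    (missing, unreachable)
  }
}
```

The replay ends with no missing partitions but one unreachable output. This matches the loop described in steps 8 and 9.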
[GitHub] spark issue #21096: [SPARK-24011][CORE][WIP] cache rdd's immediate parent Sh...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21096 ping @jiangxb1987 @squito Would you please have a look at this PR? What are your opinions on the cache strategy?
[GitHub] spark issue #21104: [SPARK-24021][CORE] fix bug in BlacklistTracker's update...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21104 ping @jerryshao
[GitHub] spark pull request #21104: [SPARK-24021][CORE] fix bug in BlacklistTracker's...
GitHub user Ngone51 opened a pull request: https://github.com/apache/spark/pull/21104 [SPARK-24021][CORE] fix bug in BlacklistTracker's updateBlacklistForFetchFailure ## What changes were proposed in this pull request? There's a mistake in BlacklistTracker's updateBlacklistForFetchFailure: ``` val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]()) blacklistedExecsOnNode += exec ``` where the first **exec** should be **host**. ## How was this patch tested? Adjusted an existing test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ngone51/spark SPARK-24021 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21104.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21104 commit d2257213ecd4b0e8ec91bfa52f7caf725c267b16 Author: wuyi Date: 2018-04-19T02:42:51Z fix bug in BlacklistTracker's updateBlacklistForFetchFailure
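To make the effect of the wrong key visible, here is a standalone reproduction of the two-line snippet outside Spark. Only the `getOrElseUpdate` / `+=` update mirrors the PR description. The enclosing object and method names are hypothetical.

```scala
import scala.collection.mutable.{HashMap, HashSet}

// Standalone reproduction of the snippet above; the surrounding object and method
// names are hypothetical, only the two-line update mirrors the PR description.
object BlacklistKeySketch {
  // Buggy: the map is keyed by the executor id, so executors on one host never group together.
  def updateBuggy(nodeToBlacklistedExecs: HashMap[String, HashSet[String]],
                  host: String, exec: String): Unit = {
    val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
    blacklistedExecsOnNode += exec
  }

  // Fixed: key by host, as the map's name (node -> blacklisted executors) implies.
  def updateFixed(nodeToBlacklistedExecs: HashMap[String, HashSet[String]],
                  host: String, exec: String): Unit = {
    val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(host, HashSet[String]())
    blacklistedExecsOnNode += exec
  }
}
```

If the fixed version records two executors on `host-1`, the result is a single entry mapping `host-1` to both executor ids. The buggy version instead creates one entry per executor id and never records the host.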
[GitHub] spark issue #21096: [SPARK-24011][CORE] cache rdd's immediate parent Shuffle...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/21096 Thank you for your comments @markhamstra. Yeah, I'm considering adding a UT to support this change. And thanks for reminding me of DAGScheduler's basic principle.
[GitHub] spark pull request #21096: cache rdd's immediate parent ShuffleDependencies ...
GitHub user Ngone51 opened a pull request: https://github.com/apache/spark/pull/21096 cache rdd's immediate parent ShuffleDependencies to accelerate getShuffleDependencies ## What changes were proposed in this pull request? When creating stages for jobs, we need to find an RDD's (other than the final RDD's) immediate parent ShuffleDependencies via `getShuffleDependencies()` at least twice (first in `getMissingAncestorShuffleDependencies()`, and second in `getOrCreateParentStages()`). So we can cache the result the first time we call `getShuffleDependencies()`. This saves time when there are many `NarrowDependencies` between the RDD and its immediate parent `ShuffleDependencies`, or when the RDD has a large number of immediate parent `ShuffleDependencies`. Checkpointed RDDs are an exception: if an RDD is checkpointed, its immediate parent `ShuffleDependencies` should be adjusted to empty. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ngone51/spark SPARK-24011 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21096.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21096 commit 59fb931135b7bc8fc1f516c39015f7412ae25208 Author: wuyi Date: 2018-04-18T10:35:22Z cache rdd's immediate ShuffleDependencies
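The caching idea can be sketched on a toy lineage graph instead of real RDDs. This is a hypothetical model rather than the PR's actual patch. `LineageNode`, the two dependency classes, and `ShuffleDepsCache` are invented names. The traversal walks narrow dependencies until it reaches the nearest shuffle dependencies. Results are memoized per node, and checkpointed nodes return an empty set.

```scala
import scala.collection.mutable

// Hypothetical toy lineage standing in for RDDs and their dependencies.
sealed trait LineageDep { def parent: LineageNode }
final case class NarrowLineageDep(parent: LineageNode) extends LineageDep
final case class ShuffleLineageDep(shuffleId: Int, parent: LineageNode) extends LineageDep
final class LineageNode(val id: Int, val deps: Seq[LineageDep], var checkpointed: Boolean = false)

final class ShuffleDepsCache {
  private val cache = mutable.HashMap.empty[Int, Set[ShuffleLineageDep]]
  var traversals = 0 // counts uncached walks, to show that the cache saves work

  def getShuffleDependencies(node: LineageNode): Set[ShuffleLineageDep] =
    if (node.checkpointed) Set.empty // lineage is truncated, so no parent shuffles
    else cache.getOrElseUpdate(node.id, compute(node))

  // Walk narrow dependencies until reaching the nearest shuffle dependencies.
  private def compute(node: LineageNode): Set[ShuffleLineageDep] = {
    traversals += 1
    val result = mutable.Set.empty[ShuffleLineageDep]
    val visited = mutable.Set.empty[Int]
    var toVisit = List(node)
    while (toVisit.nonEmpty) {
      val current = toVisit.head
      toVisit = toVisit.tail
      if (visited.add(current.id)) {
        current.deps.foreach {
          case s: ShuffleLineageDep => result += s
          case n: NarrowLineageDep  => toVisit = n.parent :: toVisit
        }
      }
    }
    result.toSet
  }
}
```

The checkpoint check is placed before the cache lookup. As a result, a node that is checkpointed after its first lookup still returns an empty set, even though a stale entry remains in the cache.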
[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20930 Hi @xuanyuanking, I'm still confused (smile & cry). > Stage 2 retry 4 times triggered by Stage 3's fetch failed event. Actually in this scenario, stage 3 will always failed by fetch fail. Stage 2 has no missing tasks, right? So there are no missing partitions for Stage 2 (which means Stage 3 can always get Stage 2's MapOutputs from `MapOutputTrackerMaster`), right? So why will Stage 3 always fail with a FetchFailed? I hope you can explain more. Thank you very much!
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r182308871 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("[SPARK-23811] FetchFailed comes before Success of same task will cause child stage" + +" never succeed") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), --- End diff -- Maybe you can call `runEvent(SpeculativeTaskSubmitted)` first to simulate a speculative task being submitted, before you call `runEvent(makeCompletionEvent())`.
[GitHub] spark issue #20998: [SPARK-23888][CORE] speculative task should not run on a...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20998 Will do, and it's okay. My view is still limited to the source code, while you have more practical experience, so I learned from your points. It's beneficial.
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user Ngone51 commented on the issue: https://github.com/apache/spark/pull/20604 > I'd go even further and suggest that with this fix in, we can actually remove SPARK-21834, as its no longer necessary. Yes; otherwise, this PR's work would be meaningless.
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r182086337 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging { def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: ExecutorAllocationClient => -b.killExecutors(executorIds, replace = false, force = true).nonEmpty +require(executorAllocationManager.isEmpty, --- End diff -- Hi @squito, I have some questions about these cases: > If you've got just one executor, and then you kill it, should your app sit with 0 executors? If the app sits with 0 executors, pending tasks will increase, which leads `ExecutorAllocationManager` to increase the target number of executors. So the app will not sit with 0 executors forever. > Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up? For this case, to be honest, I don't really get your point, but that may be due to my poor English. And what will happen if we use this method without `ExecutorAllocationManager`? Do we really need to adjust TargetNumExecutors (i.e. set `adjustTargetNumExecutors = true` below) if we are not using `ExecutorAllocationManager`? See these lines in `killExecutors()`: ``` if (adjustTargetNumExecutors) { requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0) ... doRequestTotalExecutors(requestedTotalExecutors) } ``` Setting `adjustTargetNumExecutors = true` changes `requestedTotalExecutors`. And IIUC, `requestedTotalExecutors` is only used in dynamic allocation mode. So, if we are not using `ExecutorAllocationManager`, the allocation client will ask the cluster manager for `requestedTotalExecutors = 0` executors (which is really terrible). But actually, an app without `ExecutorAllocationManager` has no limit on requesting executors (by default).
Actually, I think this series of methods, including `killAndReplaceExecutor`, `requestExecutors`, etc., is designed for dynamic allocation mode. And if we still want to use these methods while the app does not use `ExecutorAllocationManager`, we should not change `requestedTotalExecutors`, or perhaps not request a specific number of executors from the cluster manager at all. WDYT?
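The concern about `requestedTotalExecutors` dropping to 0 can be reproduced with a toy model of the quoted `killExecutors()` excerpt. This is a hypothetical sketch. It assumes, as the comment argues, that only dynamic allocation raises `requestedTotalExecutors`, so without `ExecutorAllocationManager` the value stays at 0. `ToyAllocationClient` and `lastRequest` are invented names, and `lastRequest` stands in for `doRequestTotalExecutors(...)`.

```scala
// Hypothetical model of the target bookkeeping in the killExecutors() excerpt above.
final class ToyAllocationClient(var requestedTotalExecutors: Int) {
  // What would be sent to the cluster manager; stands in for doRequestTotalExecutors(...).
  var lastRequest: Option[Int] = None

  def killExecutors(executorsToKill: Seq[String], adjustTargetNumExecutors: Boolean): Unit = {
    if (adjustTargetNumExecutors) {
      requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
      lastRequest = Some(requestedTotalExecutors)
    }
  }
}

object AllocationDemo {
  def main(args: Array[String]): Unit = {
    // Without dynamic allocation, nothing ever raised the target, so it is still 0.
    val client = new ToyAllocationClient(requestedTotalExecutors = 0)
    client.killExecutors(Seq("exec-1"), adjustTargetNumExecutors = true)
    println(client.lastRequest) // prints Some(0): the cluster manager is asked for 0 executors
  }
}
```

Passing `adjustTargetNumExecutors = false` leaves both the target and the cluster-manager request untouched. That matches the suggestion of not changing `requestedTotalExecutors` when dynamic allocation is off.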
[GitHub] spark pull request #21079: [SPARK-23992][CORE] ShuffleDependency does not ne...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21079#discussion_r181938225 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -113,3 +118,24 @@ private[spark] class ShuffleMapTask( override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId) } + +object ShuffleMapTask extends Logging { + private val cache = CacheBuilder.newBuilder() --- End diff -- Do we need to clear this `cache` at the end of an app?
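Some context for the question. A Scala `object` is initialized once per loaded class, so a cache held in `object ShuffleMapTask` stays populated as long as that class stays loaded. That can be longer than a single application, e.g. when several SparkContexts are created in one JVM. The sketch below shows that lifecycle. It uses the standard library's `TrieMap` instead of the Guava `CacheBuilder` cache from the diff. `ObjectScopedCache`, `getOrCompute` and `clear` are hypothetical names, and calling `clear()` when the app stops is an assumption, not something the PR does.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical object-level cache, analogous in scope to the one in the diff.
// Entries stay here as long as this object's class stays loaded.
object ObjectScopedCache {
  private val cache = TrieMap.empty[Int, String]

  // Computes the value only if the key is absent.
  def getOrCompute(key: Int)(compute: Int => String): String =
    cache.getOrElseUpdate(key, compute(key))

  def size: Int = cache.size

  // Would have to be called explicitly, e.g. from an app-shutdown path.
  def clear(): Unit = cache.clear()
}

object ObjectScopedCacheDemo {
  def main(args: Array[String]): Unit = {
    ObjectScopedCache.getOrCompute(0)(k => s"value-$k")
    println(ObjectScopedCache.size) // 1: still cached after the work that filled it is done
    ObjectScopedCache.clear()
    println(ObjectScopedCache.size) // 0
  }
}
```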
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181729645 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + +" because task " + index + " has already failed by FetchFailed") + return --- End diff -- We cannot simply `return` here. We should always send a task `CompletionEvent` to the DAGScheduler, in case any listeners are waiting for it.
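The point about not returning early can be sketched as a toy model. The late success is ignored for bookkeeping, but a completion event is still posted. `MiniTaskSetManager`, the `postEvent` callback and the local `CompletionEvent` case class are hypothetical stand-ins. They are not Spark's `TaskSetManager` or the DAGScheduler's internal event type.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the event the scheduler side would receive.
final case class CompletionEvent(index: Int, ignoredSuccess: Boolean)

// Hypothetical, heavily simplified task set manager.
final class MiniTaskSetManager(postEvent: CompletionEvent => Unit) {
  private val fetchFailedTaskIndexSet = mutable.Set.empty[Int]
  private val successful = mutable.Set.empty[Int]

  def handleFetchFailed(index: Int): Unit = fetchFailedTaskIndexSet += index

  def handleSuccessfulTask(index: Int): Unit = {
    val ignore = fetchFailedTaskIndexSet.contains(index)
    if (!ignore) successful += index
    // Unlike an early `return`, the event is posted on both paths,
    // so anything waiting on this task still hears about it.
    postEvent(CompletionEvent(index, ignoredSuccess = ignore))
  }

  def numSuccessful: Int = successful.size
}

object MiniTaskSetManagerDemo {
  def main(args: Array[String]): Unit = {
    val events = mutable.ArrayBuffer.empty[CompletionEvent]
    val tsm = new MiniTaskSetManager(e => events += e)
    tsm.handleFetchFailed(0)
    tsm.handleSuccessfulTask(0) // late success after FetchFailed: ignored, but still reported
    tsm.handleSuccessfulTask(1)
    println(events.toList) // List(CompletionEvent(0,true), CompletionEvent(1,false))
    println(tsm.numSuccessful) // 1
  }
}
```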
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r181732788 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -750,6 +752,10 @@ private[spark] class TaskSetManager( if (tasksSuccessful == numTasks) { isZombie = true } +} else if (fetchFailedTaskIndexSet.contains(index)) { + logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + +" because task " + index + " has already failed by FetchFailed") + return --- End diff -- Maybe we can mark the task as `FAILED` with `UnknownReason` here. Then the DAGScheduler will treat this task as a no-op, and `registerMapOutput` will not be triggered. It is not an elegant approach, though.
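Here is a toy model of this alternative. It assumes the task-set side turns a success that arrives after `FetchFailed` into a failure with an unknown reason. The scheduler side registers map output only on a real success. `EndReason`, `MiniTaskSet` and `MiniDagScheduler` are hypothetical. Spark's own `TaskEndReason` hierarchy (which includes `Success`, `FetchFailed` and `UnknownReason`) and the real DAGScheduler handling are more involved.

```scala
import scala.collection.mutable

// Hypothetical toy end reasons mirroring the names used in the comment.
sealed trait EndReason
object EndReason {
  case object Success extends EndReason
  case object FetchFailed extends EndReason
  case object Unknown extends EndReason // stand-in for Spark's UnknownReason
}

// Task-set side (hypothetical): a success arriving after FetchFailed for the
// same index is reported as a failure with an unknown reason instead.
object MiniTaskSet {
  def reasonForSuccess(index: Int, fetchFailedIndexes: Set[Int]): EndReason =
    if (fetchFailedIndexes.contains(index)) EndReason.Unknown else EndReason.Success
}

// Scheduler side (hypothetical): only a real success registers map output.
final class MiniDagScheduler {
  val registeredMapOutputs: mutable.Set[Int] = mutable.Set.empty

  def handleTaskCompletion(index: Int, reason: EndReason): Unit = reason match {
    case EndReason.Success     => registeredMapOutputs += index // stand-in for registerMapOutput
    case EndReason.FetchFailed => () // handled elsewhere in a real scheduler
    case EndReason.Unknown     => () // no-op: nothing is registered
  }
}

object MiniDagDemo {
  def main(args: Array[String]): Unit = {
    val dag = new MiniDagScheduler
    val fetchFailed = Set(0)
    dag.handleTaskCompletion(0, MiniTaskSet.reasonForSuccess(0, fetchFailed))
    dag.handleTaskCompletion(1, MiniTaskSet.reasonForSuccess(1, fetchFailed))
    println(dag.registeredMapOutputs.toList.sorted) // List(1)
  }
}
```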
[GitHub] spark pull request #20998: [SPARK-23888][CORE] speculative task should not r...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/20998#discussion_r180937626 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -880,8 +880,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) } - test("speculative task should not run on a given host where another attempt " + -"is already running on") { + test("SPARK-23888: speculative task should not run on a given host " + +"where another attempt is already running on") { --- End diff -- Sure. Also, do we need to reword the PR and JIRA titles? @squito
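The behavior named in the test title fits in a small predicate. A speculative copy is offered only to a host that is not already running an attempt of the same task. `SpeculationPlacement.canLaunchSpeculativeCopy` and the `runningAttemptHosts` map are hypothetical. This sketch does not reproduce how `TaskSetManager` actually tracks running attempts.

```scala
object SpeculationPlacement {
  // Hypothetical bookkeeping: task index -> hosts currently running an attempt of it.
  def canLaunchSpeculativeCopy(
      taskIndex: Int,
      offeredHost: String,
      runningAttemptHosts: Map[Int, Set[String]]): Boolean =
    !runningAttemptHosts.getOrElse(taskIndex, Set.empty[String]).contains(offeredHost)

  def main(args: Array[String]): Unit = {
    val running = Map(3 -> Set("host1"))
    println(canLaunchSpeculativeCopy(3, "host1", running)) // false: an attempt already runs there
    println(canLaunchSpeculativeCopy(3, "host2", running)) // true
  }
}
```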