[GitHub] spark pull request: SPARK-2380: Support displaying accumulator val...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1309#discussion_r15557746 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -809,12 +810,25 @@ class DAGScheduler( listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) runningStages -= stage } + event.reason match { case Success => logInfo("Completed " + task) if (event.accumUpdates != null) { // TODO: fail the stage if the accumulator update fails... Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted + event.accumUpdates.foreach { case (id, partialValue) => +val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]] +val name = acc.name +// To avoid UI cruft, ignore cases where value wasn't updated +if (partialValue != acc.zero) { + val stringPartialValue = acc.prettyPartialValue(partialValue) + val stringValue = acc.prettyValue(acc.value) + stageToInfos(stage).accumulables(id) = AccumulableInfo(id, acc.name, stringValue) + event.taskInfo.accumulables += +AccumulableInfo(id, name, Some(stringPartialValue), stringValue) +} + } --- End diff -- Can this be moved to a method on the Accumulators companion object or something? These details about AccumulableInfo, prettyPartialValues, etc. aren't things that need to appear in the DAGScheduler. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1056#discussion_r15543587 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -38,8 +37,10 @@ import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.RDD +import org.apache.spark.storage._ import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId} --- End diff -- redundant
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1056#discussion_r15542781 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -348,4 +353,48 @@ private[spark] class Executor( } } } + + def stop() { +isStopped = true +threadPool.shutdown() + } + + def startDriverHeartbeater() { +val interval = conf.getInt("spark.executor.heartbeatInterval", 2000) +val timeout = AkkaUtils.lookupTimeout(conf) +val retryAttempts = AkkaUtils.numRetries(conf) +val retryIntervalMs = AkkaUtils.retryWaitMs(conf) +val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem) + +val t = new Thread() { --- End diff -- use Utils.namedThreadFactory or otherwise generate a named thread
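To illustrate the "named thread" suggestion above, here is a minimal sketch that uses only `java.util.concurrent`. The `NamedThreads` object, the `namedDaemonFactory` helper and the `driver-heartbeater` prefix are hypothetical names chosen for this example; this is not Spark's `Utils.namedThreadFactory`, only one way the same idea could look.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object NamedThreads {
  /** Hypothetical helper: builds daemon threads named "<prefix>-<n>". */
  def namedDaemonFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
      t.setDaemon(true) // do not keep the JVM alive for a background heartbeat
      t
    }
  }

  /** Runs one task on a single-thread pool and returns the worker thread's name. */
  def workerThreadName(prefix: String): String = {
    val pool = Executors.newSingleThreadExecutor(namedDaemonFactory(prefix))
    try {
      pool.submit(new Callable[String] {
        override def call(): String = Thread.currentThread().getName
      }).get(5, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }
}
```

A named thread shows up under that name in thread dumps and log lines, which is the practical benefit over an anonymous `new Thread()`.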
[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1542#issuecomment-50511022 I dunno, merging a PR with no changed files doesn't sound too scary to me. Something is definitely messed up in this PR, with both `Commits` and `Files changed` showing invalid results.
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1056#discussion_r15538656 --- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import akka.actor.Actor +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.scheduler.TaskScheduler + +/** + * A heartbeat from executors to the driver. This is a shared message used by several internal + * components to convey liveness or execution information for in-progress tasks. + */ +private[spark] case class Heartbeat( +executorId: String, +taskMetrics: Array[(Long, TaskMetrics)], +blockManagerId: BlockManagerId) + extends Serializable --- End diff -- case class is inherently Serializable
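The point about case classes can be checked with a small round trip through Java serialization. This is a sketch only: `Beat` is a hypothetical stand-in for the `Heartbeat` message, and `roundTrip` is a helper invented for the example. Note that the fields themselves still have to be serializable for the round trip to succeed.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object CaseClassSerialization {
  // Hypothetical stand-in for Heartbeat; note there is no `extends Serializable`.
  case class Beat(executorId: String, taskIds: Array[Long])

  /** Serializes and deserializes a value with plain Java serialization. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

If `Beat` were a plain class instead of a case class, `writeObject` would throw `java.io.NotSerializableException` unless the class extended `Serializable` explicitly.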
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15499105 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -691,25 +689,41 @@ class DAGScheduler( } } - /** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() var tasks = ArrayBuffer[Task[_]]() + +var broadcastRddBinary: Broadcast[Array[Byte]] = null +try { + broadcastRddBinary = stage.rdd.createBroadcastBinary() +} catch { + case e: NotSerializableException => +abortStage(stage, "Task not serializable: " + e.toString) +runningStages -= stage --- End diff -- Yeah, I think the intention was to avoid SparkListenerStageCompleted when Listeners had never seen a prior StageSubmitted event. If you want to handle Completed without Submitted in the UI, I don't think those changes will break anything else -- but it may be unexpected behavior for other Listeners.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498156 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.{NotSerializableException, PrintWriter, StringWriter} +import java.io.{NotSerializableException} --- End diff -- must be wip :)
[GitHub] spark pull request: [Build] SPARK-2614: (2nd patch) Create a spark...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1611#discussion_r15442657 --- Diff: assembly/src/deb/control/examples/control --- @@ -0,0 +1,8 @@ +Package: [[deb.pkg.name]]-examples +Version: [[version]]-[[buildNumber]] +Section: misc +Priority: extra +Architecture: all --- End diff -- probably a good idea to add ``Depends: [[deb.pkg.name]] (= [[version]]-[[buildNumber]])`` -- i.e. an explicit dependency on the `spark` package with the same version number.
[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1360#issuecomment-50281668 ping Probably too late for a 1.0.2-rc, but this should go into 1.0.3 and 1.1.0.
[GitHub] spark pull request: [SPARK-2568] RangePartitioner should run only ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1562#discussion_r15439847 --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala --- @@ -105,24 +108,91 @@ class RangePartitioner[K : Ordering : ClassTag, V]( private var ordering = implicitly[Ordering[K]] + @transient private[spark] var singlePass = true // for unit tests + // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { if (partitions == 1) { - Array() + Array.empty } else { - val rddSize = rdd.count() - val maxSampleSize = partitions * 20.0 - val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0) - val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted - if (rddSample.length == 0) { -Array() + // This is the sample size we need to have roughly balanced output partitions. + val sampleSize = 20.0 * partitions + // Assume the input partitions are roughly balanced and over-sample a little bit. + val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt + val shift = rdd.id + val classTagK = classTag[K] + val sketch = rdd.mapPartitionsWithIndex { (idx, iter) => +val seed = byteswap32(idx + shift) +val (sample, n) = SamplingUtils.reservoirSampleAndCount( + iter.map(_._1), sampleSizePerPartition, seed)(classTagK) +Iterator((idx, n, sample)) + }.collect() + var numItems = 0L + sketch.foreach { case (_, n, _) => +numItems += n + } --- End diff -- `val numItems = sketch.map(_._2.toLong).sum`
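The one-liner suggested above can be compared with the `var` loop from the diff in a small sketch. The `Entry` type alias and the sample data shape are assumptions for illustration; in particular, the per-partition count is taken to be an `Int` here, which is why the `.toLong` conversion before summing matters.

```scala
object SketchCount {
  // Hypothetical entries shaped like the (partitionIndex, count, sample) triples in the diff.
  type Entry = (Int, Int, Array[String])

  /** The mutable-accumulator version from the diff. */
  def countWithVar(sketch: Array[Entry]): Long = {
    var numItems = 0L
    sketch.foreach { case (_, n, _) => numItems += n }
    numItems
  }

  /** The suggested version: widen each count to Long, then sum. */
  def countWithSum(sketch: Array[Entry]): Long =
    sketch.map(_._2.toLong).sum
}
```

Both forms accumulate in a `Long`, so neither overflows when the per-partition counts add up to more than `Int.MaxValue`; summing the raw `Int` counts without `.toLong` would.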
[GitHub] spark pull request: SPARK-2684: Update ExternalAppendOnlyMap to ta...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1607#discussion_r15439805 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Insert the given key and value into the map. + */ + def insert(key: K, value: V) { +insertAll(Iterator((key, value))) + } + + /** + * Insert the given iterator of keys and values into the map. * - * If the underlying map is about to grow, check if the global pool of shuffle memory has + * When the underlying map needs to grow, check if the global pool of shuffle memory has * enough room for this to happen. If so, allocate the memory required to grow the map; * otherwise, spill the in-memory map to disk. * * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. */ - def insert(key: K, value: V) { + def insertAll(entries: Iterator[Product2[K, V]]) { +// An update function for the map that we reuse across entries to avoid allocating +// a new closure each time +var curEntry: Product2[K, V] = null val update: (Boolean, C) => C = (hadVal, oldVal) => { - if (hadVal) mergeValue(oldVal, value) else createCombiner(value) + if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2) } -if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { - val mapSize = currentMap.estimateSize() - var shouldSpill = false - val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap - - // Atomically check whether there is sufficient memory in the global pool for - // this map to grow and, if possible, allocate the required amount - shuffleMemoryMap.synchronized { -val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) -val availableMemory = maxMemoryThreshold - - (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) - -// Assume map growth factor is 2x -shouldSpill = availableMemory < mapSize * 2 -if (!shouldSpill) { - 
shuffleMemoryMap(threadId) = mapSize * 2 + +while (entries.hasNext) { + curEntry = entries.next() + if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { +val mapSize = currentMap.estimateSize() +var shouldSpill = false +val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap + +// Atomically check whether there is sufficient memory in the global pool for +// this map to grow and, if possible, allocate the required amount +shuffleMemoryMap.synchronized { + val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) + val availableMemory = maxMemoryThreshold - +(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) + + // Assume map growth factor is 2x + shouldSpill = availableMemory < mapSize * 2 + if (!shouldSpill) { +shuffleMemoryMap(threadId) = mapSize * 2 + } +} +// Do not synchronize spills +if (shouldSpill) { + spill(mapSize) } } - // Do not synchronize spills - if (shouldSpill) { -spill(mapSize) - } + currentMap.changeValue(curEntry._1, update) + numPairsInMemory += 1 } -currentMap.changeValue(key, update) -numPairsInMemory += 1 } --- End diff -- Yeah, I saw. Thanks. I'm not really sure how useful it is, because you are correct that we usually have an iterator anyway, but it was simple and clean to add the Iterable interface.
[GitHub] spark pull request: SPARK-2684: Update ExternalAppendOnlyMap to ta...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1607#discussion_r15437121 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C]( /** * Insert the given key and value into the map. + */ + def insert(key: K, value: V) { +insertAll(Iterator((key, value))) + } + + /** + * Insert the given iterator of keys and values into the map. * - * If the underlying map is about to grow, check if the global pool of shuffle memory has + * When the underlying map needs to grow, check if the global pool of shuffle memory has * enough room for this to happen. If so, allocate the memory required to grow the map; * otherwise, spill the in-memory map to disk. * * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. */ - def insert(key: K, value: V) { + def insertAll(entries: Iterator[Product2[K, V]]) { +// An update function for the map that we reuse across entries to avoid allocating +// a new closure each time +var curEntry: Product2[K, V] = null val update: (Boolean, C) => C = (hadVal, oldVal) => { - if (hadVal) mergeValue(oldVal, value) else createCombiner(value) + if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2) } -if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { - val mapSize = currentMap.estimateSize() - var shouldSpill = false - val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap - - // Atomically check whether there is sufficient memory in the global pool for - // this map to grow and, if possible, allocate the required amount - shuffleMemoryMap.synchronized { -val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) -val availableMemory = maxMemoryThreshold - - (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) - -// Assume map growth factor is 2x -shouldSpill = availableMemory < mapSize * 2 -if (!shouldSpill) { - 
shuffleMemoryMap(threadId) = mapSize * 2 + +while (entries.hasNext) { + curEntry = entries.next() + if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { +val mapSize = currentMap.estimateSize() +var shouldSpill = false +val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap + +// Atomically check whether there is sufficient memory in the global pool for +// this map to grow and, if possible, allocate the required amount +shuffleMemoryMap.synchronized { + val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId) + val availableMemory = maxMemoryThreshold - +(shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) + + // Assume map growth factor is 2x + shouldSpill = availableMemory < mapSize * 2 + if (!shouldSpill) { +shuffleMemoryMap(threadId) = mapSize * 2 + } +} +// Do not synchronize spills +if (shouldSpill) { + spill(mapSize) } } - // Do not synchronize spills - if (shouldSpill) { -spill(mapSize) - } + currentMap.changeValue(curEntry._1, update) + numPairsInMemory += 1 } -currentMap.changeValue(key, update) -numPairsInMemory += 1 } --- End diff -- Does it make sense to include a more collection-oriented interface, more like scala.collection.mutable.Buffer#insertAll: ```scala def insertAll(coll: Iterable[Product2[K, V]]): Unit = insertAll(coll.iterator) ``` ...so that you can do things like `externalAppendOnlyMap.insertAll(aMap)` instead of `externalAppendOnlyMap.insertAll(aMap.iterator)`?
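The proposed overload can be tried out on a toy container. This is a sketch under stated assumptions: `SummingMap` is a hypothetical class that only sums `Int` values per key, and it has none of the spilling or memory tracking of `ExternalAppendOnlyMap`. It only shows that an `Iterable` overload delegating to the `Iterator` version lets callers pass a `Map` directly.

```scala
import scala.collection.mutable

object InsertAllOverload {
  // Hypothetical toy container, not Spark's ExternalAppendOnlyMap.
  class SummingMap[K] {
    private val sums = mutable.HashMap.empty[K, Int]

    /** Iterator-based insert, mirroring the signature in the diff. */
    def insertAll(entries: Iterator[Product2[K, Int]]): Unit =
      entries.foreach { e => sums(e._1) = sums.getOrElse(e._1, 0) + e._2 }

    /** Collection-oriented overload that simply delegates to the iterator version. */
    def insertAll(coll: Iterable[Product2[K, Int]]): Unit = insertAll(coll.iterator)

    def get(key: K): Option[Int] = sums.get(key)
  }
}
```

With both overloads in place, `m.insertAll(aMap)` and `m.insertAll(aMap.iterator)` behave the same; the first is only a convenience.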
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1561#issuecomment-50201015 @JoshRosen Why a trait instead of an abstract class? We're not expecting to need to mixin Stage outside of the Stage class hierarchy, right?
[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1548#issuecomment-50174114 @YanTangZhai If you are searching for another solution and abandoning this PR, could you please close this PR and open a new one when you have something different for us to look at?
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-50160338 After installing `hub` you can also do a bunch of new stuff on the command line, including `hub checkout https://github.com/apache/spark/pull/1499` https://hub.github.com/
[GitHub] spark pull request: [SPARK-2529] Clean closures in foreach and for...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1583#issuecomment-50105915 Does anyone recall why we lost the closure cleaning in https://github.com/apache/spark/commit/6b288b75d4c05f42ad3612813dc77ff824bb6203 ?
[GitHub] spark pull request: SPARK-1715: Ensure actor is self-contained in ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/637#issuecomment-50057749 This rebases cleanly on top of https://github.com/apache/spark/pull/1561, so let's get that one in first.
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15361883 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -341,8 +336,9 @@ class DAGScheduler( if (registeredStages.isEmpty || registeredStages.get.isEmpty) { logError("No stages registered for job " + job.jobId) } else { - stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { -case (stageId, jobSet) => + stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach { --- End diff -- Yes, and you've got it after filterKeys just like after filter. The result of filterKeys is a Map[Int, Stage], and nothing needs to change within the foreach { ... }.
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15356652 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -341,8 +336,9 @@ class DAGScheduler( if (registeredStages.isEmpty || registeredStages.get.isEmpty) { logError("No stages registered for job " + job.jobId) } else { - stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { -case (stageId, jobSet) => + stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach { --- End diff -- What? How does `stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {` do more hash lookups than does `stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach {`? Looks the same to me: https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/MapLike.scala#L230
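The equivalence argued above can be sketched with plain collections. The map contents and the `FilterKeysVsFilter` object are hypothetical stand-ins for `stageIdToStage` and `registeredStages`. One caveat beyond the thread: on Scala 2.13 and later, `Map#filterKeys` is deprecated in favor of `.view.filterKeys`, so the call below compiles there with a deprecation warning; the trailing `.toMap` is added only so the result is a strict `Map` on every version.

```scala
object FilterKeysVsFilter {
  // Hypothetical data standing in for stageIdToStage and the job's registered stage ids.
  val stageIdToStage: Map[Int, String] = Map(1 -> "stage-1", 2 -> "stage-2", 3 -> "stage-3")
  val registeredStages: Set[Int] = Set(1, 3)

  /** Filter on whole entries, testing only the key component. */
  def viaFilter: Map[Int, String] =
    stageIdToStage.filter(s => registeredStages.contains(s._1))

  /** Filter on keys directly; .toMap forces a strict Map regardless of Scala version. */
  def viaFilterKeys: Map[Int, String] =
    stageIdToStage.filterKeys(stageId => registeredStages.contains(stageId)).toMap
}
```

When the result is only traversed once with `foreach`, as in the diff, both forms test each key against `registeredStages` once per entry.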
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334407 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = "$" + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}") + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[E
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15332026 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -56,6 +58,16 @@ private[spark] class Stage( val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + + /** List of jobs that this stage belong to. */ + val jobIds = new HashSet[Int] + /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */ + var resultOfJob: Option[ActiveJob] = None + var pendingTasks: Option[HashSet[Task[_]]] = None --- End diff -- And potentially error prone. I don't think there is any place left where the distinction between None and Some(emptySet) is useful except for the check in removeStage (https://github.com/rxin/spark/blob/dagSchedulerHashMaps/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357), and even that check is no longer really useful. On the other hand, having a None where we expect at least Some(emptySet) can be a problem. I think we're better off without the Option.
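The hazard described above, a `None` where at least `Some(emptySet)` is expected, can be shown with two simplified shapes. Everything here is hypothetical: `StageWithOption` and `StageWithSet` are toy stand-ins for `Stage`, and tasks are reduced to `Int` ids.

```scala
import scala.collection.mutable.HashSet

object PendingTasksShapes {
  // Hypothetical, simplified stand-ins for Stage; tasks are just Ints here.
  class StageWithOption { var pendingTasks: Option[HashSet[Int]] = None }
  class StageWithSet { val pendingTasks = new HashSet[Int] }

  /** Returns true if adding a task succeeded, false if `.get` hit a None. */
  def tryAdd(stage: StageWithOption, task: Int): Boolean =
    try { stage.pendingTasks.get += task; true }
    catch { case _: NoSuchElementException => false }
}
```

With the plain `HashSet` field, "no pending tasks" is simply the empty set, and resetting the stage is `pendingTasks.clear()` rather than an assignment to `None` that later callers of `.get` have to guard against.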
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331819 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite +import scala.collection.mutable.HashSet --- End diff -- I'll have to remember that one!
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331775 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -355,14 +351,13 @@ class DAGScheduler( logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } -stageToInfos -= stage for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) { shuffleToMapStage.remove(k) } -if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) { +if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } -pendingTasks -= stage +stage.pendingTasks = None --- End diff -- The old pendingTasks needed the clearing step so that it wouldn't continue to have removed stages mapping to empty sets of remaining tasks. I agree that a similar step shouldn't be necessary now that pendingTasks is encapsulated in a Stage that is going away. In fact, in the event of resubmission don't we want this to _not_ be None, else stage.pendingTasks.get will fail? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
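A minimal sketch of why the `Option` wrapper is awkward here. `StageWithOption` and `StageWithSet` are hypothetical, trimmed-down stand-ins rather than Spark's actual `Stage`, and tasks are modelled as plain `Int`s. It illustrates two things: calling `.get` on a `None` fails, and on an `Option` the check `isDefined && !isEmpty` never inspects the set inside, because `Option.isEmpty` is simply the negation of `isDefined`.

```scala
import scala.collection.mutable.HashSet
import scala.util.Try

object PendingTasksSketch {
  // Hypothetical stand-ins for Stage; only the field under discussion is modelled.
  class StageWithOption { var pendingTasks: Option[HashSet[Int]] = None }
  class StageWithSet { val pendingTasks = new HashSet[Int] }

  // Mirrors the check in the diff. On an Option, isEmpty is just !isDefined,
  // so this is true for Some(emptySet) as well.
  def looksPending(s: StageWithOption): Boolean =
    s.pendingTasks.isDefined && !s.pendingTasks.isEmpty

  // Adding through .get throws NoSuchElementException while the field is None.
  def addViaGet(s: StageWithOption, task: Int): Boolean =
    Try(s.pendingTasks.get += task).isSuccess

  // With a plain set there is no None state to guard against.
  def addDirect(s: StageWithSet, task: Int): Boolean = {
    s.pendingTasks += task
    s.pendingTasks.contains(task)
  }
}
```

With the plain set, "no pending tasks" is represented only by the empty set, so callers have one state fewer to handle.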
[GitHub] spark pull request: [SPARK-1726] [SPARK-2567] Eliminate zombie sta...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1566#issuecomment-49968268 Makes sense. LGTM
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1561#issuecomment-49967238 JIRA?
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15328773 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -992,13 +974,14 @@ class DAGScheduler( } private[scheduler] def handleStageCancellation(stageId: Int) { -if (stageIdToJobIds.contains(stageId)) { - val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray - jobsThatUseStage.foreach(jobId => { -handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId)) - }) -} else { - logInfo("No active jobs to kill for Stage " + stageId) +stageIdToStage.get(stageId) match { + case Some(stage) => +val jobsThatUseStage: Array[Int] = stage.jobIds.toArray +jobsThatUseStage.foreach(jobId => { + handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId)) +}) --- End diff --

```scala
jobsThatUseStage.foreach { jobId =>
  handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
}
```

There are also opportunities throughout to switch from concatenated or formatted strings to string interpolation, which I favor.
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15328544 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -341,8 +336,9 @@ class DAGScheduler( if (registeredStages.isEmpty || registeredStages.get.isEmpty) { --- End diff -- update the description of `@param resultStage` above
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15328468 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -341,8 +336,9 @@ class DAGScheduler( if (registeredStages.isEmpty || registeredStages.get.isEmpty) { logError("No stages registered for job " + job.jobId) } else { - stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach { -case (stageId, jobSet) => + stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach { --- End diff -- Why filter instead of filterKeys? Looks to me like that only serves to make the predicate harder to read. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
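To illustrate the readability point, here is a small sketch comparing the two forms on hypothetical data (a map from stage id to a stage name, standing in for `stageIdToStage`). Both select the same entries; `filterKeys` just lets the predicate name the key directly. Note that in Scala 2.13 and later `filterKeys` on a map is deprecated in favor of `.view.filterKeys`, which was not the case for the Scala versions Spark targeted when this comment was written.

```scala
import scala.collection.mutable.HashMap

object FilterKeysSketch {
  // Hypothetical data: stage id -> stage name.
  val stageIdToStage = HashMap(1 -> "stage-1", 2 -> "stage-2", 3 -> "stage-3")
  val registeredStages = Set(1, 3)

  // filter receives (key, value) pairs, so the predicate has to dig out `_1`.
  val viaFilter: Map[Int, String] =
    stageIdToStage.filter(s => registeredStages.contains(s._1)).toMap

  // filterKeys receives the key itself.
  val viaFilterKeys: Map[Int, String] =
    stageIdToStage.filterKeys(stageId => registeredStages.contains(stageId)).toMap
}
```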
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15328329 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -315,13 +309,14 @@ class DAGScheduler( */ private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) { def updateJobIdStageIdMapsList(stages: List[Stage]) { - if (!stages.isEmpty) { + if (stages.nonEmpty) { val s = stages.head -stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId +s.jobIds += jobId jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id -val parents = getParentStages(s.rdd, jobId) -val parentsWithoutThisJobId = parents.filter(p => - !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) +val parents: List[Stage] = getParentStages(s.rdd, jobId) +val parentsWithoutThisJobId = parents.filter { p: Stage => + !p.jobIds.contains(jobId) --- End diff -- could now do this as `filter { !_.jobIds.contains(jobId) }` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
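A short sketch of the placeholder form suggested above, using a hypothetical, simplified `Stage` case class (not Spark's). Once `jobIds` lives on the stage itself, the predicate no longer needs a named parameter or a type annotation.

```scala
import scala.collection.mutable.HashSet

object PlaceholderFilterSketch {
  // Hypothetical, simplified stand-in for Spark's Stage.
  final case class Stage(id: Int, jobIds: HashSet[Int])

  // Keep only the parent stages that are not yet registered for `jobId`.
  def parentsWithoutJob(parents: List[Stage], jobId: Int): List[Stage] =
    parents.filter { !_.jobIds.contains(jobId) }
}
```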
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15328234 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -315,13 +309,14 @@ class DAGScheduler( */ private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) { def updateJobIdStageIdMapsList(stages: List[Stage]) { - if (!stages.isEmpty) { + if (stages.nonEmpty) { val s = stages.head -stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId +s.jobIds += jobId jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id -val parents = getParentStages(s.rdd, jobId) -val parentsWithoutThisJobId = parents.filter(p => - !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) +val parents: List[Stage] = getParentStages(s.rdd, jobId) --- End diff -- Why the type annotation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15327897 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -56,6 +58,16 @@ private[spark] class Stage( val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + + /** List of jobs that this stage belong to. */ --- End diff -- nit: It's a Set not a List --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1528#issuecomment-49916553 Yeah, I'm wondering whether the actual problem is that creation and use of scheduler pools with different weights is unclear or too difficult; and that if we could resolve those issues, then the need for this PR would disappear.
[GitHub] spark pull request: [SPARK-2567] Resubmitted stage sometimes remai...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1516#issuecomment-49911032 This appears to be a reversion of d58502a1562bbfb1bb4e517ebcc8239efd639297 while ignoring and misapplying the comment regarding ordering (which I'm not completely understanding.) @xiajunluan ?
[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1548#discussion_r15289573 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1202,8 +1202,12 @@ private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => - dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, -listener, properties) + new Thread("JobSubmitted for " + jobId) { +override def run() { + dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, +listener, properties) +} + }.start() --- End diff -- Starting a new thread within eventProcessorActor to concurrently manipulate the DAGScheduler's state strikes me as a really bad idea. The actor model and the DAGScheduler are built around the fundamental assumption that only one event will be processed at a time. Breaking that model opens us up to numerous potential race conditions, and I am a long way from being convinced that this PR is either safe or desirable.
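The "one event at a time" assumption can be sketched without Akka. The example below swaps in a single-threaded `java.util.concurrent` executor as a stand-in for the actor's mailbox; `SchedulerState` and `processSerially` are hypothetical names for illustration only. Because every handler runs on the same thread, the unsynchronized counter is updated safely; spawning a separate thread per event, as in the diff, would remove that guarantee.

```scala
import java.util.concurrent.Executors

object SingleThreadedEventLoopSketch {
  // Hypothetical scheduler state: a plain, unsynchronized counter.
  final class SchedulerState { var jobsSubmitted = 0 }

  /** Handles `n` simulated JobSubmitted events strictly one after another. */
  def processSerially(n: Int): Int = {
    val state = new SchedulerState
    // One worker thread plays the role of the actor processing its mailbox.
    val eventLoop = Executors.newSingleThreadExecutor()
    try {
      val pending = (1 to n).map { _ =>
        eventLoop.submit(new Runnable {
          override def run(): Unit = state.jobsSubmitted += 1
        })
      }
      // Waiting on each Future also makes the handler's writes visible here.
      pending.foreach(_.get())
      state.jobsSubmitted
    } finally {
      eventLoop.shutdown()
    }
  }
}
```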
[GitHub] spark pull request: Add caching information to rdd.toDebugString
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1535#issuecomment-49874874 Jenkins, test this please
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1528#issuecomment-49848309 Sorry, looks like you already have SPARK-2618, so change the title of this PR to include that.
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1528#issuecomment-49848091 This looks like a clean implementation, but you still need to open a JIRA issue to explain why you want this; then edit the description of this PR to reference that JIRA. https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCode
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15272634 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -268,14 +264,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } } + def sufficientResourcesRegistered(): Boolean = true + override def isReady(): Boolean = { -if (ready) { +if (sufficientResourcesRegistered) { + logInfo("SchedulerBackend is ready for scheduling beginning" + +", total expected resources: " + totalExpectedResources.get() + +", minRegisteredResourcesRatio: " + minRegisteredRatio) return true } if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { - ready = true logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + -"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) +"maxRegisteredResourcesWaitingTime(ms): " + maxRegisteredWaitingTime) --- End diff -- I'd do these two log messages with string interpolation instead of using `+`. http://docs.scala-lang.org/overviews/core/string-interpolation.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
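As a rough illustration of the suggestion, the first log message from the diff could be written either way. The object and method names below are hypothetical, and the values are passed as parameters instead of being read from the backend's fields; both functions build the same string.

```scala
object ReadyLogMessageSketch {
  // Concatenation with `+`, as in the diff.
  def concatenated(totalExpectedResources: Int, minRegisteredRatio: Double): String =
    "SchedulerBackend is ready for scheduling beginning" +
      ", total expected resources: " + totalExpectedResources +
      ", minRegisteredResourcesRatio: " + minRegisteredRatio

  // The same message using the s-interpolator.
  def interpolated(totalExpectedResources: Int, minRegisteredRatio: Double): String =
    s"SchedulerBackend is ready for scheduling beginning" +
      s", total expected resources: $totalExpectedResources" +
      s", minRegisteredResourcesRatio: $minRegisteredRatio"
}
```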
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1528#discussion_r15272274 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala --- @@ -27,9 +27,18 @@ private[spark] class TaskSet( val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, -val priority: Int, +val jobId: Int, val properties: Properties) { val id: String = stageId + "." + attempt +val DEFAULT_PRIORITY: Int = 0 + + val priority:Int = { +if(properties != null){ + properties.getProperty("spark.scheduler.priority", "0").toInt +}else{ + DEFAULT_PRIORITY +} + } --- End diff -- Is the style checker ok with `val priority = if (...) {...` instead of `val priority = { if (...) {...`? If it is, I'd rather do without the extra `{}`. You can also drop the `: Int` from `val DEFAULT_PRIORITY` and `val priority` -- the types are obvious without the annotations. Also, I'm not sure that DEFAULT_PRIORITY really gains you anything -- I'd be fine with just `if (...) {...} else 0`. And make sure you follow the style guide for spacing with parens and braces. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
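A sketch of what the simplified form might look like, assuming a hypothetical, trimmed-down `TaskSet` that carries only the fields relevant to the comment. The `if`/`else` is used directly as the initializer of the `val`, the type annotation is inferred, and the default is written inline as `0`.

```scala
import java.util.Properties

object TaskSetPrioritySketch {
  // Hypothetical stand-in for TaskSet; the real class has more constructor parameters.
  class TaskSet(val stageId: Int, val properties: Properties) {
    val priority =
      if (properties != null) {
        properties.getProperty("spark.scheduler.priority", "0").toInt
      } else {
        0
      }
  }
}
```

Because `properties` is a constructor `val`, the priority can be computed once at construction time, which is why a `val` is sufficient here.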
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15270909 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 --- End diff -- Sorry, was doing something dumb. Leave it a `var` but clean up the initialization. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15270679 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 --- End diff -- ...actually, it doesn't look like it's used at all anymore except in a log message. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15270593 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 --- End diff -- Looks like `minRegisteredRatio` is only needed within this class and doesn't need to be a var:

```scala
private val minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
```
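The clamping expression can be tried in isolation. In this sketch a plain `Map[String, String]` and a hypothetical `getDouble` helper stand in for `SparkConf`; only the `math.min(1, ...)` idiom is taken from the comment.

```scala
object MinRegisteredRatioSketch {
  private val Key = "spark.scheduler.minRegisteredResourcesRatio"

  // Hypothetical stand-in for a config lookup with a default value.
  def getDouble(conf: Map[String, String], key: String, default: Double): Double =
    conf.get(key).map(_.toDouble).getOrElse(default)

  // Cap the configured ratio at 1 in a single immutable initialization.
  def minRegisteredRatio(conf: Map[String, String]): Double =
    math.min(1, getDouble(conf, Key, 0))
}
```

This replaces the "read into a `var`, then overwrite if above 1" pattern with one expression, so the value never exists in an out-of-range state.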
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1528#discussion_r15270239 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala --- @@ -27,9 +27,18 @@ private[spark] class TaskSet( val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, -val priority: Int, +val jobId: Int, val properties: Properties) { val id: String = stageId + "." + attempt +val DEFAULT_PRIORITY: Int = 0 + + def priority:Int = { +if(properties != null){ + properties.getProperty("spark.scheduler.priority", "0").toInt +}else{ + DEFAULT_PRIORITY +} + } --- End diff -- Why the change from `val` to `def`? I believe `val priority = if ... else ...` will work fine here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1528#discussion_r15270158 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala --- @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import scala.math.Ordering.Implicits._ --- End diff -- Pulling in these implicits can have unintended consequences; that's why in my previous comment I kept the scope of the import as small as possible.
[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15268935 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). + // Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds). --- End diff -- Ah, I see -- sorry. Looks like this is what we want? `// Submit tasks after maxRegisteredWaitingTime milliseconds if minRegisteredRatio has not yet been reached` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Add caching information to rdd.toDebugString
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1535#discussion_r15259034 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1294,7 +1307,11 @@ abstract class RDD[T: ClassTag]( val partitionStr = "(" + rdd.partitions.size + ")" val leftOffset = (partitionStr.length - 1) / 2 val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)) - Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix) + + debugSelf(rdd).zipWithIndex.map{ +case (desc: String, 0) => partitionStr+" "+desc +case (desc: String, _) => nextPrefix+" "+desc --- End diff -- And elsewhere in this PR, avoid string concatenation with `+` when string interpolation would be equally clear or clearer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Add caching information to rdd.toDebugString
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1535#discussion_r15258957 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1294,7 +1307,11 @@ abstract class RDD[T: ClassTag]( val partitionStr = "(" + rdd.partitions.size + ")" val leftOffset = (partitionStr.length - 1) / 2 val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)) - Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix) + + debugSelf(rdd).zipWithIndex.map{ +case (desc: String, 0) => partitionStr+" "+desc +case (desc: String, _) => nextPrefix+" "+desc --- End diff -- s"$partitionStr $desc" s"$nextPrefix $desc" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
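Put together, the two interpolated strings would slot into the `zipWithIndex.map` from the diff roughly as follows. `prefixLines` is a hypothetical helper that takes the description lines and both prefixes as parameters, where the real code computes them from the RDD.

```scala
object DebugStringSketch {
  // The first line gets the partition marker; continuation lines get the tree prefix.
  def prefixLines(lines: Seq[String], partitionStr: String, nextPrefix: String): Seq[String] =
    lines.zipWithIndex.map {
      case (desc, 0) => s"$partitionStr $desc"
      case (desc, _) => s"$nextPrefix $desc"
    }
}
```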
[GitHub] spark pull request: [SPARK-2490] Change recursive visiting on RDD ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1418#issuecomment-49794355 Jenkins, ok to test.
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1528#discussion_r15248491 --- Diff: core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala --- @@ -32,11 +32,21 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { val priority2 = s2.priority var res = math.signum(priority1 - priority2) if (res == 0) { - val stageId1 = s1.stageId - val stageId2 = s2.stageId - res = math.signum(stageId1 - stageId2) + val jobId1 = s1.jobId + val jobId2 = s2.jobId + res = math.signum(jobId1 - jobId2) + if (res == 0) { +val stageId1 = s1.stageId +val stageId2 = s2.stageId +res = math.signum(stageId1 - stageId2) + } + if (res < 0) { +true + } else { +false + } --- End diff -- This `if..else` doesn't actually do anything. This whole comparator is needlessly complex. If the intent is lexicographical ordering with higher priority ahead of lower priority, lower jobId ahead of higher jobId, and lower stageId ahead of higher stageId, then this should be sufficient:

```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    import scala.math.Ordering.Implicits._
    (s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, s1.stageId)
  }
}
```
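The tuple comparison can be exercised outside Spark. The sketch below uses a hypothetical `Sched` case class in place of `Schedulable` and feeds the comparator to `sortWith`. Swapping `s1` and `s2` in the second and third tuple positions is what makes higher priority sort first while lower `jobId` and `stageId` sort first.

```scala
object FifoComparatorSketch {
  // Hypothetical stand-in for Schedulable, carrying only the three compared fields.
  final case class Sched(priority: Int, jobId: Int, stageId: Int)

  // True when s1 should be scheduled ahead of s2.
  def comparator(s1: Sched, s2: Sched): Boolean = {
    // Import kept local so the implicit ordering operators do not leak into the file.
    import scala.math.Ordering.Implicits._
    (s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, s1.stageId)
  }

  def schedule(items: List[Sched]): List[Sched] = items.sortWith(comparator)
}
```

For example, `schedule(List(Sched(0, 2, 5), Sched(1, 3, 7), Sched(0, 1, 9), Sched(0, 1, 4)))` places the priority-1 entry first, then the job-1 entries in stage order, then the job-2 entry.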
[GitHub] spark pull request: use config spark.scheduler.priority for specif...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1528#discussion_r15246213 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -778,8 +778,10 @@ class DAGScheduler( logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") myPending ++= tasks logDebug("New pending tasks: " + myPending) + val priority:String = properties.getProperty("spark.scheduler.priority", "0") taskScheduler.submitTasks( -new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) +new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, priority.toInt, + properties)) --- End diff -- `properties` is already being passed to the `TaskSet` ctor, so I'd prefer that extraction of `priority` happen there or elsewhere instead of doing `properties.getProperty` here and adding another parameter to the `TaskSet` ctor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2490] Change recursive visiting on RDD ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1418#issuecomment-49773532 ok to test
[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15242315 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala --- @@ -108,4 +108,8 @@ private[spark] class SparkDeploySchedulerBackend( logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason.toString) } + + override def checkRegisteredResources(): Boolean = { --- End diff -- or `sufficientResourcesRegistered`
[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15242266 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala --- @@ -108,4 +108,8 @@ private[spark] class SparkDeploySchedulerBackend( logInfo("Executor %s removed: %s".format(fullId, message)) removeExecutor(fullId.split("/")(1), reason.toString) } + + override def checkRegisteredResources(): Boolean = { --- End diff -- I'd prefer the name to indicate what condition is being checked, so something like `sufficientRegisteredResources`.
[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1525#discussion_r15240513 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) - var totalExpectedExecutors = new AtomicInteger(0) + var totalExecutors = new AtomicInteger(0) + var totalExpectedResources = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Submit tasks only after (registered executors / total expected executors) + // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. - var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0) if (minRegisteredRatio > 1) minRegisteredRatio = 1 - // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). + // Whatever minRegisteredRatio is arrived, submit tasks after the time(milliseconds). --- End diff -- // Submit tasks time(milliseconds) after minRegisteredRatio is reached
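Since the two code comments in this hunk are easy to misread, here is a small sketch of one possible reading: tasks may be submitted once the registered/expected ratio reaches the clamped minimum, or once a maximum wait has elapsed regardless of the ratio. `ReadinessSketch` and all of its parameter names are hypothetical, the timeout semantics are an assumption drawn from the diff's wording rather than from the backend's actual code, and the reworded comment proposed above may intend something different.

```scala
// Hypothetical helper, not the CoarseGrainedSchedulerBackend implementation.
final class ReadinessSketch(minRatio: Double, maxWaitMs: Long, startMs: Long) {
  // Mirrors the clamp in the diff: `if (minRegisteredRatio > 1) minRegisteredRatio = 1`.
  private val ratio: Double = math.min(minRatio, 1.0)

  def isReady(registered: Int, expected: Int, nowMs: Long): Boolean = {
    // Treat "nothing expected" as trivially satisfied to avoid dividing by zero.
    val enoughRegistered = expected <= 0 || registered.toDouble / expected >= ratio
    // Assumed reading: after maxWaitMs, stop waiting whatever the ratio is.
    val waitedLongEnough = nowMs - startMs >= maxWaitMs
    enoughRegistered || waitedLongEnough
  }
}
```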
[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1362#discussion_r15076386 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1107,7 +1106,6 @@ class DAGScheduler( case shufDep: ShuffleDependency[_, _, _] => val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { -visitedStages += mapStage --- End diff -- @mateiz It looks to me that this is what was always intended, but has actually been missing the `visitedStages` check for the past couple of years:
```scala
if (!mapStage.isAvailable && !visitedStages(mapStage)) {
  visitedStages += mapStage
  visit(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
```
Making that change works for me in the sense that all of the test suites continue to pass, but I haven't yet got any relative performance numbers. Probably needs to go in a separate JIRA & PR.
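To illustrate why the missing check matters, here is a toy sketch that counts how often each node of a small diamond-shaped dependency graph gets expanded, with and without a visited-set guard. The graph, `VisitSketch`, and `countVisits` are purely illustrative and are not Spark types; the sketch only shows the general effect of the guard, not the behaviour of `getPreferredLocs` or any other scheduler method.

```scala
import scala.collection.mutable

object VisitSketch {
  // Toy dependency graph: node -> parents. "a" is reachable via both "b" and "c".
  val parents: Map[String, List[String]] = Map(
    "d" -> List("b", "c"),
    "b" -> List("a"),
    "c" -> List("a"),
    "a" -> Nil
  )

  // Walks from `start` towards the roots and reports how many times each node
  // was expanded. With guard = false the visited set is filled but never consulted.
  def countVisits(start: String, guard: Boolean): Map[String, Int] = {
    val visited = mutable.HashSet[String]()
    val counts = mutable.Map[String, Int]().withDefaultValue(0)

    def visit(n: String): Unit = {
      if (!guard || !visited(n)) {
        visited += n
        counts(n) += 1
        parents(n).foreach(visit)
      }
    }

    visit(start)
    counts.toMap
  }
}
```

On this four-node diamond the unguarded walk expands the shared ancestor twice; on deeper graphs with many shared ancestors the repeated work can grow quickly, which is presumably where relative performance numbers would show a difference.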
[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1362#discussion_r15041791 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1107,7 +1106,6 @@ class DAGScheduler( case shufDep: ShuffleDependency[_, _, _] => val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { -visitedStages += mapStage --- End diff -- Yup, it's odd -- and it looks to me like it has been that way since this section of code was introduced. Looks like there should be an `if (!visitedStages(mapStage))` guarding `visit(mapStage.rdd)`... testing...
[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1362#discussion_r15040842 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1107,7 +1106,6 @@ class DAGScheduler( case shufDep: ShuffleDependency[_, _, _] => val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { -visitedStages += mapStage --- End diff -- `visitedStages` is built up and used repeatedly in the recursive calls to `visit` https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1101
[GitHub] spark pull request: Async in progress
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1449#issuecomment-49241702 Please create a JIRA issue and a description for this PR.
[GitHub] spark pull request: SPARK-2519. Eliminate pattern-matching on Tupl...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1435#issuecomment-49195389 Got it. Thanks. That also helps to put some bound (for now) on where we will make such performance optimizations.
[GitHub] spark pull request: SPARK-2519. Eliminate pattern-matching on Tupl...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1435#issuecomment-49174657 Hmmm... not sure that I would go so far as to call it "nice". This does make the code slightly more difficult to read and understand, so can we hope that you've got some relative performance numbers that justify this compromise, @sryza ?
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14849312 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import org.apache.spark.sql.catalyst.expressions._ + +/** + * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new + * input [[Row]] for a fixed set of [[Expression Expressions]]. + */ +object GenerateMutableProjection extends CodeGenerator { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + // TODO: Should be weak references... bounded in size. + val projectionCache = new collection.mutable.HashMap[Seq[Expression], () => MutableProjection] + + def apply(expressions: Seq[Expression], inputSchema: Seq[Attribute]): (() => MutableProjection) = +apply(expressions.map(BindReferences.bindReference(_, inputSchema))) + + // TODO: Safe to fire up multiple instances of the compiler? 
+ def apply(expressions: Seq[Expression]): () => MutableProjection = +globalLock.synchronized { + val cleanedExpressions = expressions.map(ExpressionCanonicalizer(_)) + projectionCache.getOrElseUpdate(cleanedExpressions, createProjection(cleanedExpressions)) +} + + val mutableRowName = newTermName("mutableRow") + + def createProjection(expressions: Seq[Expression]): (() => MutableProjection) = { --- End diff -- Should these `create*` functions be public?
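As a rough illustration of the visibility question raised here, the sketch below shows a memoizing generator whose `create` step is private, so callers can only reach it through the cached, locked `apply`. Everything in it is hypothetical: `GeneratorSketch` is not the Catalyst `CodeGenerator`, a `String` key stands in for `Seq[Expression]`, and an `Int => Int` function stands in for the generated projection.

```scala
import scala.collection.mutable

// Hypothetical generator, loosely shaped like the apply/createProjection pair in the diff.
object GeneratorSketch {
  private val lock = new Object
  private val cache = mutable.HashMap[String, Int => Int]()
  private var compilations = 0

  // Public entry point: looks the key up in the cache and builds on a miss,
  // all while holding the lock.
  def apply(key: String): Int => Int = lock.synchronized {
    cache.getOrElseUpdate(key, create(key))
  }

  // Private: an outside caller cannot bypass the cache or the lock.
  private def create(key: String): Int => Int = {
    compilations += 1
    val n = key.length
    x => x + n
  }

  // Exposed only so the caching behaviour can be observed.
  def compilationCount: Int = lock.synchronized(compilations)
}
```

Whether the real `create*` functions should be hidden this way depends on whether anything outside the generator needs an uncached instance, which the diff alone does not show.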
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14848366 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + val rowType = typeOf[Row] + val mutableRowType = typeOf[MutableRow] + val genericRowType = typeOf[GenericRow] + val genericMutableRowType = typeOf[GenericMutableRow] + + val projectionType = typeOf[Projection] + val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeperator = "$" + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) + */ + protected def freshName(prefix: String): TermName = { +newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}") + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm An possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns the code required to determine both if the result is NULL + * as well as the code required to compute the value. 
+ */ + def expressionEvaluator(e: Expression): EvaluatedExpression = { +val primitiveTerm = freshName("primitiveTerm") +val nullTerm = freshName("nullTerm") +val objectTerm = freshName("objectTerm") + +implicit class Evaluate1(e: Expression) { + def castOrNull(f: TermName => Tree, dataType: DataType): Seq[Tree] = { +val eval = expressionEvaluator(e) +eval.code ++ + q""" + val $nullTerm = ${eval.nullTerm} + val $primitiveTerm = +if($nullTerm) + ${defaultPrimitive(dataType)} +else + ${f(eval.primitiveTerm)} +""".children + } +} + +implicit class Evaluate2(expressions: (Expression, Expression)) { + + /** + * Short hand for generating binary evaluation code, which depends on two sub-evaluations of + * the same type. If either of the sub-expressions is null, the results of this computation + * is a
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14847035 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + val rowType = typeOf[Row] + val mutableRowType = typeOf[MutableRow] + val genericRowType = typeOf[GenericRow] + val genericMutableRowType = typeOf[GenericMutableRow] + + val projectionType = typeOf[Projection] + val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeperator = "$" + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) + */ + protected def freshName(prefix: String): TermName = { +newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}") + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm An possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns the code required to determine both if the result is NULL + * as well as the code required to compute the value. 
+ */ + def expressionEvaluator(e: Expression): EvaluatedExpression = { +val primitiveTerm = freshName("primitiveTerm") +val nullTerm = freshName("nullTerm") +val objectTerm = freshName("objectTerm") + +implicit class Evaluate1(e: Expression) { + def castOrNull(f: TermName => Tree, dataType: DataType): Seq[Tree] = { +val eval = expressionEvaluator(e) +eval.code ++ + q""" + val $nullTerm = ${eval.nullTerm} + val $primitiveTerm = +if($nullTerm) + ${defaultPrimitive(dataType)} +else + ${f(eval.primitiveTerm)} +""".children --- End diff -- Is this really the way we want to format `q"""`? To my eye, it would be much more readable as: ```scala q""" val $nullTerm = ${eval1.nullTerm} || ${eval2.nullTerm} val $primitiveTerm: ${termForType(resultType)} = if($nullTerm) { ${defaultPrimitive(resultType)}
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14846216 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + val rowType = typeOf[Row] + val mutableRowType = typeOf[MutableRow] + val genericRowType = typeOf[GenericRow] + val genericMutableRowType = typeOf[GenericMutableRow] + + val projectionType = typeOf[Projection] + val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeperator = "$" + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) + */ + protected def freshName(prefix: String): TermName = { +newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}") + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm An possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns the code required to determine both if the result is NULL + * as well as the code required to compute the value. --- End diff -- Description of the output seems less than precise.
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14846043 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + val rowType = typeOf[Row] + val mutableRowType = typeOf[MutableRow] + val genericRowType = typeOf[GenericRow] + val genericMutableRowType = typeOf[GenericMutableRow] + + val projectionType = typeOf[Projection] + val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeperator = "$" + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) + */ + protected def freshName(prefix: String): TermName = { +newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}") + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm An possibly boxed version of the result of evaluating this expression. --- End diff -- s/An/A
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14845309 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual --- End diff -- sp. referring
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r14845258 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code that performs expression evaluation. Includes helpers + * for refering to Catalyst types and building trees that perform evaluation of individual + * expressions. 
+ */ +abstract class CodeGenerator extends Logging { + import scala.reflect.runtime.{universe => ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + val rowType = typeOf[Row] + val mutableRowType = typeOf[MutableRow] + val genericRowType = typeOf[GenericRow] + val genericMutableRowType = typeOf[GenericMutableRow] + + val projectionType = typeOf[Projection] + val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeperator = "$" --- End diff -- sp. separator
[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1360#issuecomment-48643262 @aarondav
[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...
GitHub user markhamstra opened a pull request: https://github.com/apache/spark/pull/1360 SPARK-2425 Don't kill a still-running Application because of some misbehaving Executors Introduces a LOADING -> RUNNING ApplicationState transition and prevents Master from removing an Application with RUNNING Executors. Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor exits over the entire lifetime of the Application, allow that many since the last time any Executor successfully began running the Application; 2) Don't remove the Application while Master still thinks that there are RUNNING Executors. This should be fine as long as the ApplicationInfo doesn't believe any Executors are forever RUNNING when they are not. I think that any non-RUNNING Executors will eventually no longer be RUNNING in Master's accounting, but another set of eyes should confirm that. This PR also doesn't try to detect which nodes have gone rogue or to kill off bad Workers, so repeatedly failing Executors will continue to fail and fill up log files with failure reports as long as the Application keeps running. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markhamstra/spark SPARK-2425 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1360.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1360 commit 5b85534d376d682b7e1f97f98acd532a305349f8 Author: Mark Hamstra Date: 2014-07-09T23:02:43Z SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition and prevent Master from removing Application with RUNNING Executors
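A minimal sketch of the two changes described in the PR text above, assuming a simplified per-application state object. Every name here (`RetryPolicySketch`, `AppState`, `MaxNumRetry` and its value) is hypothetical and chosen for illustration; this is not the Master's actual code.

```scala
// Hypothetical sketch only; not taken from Spark's Master implementation.
object RetryPolicySketch {
  // Assumed stand-in for MAX_NUM_RETRY; the value is an assumption.
  val MaxNumRetry = 10

  final class AppState {
    private var retryCount = 0
    private var runningExecutors = 0

    // Change 1: a successful LOADING -> RUNNING transition resets the count,
    // so failures are counted since the last successful start, not over the
    // whole lifetime of the application.
    def executorStartedRunning(): Unit = {
      runningExecutors += 1
      retryCount = 0
    }

    // Returns true when the application should be removed.
    // Change 2: never remove while some executor is still believed RUNNING.
    def executorExitedAbnormally(wasRunning: Boolean): Boolean = {
      if (wasRunning) runningExecutors -= 1
      retryCount += 1
      retryCount >= MaxNumRetry && runningExecutors == 0
    }
  }
}
```

With this shape, a cluster where one executor keeps running while others repeatedly fail never removes the application, which matches the caveat in the PR text that failing Executors will keep failing and logging.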
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra closed the pull request at: https://github.com/apache/spark/pull/686
[GitHub] spark pull request: [MLLIB] SPARK-2329 Add multi-label evaluation ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1270#discussion_r14428640 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + +/** + * Evaluator for multilabel classification. + * NB: type Double both for prediction and label is retained + * for compatibility with model.predict that returns Double + * and MLUtils.loadLibSVMFile that loads class labels as Double + * + * @param predictionAndLabels an RDD of (predictions, labels) pairs, both are non-null sets. + */ +class MultilabelMetrics(predictionAndLabels:RDD[(Set[Double], Set[Double])]) extends Logging{ + + private lazy val numDocs = predictionAndLabels.count + + private lazy val numLabels = predictionAndLabels.flatMap{case(_, labels) => labels}.distinct.count + + /** + * Returns strict Accuracy + * (for equal sets of labels) + * @return strictAccuracy. 
+ */ + lazy val strictAccuracy = predictionAndLabels.filter{case(predictions, labels) => +predictions == labels}.count.toDouble / numDocs + + /** + * Returns Accuracy + * @return Accuracy. + */ + lazy val accuracy = predictionAndLabels.map{ case(predictions, labels) => +labels.intersect(predictions).size.toDouble / labels.union(predictions).size}. +fold(0.0)(_ + _) / numDocs + --- End diff -- After ``import org.apache.spark.SparkContext._``, it should already be there as an implicit.
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r14211667 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} --- End diff -- Ok, either tonight or tomorrow I can update this PR to reflect that strategy, or you can go ahead and make the change @pwendell. Outside the immediate scope of this PR, what prevents Mesos from being able to kill tasks, and when do we expect that to change?
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r14041700 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} --- End diff -- Do we have the meaning of all the listener events fully documented someplace? Or perhaps that needs to be done in a separate PR and then DAGScheduler updated to match the documented expectation?
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r14041296 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} --- End diff -- What you suggest could be done, but there's a question of whether or not notification of cancellation of the job should be made regardless of whether any stages and tasks are successfully cancelled as a consequence. I don't really know how to answer that because I don't know how all of the listeners are using the notification or whether they are all expecting the same semantics.
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r14040631 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1062,10 +1062,15 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} --- End diff -- Hmmm... not sure that I agree. A job being cancelled, stages being cancelled, and tasks being cancelled are all different things. The expectation is that job cancellation will lead to cancellation of independent stages and their associated tasks; but if no stages and tasks get cancelled, it's probably still worthwhile for the information to be sent that the job itself was cancelled. I expect that eventually all of the backends will support task killing, so this whole no-kill path should never be hit. But moving the job cancellation notification within the try-to-cancelTasks block will result in multiple notifications that the parent job was cancelled -- one for each independent stage cancellation. Or am I misunderstanding what you are suggesting?
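The concern about duplicate notifications can be illustrated with a toy model. This is only a sketch: the event types, the `cancelJob` function, and its `canKill` and `notifyInsideTry` flags are all made up for illustration and do not correspond to DAGScheduler or listener-bus APIs.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model; all names are hypothetical and unrelated to Spark's real classes.
object CancellationSketch {
  sealed trait Event
  case class StageCancelled(stageId: Int) extends Event
  case class JobCancelled(jobId: Int) extends Event

  // canKill = false stands in for a backend that does not implement killTask.
  // notifyInsideTry = true posts the job notification inside the per-stage try.
  def cancelJob(jobId: Int, stageIds: Seq[Int], canKill: Boolean,
                notifyInsideTry: Boolean): Seq[Event] = {
    val events = ArrayBuffer.empty[Event]
    stageIds.foreach { stageId =>
      try {
        if (!canKill) throw new UnsupportedOperationException("killTask not implemented")
        events += StageCancelled(stageId)
        if (notifyInsideTry) events += JobCancelled(jobId)
      } catch {
        case _: UnsupportedOperationException => // log and carry on, as in the diff
      }
    }
    // Posting once, outside the loop, reports the job exactly once either way.
    if (!notifyInsideTry) events += JobCancelled(jobId)
    events.toSeq
  }
}
```

In this model, posting inside the try yields one job notification per independent stage when killing works and none at all when it does not, while posting outside yields exactly one in both cases.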
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r14032954 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -313,6 +314,47 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("job cancellation no-kill backend") { +// make sure that the DAGScheduler doesn't crash when the TaskScheduler +// doesn't implement killTask() +val noKillTaskScheduler = new TaskScheduler() { + override def rootPool: Pool = null + override def schedulingMode: SchedulingMode = SchedulingMode.NONE + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { +// normally done by TaskSetManager +taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) --- End diff -- Sure, doing nothing is easy. On Fri, Jun 20, 2014 at 10:47 AM, Kay Ousterhout wrote: > In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: > are these lines necessary (can you just do nothing here?)
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/686#issuecomment-46684950 ping: This should go into 1.0.1 @pwendell
[GitHub] spark pull request: Branch 1.0 Add ZLIBCompressionCodec code
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1115#issuecomment-46479466 Yes, this PR is not in a useful state right now. It's hard to even find the proposed changes because of all the clutter of unnecessary commits, but it looks to me like you are creating a new API? If that is the case, then the rebase definitely needs to be against master, since branch-1.0 is a maintenance branch for bug-fixes, not new API. Additionally, you should create a JIRA issue explaining what this PR is trying to address.
[GitHub] spark pull request: [SPARK-2060][SQL] Querying JSON Datasets with ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/999#issuecomment-46436035 Hmmm, that doesn't precisely match my recollection or understanding. Certainly we discussed that alpha components aren't required to maintain a stable API, but I don't recall an explicit decision that changes to alpha components would routinely be merged back into maintenance releases. I could be mistaken, and merging new alpha API into maintenance branches may be the right strategy, but this did take me a little by surprise.
[GitHub] spark pull request: [SPARK-2060][SQL] Querying JSON Datasets with ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/999#issuecomment-46389597 Is that the basic strategy we are going to use with AlphaComponents -- merging new APIs at both the minor and maintenance levels? I don't know that I have any objection to that, but I don't recall any discussion directly on point, and this is the first such addition that has been made to branch-1.0 while I was paying attention.
[GitHub] spark pull request: SPARK-2158 Clean up core/stdout file from File...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1100#issuecomment-46265806 jenkins, test this please
[GitHub] spark pull request: SPARK-2158 Clean up core/stdout file from File...
GitHub user markhamstra opened a pull request: https://github.com/apache/spark/pull/1100 SPARK-2158 Clean up core/stdout file from FileAppenderSuite @tdas You can merge this pull request into a Git repository by running: $ git pull https://github.com/markhamstra/spark SPARK-2158 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1100.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1100 commit d95b49c830b5b2ed99842845260a050b0e519d80 Author: Mark Hamstra Date: 2014-06-16T23:12:39Z Cleanup 'stdout' file within FileAppenderSuite
[GitHub] spark pull request: SPARK-1715: Ensure actor is self-contained in ...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/637#issuecomment-46104906 It means that the binary compatibility check failed after your patch was applied: the checker thinks that there previously was a default/automatic setter method for the dagScheduler var in SparkContext and that there isn't one anymore after applying your patch. See SPARK-2069.
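As background for the comment above, here is a small illustration of why a `var` matters to a binary compatibility checker: the Scala compiler emits a public setter method named `<field>_$eq` for a public `var`, and that method disappears if the member becomes a `val`. The classes `WithVar` and `WithVal` are hypothetical stand-ins, not SparkContext itself.

```scala
// Hypothetical classes for illustration; not SparkContext.
object SetterSketch {
  class WithVar { var dagScheduler: String = "initial" }
  class WithVal { val dagScheduler: String = "initial" }

  // Names of all public methods visible through Java reflection.
  def publicMethodNames(c: Class[_]): Set[String] =
    c.getMethods.map(_.getName).toSet
}
```

Checking `publicMethodNames(classOf[SetterSketch.WithVar])` for `dagScheduler_$eq` shows the setter; the same lookup on `WithVal` does not find it, which is the kind of difference a binary compatibility check reports as a missing method.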
[GitHub] spark pull request: [SQL] Update SparkSQL and ScalaTest in branch-...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/1078#issuecomment-46071589 FYI Bumping all the way to the current scalatest 2.2.0 also works.
[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/999#discussion_r13520869 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/json/JsonTable.scala --- @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.json + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetField} + +import com.fasterxml.jackson.databind.ObjectMapper + +import scala.collection.JavaConversions._ +import scala.math.BigDecimal +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types.StructField +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.catalyst.types.ArrayType +import org.apache.spark.sql.catalyst.expressions.GetField +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.catalyst.expressions.Alias + +sealed trait SchemaResolutionMode + +case object EAGER_SCHEMA_RESOLUTION extends SchemaResolutionMode +case class EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(val fraction: Double) extends SchemaResolutionMode +case object LAZY_SCHEMA_RESOLUTION extends SchemaResolutionMode + +/** + * :: Experimental :: + * Converts a JSON file to a SparkSQL logical query plan. This implementation is only designed to + * work on JSON files that have mostly uniform schema. 
The conversion suffers from the following + * limitation: + * - The data is optionally sampled to determine all of the possible fields. Any fields that do + *not appear in this sample will not be included in the final output. + */ +@Experimental +object JsonTable extends Serializable with Logging { + def inferSchema( + json: RDD[String], sampleSchema: Option[Double] = None): LogicalPlan = { +val schemaData = sampleSchema.map(json.sample(false, _, 1)).getOrElse(json) +val allKeys = parseJson(schemaData).map(getAllKeysWithValueTypes).reduce(_ ++ _) + +// Resolve type conflicts +val resolved = allKeys.groupBy { + case (key, dataType) => key +}.map { + // Now, keys and types are organized in the format of + // key -> Set(type1, type2, ...). + case (key, typeSet) => { +val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq +val dataType = typeSet.map { + case (_, dataType) => dataType +}.reduce((type1: DataType, type2: DataType) => getCompatibleType(type1, type2)) + +// Finally, we replace all NullType to StringType. We do not need to take care +// StructType because all fields with a StructType are represented by a placeholder +// StructType(Nil). +dataType match { + case NullType => (fieldName, StringType) + case ArrayType(NullType) => (fieldName, ArrayType(StringType)) + case other => (fieldName, other) +} + } +} + +def m
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/369#discussion_r13423508 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag]( def ++(other: RDD[T]): RDD[T] = this.union(other) /** + * Return this RDD sorted by the given key function. + */ + def sortBy[K]( + f: (T) ⇒ K, + ascending: Boolean = true, + numPartitions: Int = this.partitions.size) + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = --- End diff -- https://issues.apache.org/jira/browse/SPARK-1540 https://github.com/apache/spark/commit/640f9a0efefd42cff86aecd4878a3a57f5ae85fa On Wed, Jun 4, 2014 at 8:14 PM, Wenchen Fan wrote: > In core/src/main/scala/org/apache/spark/rdd/RDD.scala: > @markhamstra I checked RDD.scala and found the style you talked about. But one thing I don't understand is: def countByValue()(implicit ord: Ordering[T] = null). I can't find the use of implicit Ordering[T] anywhere inside the method, and I compiled spark successfully with this implicit Ordering[T] removed. Did I miss something here?
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/369#discussion_r13419732 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag]( def ++(other: RDD[T]): RDD[T] = this.union(other) /** + * Return this RDD sorted by the given key function. + */ + def sortBy[K]( + f: (T) ⇒ K, + ascending: Boolean = true, + numPartitions: Int = this.partitions.size) + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = --- End diff -- If I understand what you are claiming, then I think you are mistaken. There is only one implicit parameter list, though, so I don't think you can supply an argument for only one of the implicits:
```
scala> def foo[A: ClassTag](implicit ord: Ordering[A]) = (implicitly[ClassTag[A]], ord)
foo: [A](implicit evidence$1: scala.reflect.ClassTag[A], implicit ord: scala.math.Ordering[A])(scala.reflect.ClassTag[A], scala.math.Ordering[A])

scala> foo[Int]
res1: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = (Int,scala.math.Ordering$Int$@5e10a811)

scala> foo[Int](scala.reflect.classTag[Int], Ordering[Int])
res2: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = (Int,scala.math.Ordering$Int$@5e10a811)

scala> foo[Int](scala.reflect.classTag[Int], Ordering[Int].reverse)
res3: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = (Int,scala.math.Ordering$$anon$4@7293296)
```
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/686#issuecomment-45113730 @rxin merge to 1.0.1 and 1.1.0?
[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/369#discussion_r13387198 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag]( def ++(other: RDD[T]): RDD[T] = this.union(other) /** + * Return this RDD sorted by the given key function. + */ + def sortBy[K]( + f: (T) ⇒ K, + ascending: Boolean = true, + numPartitions: Int = this.partitions.size) + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = --- End diff -- That substitution certainly could be made since it is functionally equivalent. Whether it should be made is mostly a question of style. Since `ord` and `ctag` are not used explicitly here, there is no real use to enumerating them as implicit parameters; but the rest of RDD.scala doesn't use the context bound sugar, so there is some value in consistently maintaining that style.
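To make the equivalence concrete, here is a small sketch over plain `Seq` rather than RDD. The object and method names (`ContextBoundSketch`, `sortedExplicit`, `sortedSugared`) are hypothetical; the only claim is the standard Scala rule that a context bound desugars into an implicit parameter list.

```scala
import scala.reflect.ClassTag

// Hypothetical helpers over Seq; not the RDD.sortBy implementation.
object ContextBoundSketch {
  // Explicit implicit parameters, the style used in the diff.
  def sortedExplicit[T, K](xs: Seq[T])(f: T => K)(
      implicit ord: Ordering[K], ctag: ClassTag[K]): Seq[T] =
    xs.sortBy(f)

  // Context-bound sugar; the compiler rewrites `K: Ordering: ClassTag`
  // into an implicit parameter list with synthetic evidence names.
  def sortedSugared[T, K: Ordering: ClassTag](xs: Seq[T])(f: T => K): Seq[T] =
    xs.sortBy(f)
}
```

Both forms resolve the same implicits at the call site, for example `ContextBoundSketch.sortedSugared(Seq("bb", "a", "ccc"))(_.length)`, so choosing between them is a question of style, as the comment says.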
[GitHub] spark pull request: [SPARK-1997] update breeze to version 0.8.1
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/940#issuecomment-44914945 Neither does Spark 1.0.0. We've offered no guarantee that any Spark 1.x will work with Scala 2.11. If it turns out that we can't cross-compile for Scala 2.10 and 2.11 with essentially the same source code and dependencies while preserving binary compatibility, then that may be the occasion that forces the beginning of Spark 2.0 development. In any event, breaking our intent to preserve binary compatibility is no small matter and shouldn't slip into a PR without considerable discussion. I'm not certain that that is what is happening with this PR, but I do think that it is critical that we understand how strong our checks for binary compatibility are and what the bounds are on the dependency upgrades that we can make while preserving that compatibility.
[GitHub] spark pull request: [WIP] update breeze to version 0.8.1
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/940#issuecomment-44889156 We shouldn't, so I think we maintain source compatibility without any trouble. Are the MIMA checks good enough to catch binary incompatibility when we make significant changes to library dependencies? Not that I am asserting that this Breeze change is particularly significant, since I haven't looked at the diff from 0.7. I'm more interested in the general case of what needs to be checked or done when we want to update dependencies while staying within the bounds of 1.x.
[GitHub] spark pull request: [WIP] update breeze to version 0.8.1
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/940#issuecomment-44861308 What is the reason for this change, and how does it affect our intention to maintain binary compatibility?
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r13106282 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1055,10 +1055,16 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} finally { + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) --- End diff -- Upon further review, I can't see any use (other than misleading the user) for posting stage cancellation to the SparkListenerBus when the SchedulerBackend does not support cancellation, so we won't do that.
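The control flow described in that comment, where the stage is reported as failed only if cancellation actually went through, could look roughly like the sketch below. Everything in it is a hypothetical stand-in (`TaskSchedulerLike`, the two scheduler objects, and the string "events"); it is not the DAGScheduler code from the PR, only an illustration of moving the bookkeeping out of `finally` and into the success path.

```scala
object CancelSketch {
  // Hypothetical stand-in for a task scheduler whose backend may lack killTask.
  trait TaskSchedulerLike {
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  }

  // Models a backend that cannot kill tasks.
  object NoKillScheduler extends TaskSchedulerLike {
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit =
      throw new UnsupportedOperationException("killTask not implemented")
  }

  // Models a backend where cancellation succeeds.
  object KillScheduler extends TaskSchedulerLike {
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = ()
  }

  // Returns the listener events that would be posted (plain strings here).
  def failStage(scheduler: TaskSchedulerLike, stageId: Int): List[String] =
    try {
      scheduler.cancelTasks(stageId, interruptThread = false)
      // Reached only when cancelTasks did not throw.
      List(s"StageCompleted($stageId, failed)")
    } catch {
      case _: UnsupportedOperationException =>
        // A real implementation would log here; nothing is posted to listeners.
        Nil
    }
}
```

With `NoKillScheduler` the function returns an empty list, so listeners never see a stage completion that did not happen.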
[GitHub] spark pull request: [SPARK-1938] [SQL] ApproxCountDistinctMergeFun...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/893#issuecomment-44309000 @ash211 Makes sense to me -- which doesn't necessarily mean a lot in this unfamiliar area of the code. It looks to me like the dataType for each of CountDistinct, ApproxCountDistinctMerge and ApproxCountDistinct should be LongType. And while we are in this file, AggregateFunction#eval(input: Row): Any doesn't actually override anything and looks to me to be superfluous.
[GitHub] spark pull request: SPARK-1868: Users should be allowed to cogroup...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/813#issuecomment-43663175 To throw another wrench into the Union analogy, there is also the little-used SparkContext#union, which has signatures for both Seq[RDD[T]] and varargs RDD[T].
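For readers unfamiliar with that pair of overloads, the sketch below shows the general shape: one overload takes a sequence, the other takes varargs and delegates to it. `Data` is a hypothetical in-memory stand-in for `RDD`, and `UnionSketch.union` is written for illustration only; it is not copied from SparkContext.

```scala
object UnionSketch {
  // Hypothetical stand-in for RDD[T]; it just wraps an in-memory list.
  final case class Data[T](items: List[T])

  // Overload taking a sequence of datasets.
  def union[T](parts: Seq[Data[T]]): Data[T] =
    Data(parts.flatMap(_.items).toList)

  // Varargs overload; the mandatory first argument keeps it distinct
  // from the Seq overload and rules out an empty call.
  def union[T](first: Data[T], rest: Data[T]*): Data[T] =
    union(Seq(first) ++ rest)

  val a = Data(List(1, 2))
  val b = Data(List(3))
  val fromSeq = union(Seq(a, b))
  val fromVarargs = union(a, b)
}
```

Both call styles produce the same result, which is why having the two signatures side by side complicates any analogy built on a single `union` shape.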
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
GitHub user markhamstra opened a pull request: https://github.com/apache/spark/pull/686 [SPARK-1749] Job cancellation when SchedulerBackend does not implement killTask It turns out that having the DAGScheduler tell the taskScheduler to cancelTasks when the backend does not implement killTask (e.g. Mesos) is not such a good idea. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markhamstra/spark SPARK-1749 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/686.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #686 commit 57fe87e75912cc9cd7ae9e5a0ed3f4236d46b80f Author: Mark Hamstra Date: 2014-05-07T22:56:27Z Catch UnsupportedOperationException when DAGScheduler tries to cancel a job on a SchedulerBackend that does not implement killTask
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r12497895 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) -dagScheduler.doCancelAllJobs() +try { + dagScheduler.doCancelAllJobs() +} catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) --- End diff -- https://github.com/apache/spark/pull/715?
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/686#issuecomment-42495440 The INFO log should include the information that tasks were not cancelled. Where/how else do you want to see notification of those facts? Is adding more Listener events something we want to contemplate still in 1.0.0, or should something like that go into 1.1?
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r12498098 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1148,7 +1154,11 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) case x: Exception => logError("eventProcesserActor failed due to the error %s; shutting down SparkContext" .format(x.getMessage)) -dagScheduler.doCancelAllJobs() +try { + dagScheduler.doCancelAllJobs() +} catch { + case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) --- End diff -- @aarondav So shall I just back off to `case e: Exception =>` here and let Throwable be picked up in a larger refactoring of Akka exception handling?
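The practical difference between the two catch clauses is which failures get swallowed. A small sketch, assuming a hypothetical helper `cancelQuietly` that stands in for the wrapped `doCancelAllJobs()` call: with `case e: Exception`, ordinary exceptions are logged, while a JVM `Error` (not a subclass of `Exception`) propagates to the caller.

```scala
object CatchScopeSketch {
  // Hypothetical helper: runs a cancel action and swallows only Exceptions.
  def cancelQuietly(cancel: () => Unit): String =
    try {
      cancel()
      "cancelled"
    } catch {
      case e: Exception => s"logged: ${e.getMessage}"
    }

  // An ordinary exception is caught and turned into a log message.
  val handled =
    cancelQuietly(() => throw new UnsupportedOperationException("no killTask"))

  // An Error is a Throwable but not an Exception, so it escapes the handler.
  val errorEscaped =
    try {
      cancelQuietly(() => throw new Error("simulated fatal error"))
      false
    } catch {
      case _: Error => true
    }
}
```

Catching `Throwable` instead would also absorb the `Error` case, which is the behaviour being deferred to a broader refactoring in the comment above.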
[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/686#discussion_r12409225 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1055,10 +1055,16 @@ class DAGScheduler( // This is the only job that uses this stage, so fail the stage if it is running. val stage = stageIdToStage(stageId) if (runningStages.contains(stage)) { -taskScheduler.cancelTasks(stageId, shouldInterruptThread) -val stageInfo = stageToInfos(stage) -stageInfo.stageFailed(failureReason) - listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, shouldInterruptThread) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) +} finally { + val stageInfo = stageToInfos(stage) + stageInfo.stageFailed(failureReason) --- End diff -- Good question. I'm mostly just trying to keep the DAGScheduler in a consistent state even when the backend doesn't support killing tasks, and I'll admit to working quickly while trying to get this significant bug fix into 1.0.0, not having fully thought this part through. If you can't see any use for the finally block unless taskScheduler.cancelTasks is successful, then we can drop the finally.