[GitHub] spark pull request: SPARK-2380: Support displaying accumulator val...

2014-07-29 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1309#discussion_r15557746
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -809,12 +810,25 @@ class DAGScheduler(
   listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
   runningStages -= stage
 }
+
 event.reason match {
   case Success =>
 logInfo("Completed " + task)
 if (event.accumUpdates != null) {
   // TODO: fail the stage if the accumulator update fails...
   Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+  event.accumUpdates.foreach { case (id, partialValue) =>
+    val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+    val name = acc.name
+    // To avoid UI cruft, ignore cases where value wasn't updated
+    if (partialValue != acc.zero) {
+      val stringPartialValue = acc.prettyPartialValue(partialValue)
+      val stringValue = acc.prettyValue(acc.value)
+      stageToInfos(stage).accumulables(id) = AccumulableInfo(id, acc.name, stringValue)
+      event.taskInfo.accumulables +=
+        AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+    }
+  }
--- End diff --

Can this be moved to a method on the Accumulators companion object or 
something?  These details about AccumulableInfo, prettyPartialValues, etc. 
aren't things that need to appear in the DAGScheduler.
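For illustration, here is a rough sketch of what such a helper could look like; the method name `updateAccumulators` and its exact signature are hypothetical, not part of this PR:

```scala
// Hypothetical sketch only: hoist the AccumulableInfo / pretty-printing details out of
// DAGScheduler.handleTaskCompletion and into the Accumulators companion object.
private[spark] object Accumulators {
  // existing members (originals, add, ...) elided

  def updateAccumulators(
      accumUpdates: Map[Long, Any],
      stageInfo: StageInfo,
      taskInfo: TaskInfo): Unit = {
    accumUpdates.foreach { case (id, partialValue) =>
      val acc = originals(id).asInstanceOf[Accumulable[Any, Any]]
      // To avoid UI cruft, ignore cases where the value wasn't updated
      if (partialValue != acc.zero) {
        val stringPartialValue = acc.prettyPartialValue(partialValue)
        val stringValue = acc.prettyValue(acc.value)
        stageInfo.accumulables(id) = AccumulableInfo(id, acc.name, stringValue)
        taskInfo.accumulables += AccumulableInfo(id, acc.name, Some(stringPartialValue), stringValue)
      }
    }
  }
}
```

The DAGScheduler call site would then shrink to something like `Accumulators.updateAccumulators(event.accumUpdates, stageToInfos(stage), event.taskInfo)`.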




[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-29 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1056#discussion_r15543587
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -38,8 +37,10 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage._
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
--- End diff --

redundant




[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-29 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1056#discussion_r15542781
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -348,4 +353,48 @@ private[spark] class Executor(
   }
 }
   }
+
+  def stop() {
+isStopped = true
+threadPool.shutdown()
+  }
+
+  def startDriverHeartbeater() {
+val interval = conf.getInt("spark.executor.heartbeatInterval", 2000)
+val timeout = AkkaUtils.lookupTimeout(conf)
+val retryAttempts = AkkaUtils.numRetries(conf)
+val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
+    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
+
+val t = new Thread() {
--- End diff --

use Utils.namedThreadFactory or otherwise generate a named thread
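For reference, a minimal sketch of the second option, a named daemon thread (the thread name here is illustrative):

```scala
// Sketch: give the heartbeater thread a recognizable name and mark it as a daemon
// so it doesn't keep the JVM alive on shutdown.
val heartbeater = new Thread("driver-heartbeater") {
  override def run(): Unit = {
    // heartbeat loop goes here
  }
}
heartbeater.setDaemon(true)
heartbeater.start()
```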




[GitHub] spark pull request: SPARK-2638 MapOutputTracker concurrency improv...

2014-07-29 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1542#issuecomment-50511022
  
I dunno, merging a PR with no changed files doesn't sound too scary to me.

Something is definitely messed up in this PR, with both `Commits` and 
`Files changed` showing invalid results. 




[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-29 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1056#discussion_r15538656
  
--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import akka.actor.Actor
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.TaskScheduler
+
+/**
+ * A heartbeat from executors to the driver. This is a shared message used 
by several internal
+ * components to convey liveness or execution information for in-progress 
tasks.
+ */
+private[spark] case class Heartbeat(
+executorId: String,
+taskMetrics: Array[(Long, TaskMetrics)],
+blockManagerId: BlockManagerId)
+  extends Serializable
--- End diff --

case class is inherently Serializable
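In other words, the `extends Serializable` clause can simply be dropped; a case class already mixes in `scala.Serializable` (along with `Product`):

```scala
// Same message without the redundant clause.
private[spark] case class Heartbeat(
    executorId: String,
    taskMetrics: Array[(Long, TaskMetrics)],
    blockManagerId: BlockManagerId)
```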




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15499105
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its 
task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
 logDebug("submitMissingTasks(" + stage + ")")
 // Get our pending tasks and remember them in our pendingTasks entry
 stage.pendingTasks.clear()
 var tasks = ArrayBuffer[Task[_]]()
+
+var broadcastRddBinary: Broadcast[Array[Byte]] = null
+try {
+  broadcastRddBinary = stage.rdd.createBroadcastBinary()
+} catch {
+  case e: NotSerializableException =>
+abortStage(stage, "Task not serializable: " + e.toString)
+runningStages -= stage
--- End diff --

Yeah, I think the intention was to avoid SparkListenerStageCompleted when 
Listeners had never seen a prior StageSubmitted event.  If you want to handle 
Completed without Submitted in the UI, I don't think those changes will break 
anything else -- but it may be unexpected behavior for other Listeners. 




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498156
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

must be wip :)




[GitHub] spark pull request: [Build] SPARK-2614: (2nd patch) Create a spark...

2014-07-27 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1611#discussion_r15442657
  
--- Diff: assembly/src/deb/control/examples/control ---
@@ -0,0 +1,8 @@
+Package: [[deb.pkg.name]]-examples
+Version: [[version]]-[[buildNumber]]
+Section: misc
+Priority: extra
+Architecture: all
--- End diff --

probably a good idea to add ``Depends: [[deb.pkg.name]] (= 
[[version]]-[[buildNumber]])`` -- i.e. an explicit dependency on the `spark` 
package with the same version number.




[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...

2014-07-27 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1360#issuecomment-50281668
  
ping

Probably too late for a 1.0.2-rc, but this should go into 1.0.3 and 1.1.0.




[GitHub] spark pull request: [SPARK-2568] RangePartitioner should run only ...

2014-07-27 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1562#discussion_r15439847
  
--- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala ---
@@ -105,24 +108,91 @@ class RangePartitioner[K : Ordering : ClassTag, V](
 
   private var ordering = implicitly[Ordering[K]]
 
+  @transient private[spark] var singlePass = true // for unit tests
+
   // An array of upper bounds for the first (partitions - 1) partitions
   private var rangeBounds: Array[K] = {
 if (partitions == 1) {
-  Array()
+  Array.empty
 } else {
-  val rddSize = rdd.count()
-  val maxSampleSize = partitions * 20.0
-  val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-  val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
-  if (rddSample.length == 0) {
-Array()
+  // This is the sample size we need to have roughly balanced output partitions.
+  val sampleSize = 20.0 * partitions
+  // Assume the input partitions are roughly balanced and over-sample a little bit.
+  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
+  val shift = rdd.id
+  val classTagK = classTag[K]
+  val sketch = rdd.mapPartitionsWithIndex { (idx, iter) =>
+val seed = byteswap32(idx + shift)
+val (sample, n) = SamplingUtils.reservoirSampleAndCount(
+  iter.map(_._1), sampleSizePerPartition, seed)(classTagK)
+Iterator((idx, n, sample))
+  }.collect()
+  var numItems = 0L
+  sketch.foreach { case (_, n, _) =>
+numItems += n
+  }
--- End diff --

`val numItems = sketch.map(_._2.toLong).sum`




[GitHub] spark pull request: SPARK-2684: Update ExternalAppendOnlyMap to ta...

2014-07-27 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1607#discussion_r15439805
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C](
 
   /**
* Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V) {
+insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
*
-   * If the underlying map is about to grow, check if the global pool of 
shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of 
shuffle memory has
* enough room for this to happen. If so, allocate the memory required 
to grow the map;
* otherwise, spill the in-memory map to disk.
*
* The shuffle memory usage of the first trackMemoryThreshold entries is 
not tracked.
*/
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]) {
+// An update function for the map that we reuse across entries to 
avoid allocating
+// a new closure each time
+var curEntry: Product2[K, V] = null
 val update: (Boolean, C) => C = (hadVal, oldVal) => {
-  if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+  if (hadVal) mergeValue(oldVal, curEntry._2) else 
createCombiner(curEntry._2)
 }
-if (numPairsInMemory > trackMemoryThreshold && 
currentMap.atGrowThreshold) {
-  val mapSize = currentMap.estimateSize()
-  var shouldSpill = false
-  val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-  // Atomically check whether there is sufficient memory in the global 
pool for
-  // this map to grow and, if possible, allocate the required amount
-  shuffleMemoryMap.synchronized {
-val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-val availableMemory = maxMemoryThreshold -
-  (shuffleMemoryMap.values.sum - 
previouslyOccupiedMemory.getOrElse(0L))
-
-// Assume map growth factor is 2x
-shouldSpill = availableMemory < mapSize * 2
-if (!shouldSpill) {
-  shuffleMemoryMap(threadId) = mapSize * 2
+
+while (entries.hasNext) {
+  curEntry = entries.next()
+  if (numPairsInMemory > trackMemoryThreshold && 
currentMap.atGrowThreshold) {
+val mapSize = currentMap.estimateSize()
+var shouldSpill = false
+val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+// Atomically check whether there is sufficient memory in the 
global pool for
+// this map to grow and, if possible, allocate the required amount
+shuffleMemoryMap.synchronized {
+  val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+  val availableMemory = maxMemoryThreshold -
+(shuffleMemoryMap.values.sum - 
previouslyOccupiedMemory.getOrElse(0L))
+
+  // Assume map growth factor is 2x
+  shouldSpill = availableMemory < mapSize * 2
+  if (!shouldSpill) {
+shuffleMemoryMap(threadId) = mapSize * 2
+  }
+}
+// Do not synchronize spills
+if (shouldSpill) {
+  spill(mapSize)
 }
   }
-  // Do not synchronize spills
-  if (shouldSpill) {
-spill(mapSize)
-  }
+  currentMap.changeValue(curEntry._1, update)
+  numPairsInMemory += 1
 }
-currentMap.changeValue(key, update)
-numPairsInMemory += 1
   }
--- End diff --

Yeah, I saw.  Thanks.  I'm not really sure how useful it is, because you 
are correct that we usually have an iterator anyway, but it was simple and 
clean to add the Iterable interface.




[GitHub] spark pull request: SPARK-2684: Update ExternalAppendOnlyMap to ta...

2014-07-26 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1607#discussion_r15437121
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -110,42 +110,56 @@ class ExternalAppendOnlyMap[K, V, C](
 
   /**
* Insert the given key and value into the map.
+   */
+  def insert(key: K, value: V) {
+insertAll(Iterator((key, value)))
+  }
+
+  /**
+   * Insert the given iterator of keys and values into the map.
*
-   * If the underlying map is about to grow, check if the global pool of 
shuffle memory has
+   * When the underlying map needs to grow, check if the global pool of 
shuffle memory has
* enough room for this to happen. If so, allocate the memory required 
to grow the map;
* otherwise, spill the in-memory map to disk.
*
* The shuffle memory usage of the first trackMemoryThreshold entries is 
not tracked.
*/
-  def insert(key: K, value: V) {
+  def insertAll(entries: Iterator[Product2[K, V]]) {
+// An update function for the map that we reuse across entries to 
avoid allocating
+// a new closure each time
+var curEntry: Product2[K, V] = null
 val update: (Boolean, C) => C = (hadVal, oldVal) => {
-  if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
+  if (hadVal) mergeValue(oldVal, curEntry._2) else 
createCombiner(curEntry._2)
 }
-if (numPairsInMemory > trackMemoryThreshold && 
currentMap.atGrowThreshold) {
-  val mapSize = currentMap.estimateSize()
-  var shouldSpill = false
-  val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
-
-  // Atomically check whether there is sufficient memory in the global 
pool for
-  // this map to grow and, if possible, allocate the required amount
-  shuffleMemoryMap.synchronized {
-val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
-val availableMemory = maxMemoryThreshold -
-  (shuffleMemoryMap.values.sum - 
previouslyOccupiedMemory.getOrElse(0L))
-
-// Assume map growth factor is 2x
-shouldSpill = availableMemory < mapSize * 2
-if (!shouldSpill) {
-  shuffleMemoryMap(threadId) = mapSize * 2
+
+while (entries.hasNext) {
+  curEntry = entries.next()
+  if (numPairsInMemory > trackMemoryThreshold && 
currentMap.atGrowThreshold) {
+val mapSize = currentMap.estimateSize()
+var shouldSpill = false
+val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
+
+// Atomically check whether there is sufficient memory in the 
global pool for
+// this map to grow and, if possible, allocate the required amount
+shuffleMemoryMap.synchronized {
+  val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
+  val availableMemory = maxMemoryThreshold -
+(shuffleMemoryMap.values.sum - 
previouslyOccupiedMemory.getOrElse(0L))
+
+  // Assume map growth factor is 2x
+  shouldSpill = availableMemory < mapSize * 2
+  if (!shouldSpill) {
+shuffleMemoryMap(threadId) = mapSize * 2
+  }
+}
+// Do not synchronize spills
+if (shouldSpill) {
+  spill(mapSize)
 }
   }
-  // Do not synchronize spills
-  if (shouldSpill) {
-spill(mapSize)
-  }
+  currentMap.changeValue(curEntry._1, update)
+  numPairsInMemory += 1
 }
-currentMap.changeValue(key, update)
-numPairsInMemory += 1
   }
--- End diff --

Does it make sense to include a more collection-oriented interface, more like scala.collection.mutable.Buffer#insertAll:
```scala
def insertAll(coll: Iterable[Product2[K, V]]): Unit = 
insertAll(coll.iterator)
```
...so that you can do things like `externalAppendOnlyMap.insertAll(aMap)` 
instead of `externalAppendOnlyMap.insertAll(aMap.iterator)`?




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-25 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1561#issuecomment-50201015
  
@JoshRosen Why a trait instead of an abstract class?  We're not expecting 
to need to mixin Stage outside of the Stage class hierarchy, right?




[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

2014-07-25 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1548#issuecomment-50174114
  
@YanTangZhai If you are searching for another solution and abandoning this 
PR, could you please close this PR and open a new one when you have something 
different for us to look at?




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-25 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-50160338
  
After installing `hub` you can also do a bunch of new stuff on the command 
line, including `hub checkout https://github.com/apache/spark/pull/1499`

https://hub.github.com/




[GitHub] spark pull request: [SPARK-2529] Clean closures in foreach and for...

2014-07-24 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1583#issuecomment-50105915
  
Does anyone recall why we lost the closure cleaning in 
https://github.com/apache/spark/commit/6b288b75d4c05f42ad3612813dc77ff824bb6203 
?




[GitHub] spark pull request: SPARK-1715: Ensure actor is self-contained in ...

2014-07-24 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/637#issuecomment-50057749
  
This rebases cleanly on top of https://github.com/apache/spark/pull/1561, 
so let's get that one in first.




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15361883
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -341,8 +336,9 @@ class DAGScheduler(
 if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
   logError("No stages registered for job " + job.jobId)
 } else {
-  stageIdToJobIds.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
-case (stageId, jobSet) =>
+  stageIdToStage.filter(s => 
registeredStages.get.contains(s._1)).foreach {
--- End diff --

Yes, and you've got it after filterKeys just like after filter.  The result 
of filterKeys is a Map[Int, Stage], and nothing needs to change within the 
foreach { ... }. 




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15356652
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -341,8 +336,9 @@ class DAGScheduler(
 if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
   logError("No stages registered for job " + job.jobId)
 } else {
-  stageIdToJobIds.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
-case (stageId, jobSet) =>
+  stageIdToStage.filter(s => 
registeredStages.get.contains(s._1)).foreach {
--- End diff --

What?  How does `stageIdToJobIds.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {` do more hash lookups than 
does `stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach 
{`?  Looks the same to me: 
https://github.com/scala/scala/blob/v2.10.4/src/library/scala/collection/MapLike.scala#L230




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334407
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[E

[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15332026
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -56,6 +58,16 @@ private[spark] class Stage(
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
+
+  /** List of jobs that this stage belong to. */
+  val jobIds = new HashSet[Int]
+  /** For stages that are the final (consists of only ResultTasks), link 
to the ActiveJob. */
+  var resultOfJob: Option[ActiveJob] = None
+  var pendingTasks: Option[HashSet[Task[_]]] = None
--- End diff --

And potentially error prone.  I don't think there is anyplace left where 
the distinction between None and Some(emptySet) is useful except for the check 
in removeStage 
(https://github.com/rxin/spark/blob/dagSchedulerHashMaps/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357),
 and that's not really even useful anymore.  On the other hand, having a None 
where we expect at least Some(emptySet) can be a problem.

I think we're better off without the Option. 
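A minimal sketch of what dropping the Option would look like, with the removeStage check reduced to a plain emptiness test (names as they appear elsewhere in this PR):

```scala
// In Stage: always hold a (possibly empty) set, so callers never have to
// distinguish None from Some(emptySet).
val pendingTasks = new HashSet[Task[_]]

// In DAGScheduler.removeStage: the Option-based check becomes a simple emptiness check.
if (stage.pendingTasks.nonEmpty) {
  logDebug(s"Removing pending status for stage $stageId")
}
```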




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331819
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
+import scala.collection.mutable.HashSet
--- End diff --

I'll have to remember that one!




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331775
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -355,14 +351,13 @@ class DAGScheduler(
   logDebug("Removing running stage %d".format(stageId))
   runningStages -= stage
 }
-stageToInfos -= stage
 for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
   shuffleToMapStage.remove(k)
 }
-if (pendingTasks.contains(stage) && 
!pendingTasks(stage).isEmpty) {
+if (stage.pendingTasks.isDefined && 
!stage.pendingTasks.isEmpty) {
   logDebug("Removing pending status for stage 
%d".format(stageId))
 }
-pendingTasks -= stage
+stage.pendingTasks = None
--- End diff --

The old pendingTasks needed the clearing step so that it wouldn't continue 
to have removed stages mapping to empty sets of remaining tasks.  I agree that 
a similar step shouldn't be necessary now that pendingTasks is encapsulated in 
a Stage that is going away.  In fact, in the event of resubmission don't we 
want this to _not_ be None, else stage.pendingTasks.get will fail?




[GitHub] spark pull request: [SPARK-1726] [SPARK-2567] Eliminate zombie sta...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1566#issuecomment-49968268
  
Makes sense.  LGTM




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1561#issuecomment-49967238
  
JIRA?




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15328773
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -992,13 +974,14 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int) {
-if (stageIdToJobIds.contains(stageId)) {
-  val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
-  jobsThatUseStage.foreach(jobId => {
-handleJobCancellation(jobId, "because Stage %s was 
cancelled".format(stageId))
-  })
-} else {
-  logInfo("No active jobs to kill for Stage " + stageId)
+stageIdToStage.get(stageId) match {
+  case Some(stage) =>
+val jobsThatUseStage: Array[Int] = stage.jobIds.toArray
+jobsThatUseStage.foreach(jobId => {
+  handleJobCancellation(jobId, "because Stage %s was 
cancelled".format(stageId))
+})
--- End diff --

```scala
jobsThatUseStage.foreach { jobId => 
  handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
}
```
There are also opportunities throughout to switch from concatenated or 
formatted strings to string interpolation, which I favor.
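For example, the same message in the two styles (a sketch of the suggestion, not a quote from the patch):

```scala
// format-style
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
// string interpolation
handleJobCancellation(jobId, s"because Stage $stageId was cancelled")
```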




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15328544
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -341,8 +336,9 @@ class DAGScheduler(
 if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
--- End diff --

update the description of `@param resultStage` above




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15328468
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -341,8 +336,9 @@ class DAGScheduler(
 if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
   logError("No stages registered for job " + job.jobId)
 } else {
-  stageIdToJobIds.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
-case (stageId, jobSet) =>
+  stageIdToStage.filter(s => 
registeredStages.get.contains(s._1)).foreach {
--- End diff --

Why filter instead of filterKeys?  Looks to me like that only serves to 
make the predicate harder to read.
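A toy, self-contained comparison of the two spellings (the maps here are stand-ins for the real scheduler state):

```scala
import scala.collection.mutable.HashMap

val stageIdToStage = HashMap(1 -> "stageA", 2 -> "stageB")
val registeredStageIds = Set(1)

// with filter: the predicate has to unpack the (key, value) pair
stageIdToStage.filter(s => registeredStageIds.contains(s._1)).foreach(println)

// with filterKeys: the predicate reads directly over the key
stageIdToStage.filterKeys(stageId => registeredStageIds.contains(stageId)).foreach(println)
```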




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15328329
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -315,13 +309,14 @@ class DAGScheduler(
*/
   private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
 def updateJobIdStageIdMapsList(stages: List[Stage]) {
-  if (!stages.isEmpty) {
+  if (stages.nonEmpty) {
 val s = stages.head
-stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
+s.jobIds += jobId
 jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-val parents = getParentStages(s.rdd, jobId)
-val parentsWithoutThisJobId = parents.filter(p =>
-  !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
+val parents: List[Stage] = getParentStages(s.rdd, jobId)
+val parentsWithoutThisJobId = parents.filter { p: Stage =>
+  !p.jobIds.contains(jobId)
--- End diff --

could now do this as `filter { !_.jobIds.contains(jobId) }`




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15328234
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -315,13 +309,14 @@ class DAGScheduler(
*/
   private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
 def updateJobIdStageIdMapsList(stages: List[Stage]) {
-  if (!stages.isEmpty) {
+  if (stages.nonEmpty) {
 val s = stages.head
-stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
+s.jobIds += jobId
 jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
-val parents = getParentStages(s.rdd, jobId)
-val parentsWithoutThisJobId = parents.filter(p =>
-  !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
+val parents: List[Stage] = getParentStages(s.rdd, jobId)
--- End diff --

Why the type annotation?




[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15327897
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -56,6 +58,16 @@ private[spark] class Stage(
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
+
+  /** List of jobs that this stage belong to. */
--- End diff --

nit: It's a Set not a List




[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1528#issuecomment-49916553
  
Yeah, I'm wondering whether the actual problem is that creating and using scheduler pools with different weights is unclear or too difficult, and whether, if we could resolve those issues, the need for this PR would disappear.
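For context, a hedged sketch of how weighted pools are wired up today; the pool name and file path are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumes a fairscheduler.xml that defines a "highPriority" pool with a larger weight.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("pool-example")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// Jobs submitted from this thread are scheduled in the named pool.
sc.setLocalProperty("spark.scheduler.pool", "highPriority")
```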




[GitHub] spark pull request: [SPARK-2567] Resubmitted stage sometimes remai...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1516#issuecomment-49911032
  
This appears to be a reversion of d58502a1562bbfb1bb4e517ebcc8239efd639297 while ignoring and misapplying the comment regarding ordering (which I don't completely understand).

@xiajunluan ?




[GitHub] spark pull request: [SPARK-2647] DAGScheduler plugs other JobSubmi...

2014-07-23 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1548#discussion_r15289573
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1202,8 +1202,12 @@ private[scheduler] class 
DAGSchedulerEventProcessActor(dagScheduler: DAGSchedule
*/
   def receive = {
 case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, 
listener, properties) =>
-  dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, 
allowLocal, callSite,
-listener, properties)
+  new Thread("JobSubmitted for " + jobId) {
+override def run() {
+  dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, 
allowLocal, callSite,
+listener, properties)
+}
+  }.start()
--- End diff --

Starting a new thread within eventProcessorActor to concurrently manipulate 
the DAGScheduler's state strikes me as a really bad idea.  The actor model and 
the DAGScheduler are built around the fundamental assumption that only one 
event will be processed at a time.  Breaking that model opens us up to numerous 
potential race conditions, and I am a long way from being convinced that this PR is either safe or desirable.




[GitHub] spark pull request: Add caching information to rdd.toDebugString

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1535#issuecomment-49874874
  
Jenkins, test this please




[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1528#issuecomment-49848309
  
Sorry, looks like you already have SPARK-2618, so change the title of this PR to include it.




[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-23 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1528#issuecomment-49848091
  
This looks like a clean implementation, but you still need to open a JIRA 
issue to explain why you want this; then edit the description of this PR to 
reference that JIRA. 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCode




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15272634
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -268,14 +264,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 }
   }
 
+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-if (ready) {
+if (sufficientResourcesRegistered) {
+  logInfo("SchedulerBackend is ready for scheduling beginning" +
+", total expected resources: " + totalExpectedResources.get() +
+", minRegisteredResourcesRatio: " + minRegisteredRatio)
   return true
 }
 if ((System.currentTimeMillis() - createTime) >= 
maxRegisteredWaitingTime) {
-  ready = true
   logInfo("SchedulerBackend is ready for scheduling beginning after 
waiting " +
-"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+"maxRegisteredResourcesWaitingTime(ms): " + 
maxRegisteredWaitingTime)
--- End diff --

I'd do these two log messages with string interpolation instead of using 
`+`.  http://docs.scala-lang.org/overviews/core/string-interpolation.html
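Roughly, using the fields from the diff above (a sketch, not the exact final wording):

```scala
// Interpolated versions of the two log messages; assumes the surrounding
// CoarseGrainedSchedulerBackend fields are in scope.
logInfo(s"SchedulerBackend is ready for scheduling beginning, total expected resources: " +
  s"${totalExpectedResources.get()}, minRegisteredResourcesRatio: $minRegisteredRatio")

logInfo(s"SchedulerBackend is ready for scheduling beginning after waiting " +
  s"maxRegisteredResourcesWaitingTime(ms): $maxRegisteredWaitingTime")
```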




[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1528#discussion_r15272274
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala ---
@@ -27,9 +27,18 @@ private[spark] class TaskSet(
 val tasks: Array[Task[_]],
 val stageId: Int,
 val attempt: Int,
-val priority: Int,
+val jobId: Int,
 val properties: Properties) {
 val id: String = stageId + "." + attempt
+val DEFAULT_PRIORITY: Int = 0
+
+  val priority:Int = {
+if(properties != null){
+  properties.getProperty("spark.scheduler.priority", "0").toInt
+}else{
+  DEFAULT_PRIORITY
+}
+  }
--- End diff --

Is the style checker ok with `val priority = if (...) {...` instead of `val 
priority = { if (...) {...`?  If it is, I'd rather do without the extra `{}`.  
You can also drop the `: Int` from `val DEFAULT_PRIORITY` and `val priority` -- 
the types are obvious without the annotations.  Also, I'm not sure that 
DEFAULT_PRIORITY really gains you anything -- I'd be fine with just `if (...) 
{...} else 0`.  And make sure you follow the style guide for spacing with 
parens and braces.
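Putting those points together, the field might read roughly like this (a sketch, not the final patch):

```scala
// No outer braces, no redundant type annotations, no DEFAULT_PRIORITY constant,
// and spaces around parens/braces per the style guide.
val priority = if (properties != null) {
  properties.getProperty("spark.scheduler.priority", "0").toInt
} else {
  0
}
```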




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15270909
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
--- End diff --

Sorry, was doing something dumb.  Leave it a `var` but clean up the 
initialization.




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15270679
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
--- End diff --

...actually, it doesn't look like it's used at all anymore except in a log 
message.




[GitHub] spark pull request: [SPARK-2635] Fix race condition at SchedulerBa...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15270593
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
--- End diff --

Looks like `minRegisteredRatio` is only needed within this class and 
doesn't need to be a var:
```scala
private val minRegisteredRatio = math.min(1, 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1528#discussion_r15270239
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala ---
@@ -27,9 +27,18 @@ private[spark] class TaskSet(
 val tasks: Array[Task[_]],
 val stageId: Int,
 val attempt: Int,
-val priority: Int,
+val jobId: Int,
 val properties: Properties) {
 val id: String = stageId + "." + attempt
+val DEFAULT_PRIORITY: Int = 0
+
+  def priority:Int = {
+if(properties != null){
+  properties.getProperty("spark.scheduler.priority", "0").toInt
+}else{
+  DEFAULT_PRIORITY
+}
+  }
--- End diff --

Why the change from `val` to `def`?  I believe `val priority = if ... else 
...` will work fine here.
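
A minimal, self-contained sketch of that `val`-based alternative (the surrounding class is simplified for illustration and is not the actual patch; field names follow the quoted diff):
```scala
import java.util.Properties

// Simplified stand-in for the TaskSet in the diff above; only the fields needed
// to show the val-based priority extraction are kept.
class TaskSetSketch(
    val stageId: Int,
    val attempt: Int,
    val jobId: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt

  // Evaluated once at construction time, so a def buys nothing here.
  val priority: Int =
    if (properties != null) {
      properties.getProperty("spark.scheduler.priority", "0").toInt
    } else {
      0
    }
}
```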


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1528#discussion_r15270158
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala ---
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.math.Ordering.Implicits._
--- End diff --

Pulling in these implicits can have unintended consequences; that's why in 
my previous comment I kept the scope of the import as small as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15268935
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after 
the time(milliseconds).
+  // Whatever minRegisteredRatio is arrived, submit tasks after the 
time(milliseconds).
--- End diff --

Ah, I see -- sorry.  Looks like this is what we want? `// Submit tasks 
after maxRegisteredWaitingTime milliseconds if minRegisteredRatio has not yet 
been reached`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Add caching information to rdd.toDebugString

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1535#discussion_r15259034
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1294,7 +1307,11 @@ abstract class RDD[T: ClassTag](
   val partitionStr = "(" + rdd.partitions.size + ")"
   val leftOffset = (partitionStr.length - 1) / 2
   val nextPrefix = (" " * leftOffset) + "|" + (" " * 
(partitionStr.length - leftOffset))
-  Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+
+  debugSelf(rdd).zipWithIndex.map{
+case (desc: String, 0) => partitionStr+" "+desc
+case (desc: String, _) => nextPrefix+" "+desc
--- End diff --

And elsewhere in this PR, avoid string concatenation with `+` when string 
interpolation would be equally clear or clearer. 
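
For illustration, a small sketch of the interpolated form of the quoted snippet (the helper and parameter names are mine, not part of the PR):
```scala
// Renders the debug lines using string interpolation instead of `+` concatenation.
def renderDebugLines(descs: Seq[String], partitionStr: String, nextPrefix: String): Seq[String] =
  descs.zipWithIndex.map {
    case (desc, 0) => s"$partitionStr $desc"
    case (desc, _) => s"$nextPrefix $desc"
  }
```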


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Add caching information to rdd.toDebugString

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1535#discussion_r15258957
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1294,7 +1307,11 @@ abstract class RDD[T: ClassTag](
   val partitionStr = "(" + rdd.partitions.size + ")"
   val leftOffset = (partitionStr.length - 1) / 2
   val nextPrefix = (" " * leftOffset) + "|" + (" " * 
(partitionStr.length - leftOffset))
-  Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+
+  debugSelf(rdd).zipWithIndex.map{
+case (desc: String, 0) => partitionStr+" "+desc
+case (desc: String, _) => nextPrefix+" "+desc
--- End diff --

s"$partitionStr $desc"
s"$nextPrefix $desc"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2490] Change recursive visiting on RDD ...

2014-07-22 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1418#issuecomment-49794355
  
Jenkins, ok to test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1528#discussion_r15248491
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala ---
@@ -32,11 +32,21 @@ private[spark] class FIFOSchedulingAlgorithm extends 
SchedulingAlgorithm {
 val priority2 = s2.priority
 var res = math.signum(priority1 - priority2)
 if (res == 0) {
-  val stageId1 = s1.stageId
-  val stageId2 = s2.stageId
-  res = math.signum(stageId1 - stageId2)
+  val jobId1 = s1.jobId
+  val jobId2 = s2.jobId
+  res = math.signum(jobId1 - jobId2)
+  if (res == 0) {
+val stageId1 = s1.stageId
+val stageId2 = s2.stageId
+res = math.signum(stageId1 - stageId2)
+  }
+  if (res < 0) {
+true
+  } else {
+false
+  }
--- End diff --

This `if..else` doesn't actually do anything.  This whole comparator is 
needlessly complex.  If the intent is lexicographical ordering with higher 
priority ahead of lower priority, lower jobId ahead of higher jobId, and lower 
stageId ahead of higher stageId, then this should be sufficient:
```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
import scala.math.Ordering.Implicits._
(s1.priority, s2.jobId, s2.stageId) > (s2.priority, s1.jobId, 
s1.stageId)
  }
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: use config spark.scheduler.priority for specif...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1528#discussion_r15246213
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -778,8 +778,10 @@ class DAGScheduler(
   logInfo("Submitting " + tasks.size + " missing tasks from " + stage 
+ " (" + stage.rdd + ")")
   myPending ++= tasks
   logDebug("New pending tasks: " + myPending)
+  val priority:String = 
properties.getProperty("spark.scheduler.priority", "0")
   taskScheduler.submitTasks(
-new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), 
stage.jobId, properties))
+new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), 
stage.jobId, priority.toInt,
+  properties))
--- End diff --

`properties` is already being passed to the `TaskSet` ctor, so I'd prefer 
that extraction of `priority` happen there or elsewhere instead of doing 
`properties.getProperty` here and adding another parameter to the `TaskSet` 
ctor. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2490] Change recursive visiting on RDD ...

2014-07-22 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1418#issuecomment-49773532
  
ok to test


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15242315
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -108,4 +108,8 @@ private[spark] class SparkDeploySchedulerBackend(
 logInfo("Executor %s removed: %s".format(fullId, message))
 removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def checkRegisteredResources(): Boolean = {
--- End diff --

or `sufficientResourcesRegistered`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15242266
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
 ---
@@ -108,4 +108,8 @@ private[spark] class SparkDeploySchedulerBackend(
 logInfo("Executor %s removed: %s".format(fullId, message))
 removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def checkRegisteredResources(): Boolean = {
--- End diff --

I'd prefer the name to indicate what condition is being checked, so 
something like `sufficientRegisteredResources`.
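
As a rough sketch of what the renamed check might look like (the body shown here is an assumption for illustration only, not the actual SparkDeploySchedulerBackend code):
```scala
// Hypothetical shape of the renamed method; the threshold logic is illustrative.
trait RegistrationCheck {
  def registeredCores: Int
  def expectedCores: Int
  def minRegisteredRatio: Double

  // True once enough of the expected resources have registered to start submitting tasks.
  def sufficientResourcesRegistered(): Boolean =
    registeredCores >= expectedCores * minRegisteredRatio
}
```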


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Fix race condition at SchedulerBackend.isReady...

2014-07-22 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1525#discussion_r15240513
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalExecutors = new AtomicInteger(0)
+  var totalExpectedResources = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected 
executors) 
+  // Submit tasks only after (registered resources / total expected 
resources) 
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+  var minRegisteredRatio = 
conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0)
   if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after 
the time(milliseconds).
+  // Whatever minRegisteredRatio is arrived, submit tasks after the 
time(milliseconds).
--- End diff --

// Submit tasks time(milliseconds) after minRegisteredRatio is reached


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

2014-07-17 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1362#discussion_r15076386
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1107,7 +1106,6 @@ class DAGScheduler(
 case shufDep: ShuffleDependency[_, _, _] =>
   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
   if (!mapStage.isAvailable) {
-visitedStages += mapStage
--- End diff --

@mateiz It looks to me that this is what was always intended, but has 
actually been missing the `visitedStages` check for the past couple of years:
```scala
if (!mapStage.isAvailable && !visitedStages(mapStage)) {
  visitedStages += mapStage
  visit(mapStage.rdd)
}  // Otherwise there's no need to follow the dependency back
```
Making that change works for me in the sense that all of the test suites 
continue to pass, but I haven't yet got any relative performance numbers.

Probably needs to go in a separate JIRA & PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

2014-07-16 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1362#discussion_r15041791
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1107,7 +1106,6 @@ class DAGScheduler(
 case shufDep: ShuffleDependency[_, _, _] =>
   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
   if (!mapStage.isAvailable) {
-visitedStages += mapStage
--- End diff --

Yup, it's odd -- and it looks to me like it has been that way since this 
section of code was introduced.  Looks like there should be an `if 
(!visitedStages(mapStage))` guarding `visit(mapStage.rdd)`... testing...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-695] In DAGScheduler's getPreferredLocs...

2014-07-16 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1362#discussion_r15040842
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1107,7 +1106,6 @@ class DAGScheduler(
 case shufDep: ShuffleDependency[_, _, _] =>
   val mapStage = getShuffleMapStage(shufDep, stage.jobId)
   if (!mapStage.isAvailable) {
-visitedStages += mapStage
--- End diff --

`visitedStages` is built up and used repeatedly in the recursive calls to 
`visit` 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1101
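
For readers unfamiliar with the pattern, a generic, self-contained sketch of a visited-set guarded recursion (not the actual DAGScheduler code):
```scala
import scala.collection.mutable

// Walks a graph once per node: the visited set prevents re-visiting nodes that
// are reachable through more than one path.
def traverseOnce[A](start: A, children: A => Seq[A]): Set[A] = {
  val visited = mutable.HashSet.empty[A]
  def visit(node: A): Unit = {
    if (!visited(node)) {
      visited += node
      children(node).foreach(visit)
    }
  }
  visit(start)
  visited.toSet
}
```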


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Async in progress

2014-07-16 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1449#issuecomment-49241702
  
Please create a JIRA issue and a description for this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2519. Eliminate pattern-matching on Tupl...

2014-07-16 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1435#issuecomment-49195389
  
Got it.  Thanks.  That also helps to put some bound (for now) on where we 
will make such performance optimizations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2519. Eliminate pattern-matching on Tupl...

2014-07-16 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1435#issuecomment-49174657
  
Hmmm... not sure that I would go so far as to call it "nice".  This does 
make the code slightly more difficult to read and understand, so can we hope 
that you've got some relative performance numbers that justify this compromise, 
@sryza?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14849312
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+
+/**
+ * Generates byte code that produces a [[MutableRow]] object that can 
update itself based on a new
+ * input [[Row]] for a fixed set of [[Expression Expressions]].
+ */
+object GenerateMutableProjection extends CodeGenerator {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  // TODO: Should be weak references... bounded in size.
+  val projectionCache = new collection.mutable.HashMap[Seq[Expression], () 
=> MutableProjection]
+
+  def apply(expressions: Seq[Expression], inputSchema: Seq[Attribute]): 
(() => MutableProjection) =
+apply(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  // TODO: Safe to fire up multiple instances of the compiler?
+  def apply(expressions: Seq[Expression]): () => MutableProjection =
+globalLock.synchronized {
+  val cleanedExpressions = expressions.map(ExpressionCanonicalizer(_))
+  projectionCache.getOrElseUpdate(cleanedExpressions, 
createProjection(cleanedExpressions))
+}
+
+  val mutableRowName = newTermName("mutableRow")
+
+  def createProjection(expressions: Seq[Expression]): (() => 
MutableProjection) = {
--- End diff --

Should these `create*` functions be public?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14848366
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  val rowType = typeOf[Row]
+  val mutableRowType = typeOf[MutableRow]
+  val genericRowType = typeOf[GenericRow]
+  val genericMutableRowType = typeOf[GenericMutableRow]
+
+  val projectionType = typeOf[Projection]
+  val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeperator = "$"
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm An possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns the code required to determine both 
if the result is NULL
+   * as well as the code required to compute the value.
+   */
+  def expressionEvaluator(e: Expression): EvaluatedExpression = {
+val primitiveTerm = freshName("primitiveTerm")
+val nullTerm = freshName("nullTerm")
+val objectTerm = freshName("objectTerm")
+
+implicit class Evaluate1(e: Expression) {
+  def castOrNull(f: TermName => Tree, dataType: DataType): Seq[Tree] = 
{
+val eval = expressionEvaluator(e)
+eval.code ++
+  q"""
+  val $nullTerm = ${eval.nullTerm}
+  val $primitiveTerm =
+if($nullTerm)
+  ${defaultPrimitive(dataType)}
+else
+  ${f(eval.primitiveTerm)}
+""".children
+  }
+}
+
+implicit class Evaluate2(expressions: (Expression, Expression)) {
+
+  /**
+   * Short hand for generating binary evaluation code, which depends 
on two sub-evaluations of
+   * the same type.  If either of the sub-expressions is null, the 
results of this computation
+   * is a

[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14847035
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  val rowType = typeOf[Row]
+  val mutableRowType = typeOf[MutableRow]
+  val genericRowType = typeOf[GenericRow]
+  val genericMutableRowType = typeOf[GenericMutableRow]
+
+  val projectionType = typeOf[Projection]
+  val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeperator = "$"
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm An possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns the code required to determine both 
if the result is NULL
+   * as well as the code required to compute the value.
+   */
+  def expressionEvaluator(e: Expression): EvaluatedExpression = {
+val primitiveTerm = freshName("primitiveTerm")
+val nullTerm = freshName("nullTerm")
+val objectTerm = freshName("objectTerm")
+
+implicit class Evaluate1(e: Expression) {
+  def castOrNull(f: TermName => Tree, dataType: DataType): Seq[Tree] = 
{
+val eval = expressionEvaluator(e)
+eval.code ++
+  q"""
+  val $nullTerm = ${eval.nullTerm}
+  val $primitiveTerm =
+if($nullTerm)
+  ${defaultPrimitive(dataType)}
+else
+  ${f(eval.primitiveTerm)}
+""".children
--- End diff --

Is this really the way we want to format `q"""`?  To my eye, it would be 
much more readable as:
```scala
q"""
  val $nullTerm = ${eval1.nullTerm} || ${eval2.nullTerm}
  val $primitiveTerm: ${termForType(resultType)} =
if($nullTerm) {
  ${defaultPrimitive(resultType)}
 

[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14846216
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  val rowType = typeOf[Row]
+  val mutableRowType = typeOf[MutableRow]
+  val genericRowType = typeOf[GenericRow]
+  val genericMutableRowType = typeOf[GenericMutableRow]
+
+  val projectionType = typeOf[Projection]
+  val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeperator = "$"
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm An possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns the code required to determine both 
if the result is NULL
+   * as well as the code required to compute the value.
--- End diff --

Description of the output seems less than precise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14846043
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  val rowType = typeOf[Row]
+  val mutableRowType = typeOf[MutableRow]
+  val genericRowType = typeOf[GenericRow]
+  val genericMutableRowType = typeOf[GenericMutableRow]
+
+  val projectionType = typeOf[Projection]
+  val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeperator = "$"
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeperator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm An possibly boxed version of the result of 
evaluating this expression.
--- End diff --

s/An/A


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14845309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
--- End diff --

sp. referring


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r14845258
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code that performs expression 
evaluation.  Includes helpers
+ * for refering to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  val rowType = typeOf[Row]
+  val mutableRowType = typeOf[MutableRow]
+  val genericRowType = typeOf[GenericRow]
+  val genericMutableRowType = typeOf[GenericMutableRow]
+
+  val projectionType = typeOf[Projection]
+  val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeperator = "$"
--- End diff --

sp. separator


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...

2014-07-10 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1360#issuecomment-48643262
  
@aarondav


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...

2014-07-10 Thread markhamstra
GitHub user markhamstra opened a pull request:

https://github.com/apache/spark/pull/1360

SPARK-2425 Don't kill a still-running Application because of some 
misbehaving Executors 

Introduces a LOADING -> RUNNING ApplicationState transition and prevents 
Master from removing an Application with RUNNING Executors.

Two basic changes: 1) Instead of allowing MAX_NUM_RETRY abnormal Executor 
exits over the entire lifetime of the Application, allow that many since any 
Executor successfully began running the Application; 2) Don't remove the 
Application while Master still thinks that there are RUNNING Executors.

This should be fine as long as the ApplicationInfo doesn't believe any 
Executors are forever RUNNING when they are not.  I think that any non-RUNNING 
Executors will eventually no longer be RUNNING in Master's accounting, but 
another set of eyes should confirm that.  This PR also doesn't try to detect 
which nodes have gone rogue or to kill off bad Workers, so repeatedly failing 
Executors will continue to fail and fill up log files with failure reports as 
long as the Application keeps running.
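
A minimal, self-contained sketch of the retry policy described above (all names here are hypothetical illustrations, not the actual Master/ApplicationInfo API):
```scala
object RetryPolicySketch {
  val MaxNumRetry = 10  // assumed limit, analogous to MAX_NUM_RETRY

  final case class AppState(failuresSinceLastSuccess: Int, runningExecutors: Int)

  // An executor for the application reached RUNNING: reset the failure window.
  def executorStarted(s: AppState): AppState =
    AppState(failuresSinceLastSuccess = 0, runningExecutors = s.runningExecutors + 1)

  // An executor exited abnormally: returns the new state and whether the
  // application should be removed (only when the limit is hit *and* nothing is
  // still RUNNING).
  def executorFailed(s: AppState): (AppState, Boolean) = {
    val next = AppState(
      failuresSinceLastSuccess = s.failuresSinceLastSuccess + 1,
      runningExecutors = math.max(0, s.runningExecutors - 1))
    val remove = next.failuresSinceLastSuccess >= MaxNumRetry && next.runningExecutors == 0
    (next, remove)
  }
}
```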

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markhamstra/spark SPARK-2425

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1360.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1360


commit 5b85534d376d682b7e1f97f98acd532a305349f8
Author: Mark Hamstra 
Date:   2014-07-09T23:02:43Z

SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition
and prevent Master from removing Application with RUNNING Executors




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-07-03 Thread markhamstra
Github user markhamstra closed the pull request at:

https://github.com/apache/spark/pull/686


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [MLLIB] SPARK-2329 Add multi-label evaluation ...

2014-07-01 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1270#discussion_r14428640
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/evaluation/MultilabelMetrics.scala 
---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
+/**
+ * Evaluator for multilabel classification.
+ * NB: type Double both for prediction and label is retained
+ * for compatibility with model.predict that returns Double
+ * and MLUtils.loadLibSVMFile that loads class labels as Double
+ *
+ * @param predictionAndLabels an RDD of (predictions, labels) pairs, both 
are non-null sets.
+ */
+class MultilabelMetrics(predictionAndLabels:RDD[(Set[Double], 
Set[Double])]) extends Logging{
+
+  private lazy val numDocs = predictionAndLabels.count
+
+  private lazy val numLabels = predictionAndLabels.flatMap{case(_, labels) 
=> labels}.distinct.count
+
+  /**
+   * Returns strict Accuracy
+   * (for equal sets of labels)
+   * @return strictAccuracy.
+   */
+  lazy val strictAccuracy = predictionAndLabels.filter{case(predictions, 
labels) =>
+predictions == labels}.count.toDouble / numDocs
+
+  /**
+   * Returns Accuracy
+   * @return Accuracy.
+   */
+  lazy val accuracy = predictionAndLabels.map{ case(predictions, labels) =>
+labels.intersect(predictions).size.toDouble / 
labels.union(predictions).size}.
+fold(0.0)(_ + _) / numDocs
+
--- End diff --

After ``import org.apache.spark.SparkContext._``, it should already be 
there as an implicit. 
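
For context, a tiny sketch of what that wildcard import provides in Spark 1.x (illustrative only; assumes spark-core on the classpath):
```scala
import org.apache.spark.SparkContext._   // brings RDD implicit conversions into scope (Spark 1.x)
import org.apache.spark.rdd.RDD

// reduceByKey resolves via the implicit PairRDDFunctions wrapper, with no
// explicit conversion needed at the call site.
def wordCounts(words: RDD[String]): RDD[(String, Int)] =
  words.map(w => (w, 1)).reduceByKey(_ + _)
```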


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r14211667
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1062,10 +1062,15 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
+  
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+}
--- End diff --

Ok, either tonight or tomorrow I can update this PR to reflect that 
strategy, or you can go ahead and make the change @pwendell.  Outside the 
immediate scope of this PR, what prevents Mesos from being able to kill tasks, 
and when do we expect that to change? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r14041700
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1062,10 +1062,15 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
+  
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+}
--- End diff --

Do we have the meaning of all the listener events fully documented 
someplace?  Or perhaps that needs to be done in a separate PR and then 
DAGScheduler updated to match the documented expectation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r14041296
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1062,10 +1062,15 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
+  
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+}
--- End diff --

What you suggest could be done, but there's a question of whether or not 
notification of cancellation of the job should be made regardless of whether 
any stages and tasks are successfully cancelled as a consequence.  I don't 
really know how to answer that because I don't know how all of the listeners 
are using the notification or whether they are all expecting the same semantics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r14040631
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1062,10 +1062,15 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
+  
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+}
--- End diff --

Hmmm... not sure that I agree.  A job being cancelled, stages being 
cancelled, and tasks being cancelled are all different things.  The expectation 
is that job cancellation will lead to cancellation of independent stages and 
their associated tasks; but even if no stages or tasks get cancelled, it's probably 
still worthwhile for the information to be sent that the job itself was 
cancelled.  I expect that eventually all of the backends will support task 
killing, so this whole no-kill path should never be hit.  But moving the job 
cancellation notification within the try-to-cancelTasks block will result in 
multiple notifications that the parent job was cancelled -- one for each 
independent stage cancellation.  Or am I misunderstanding what you are 
suggesting?
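
For concreteness, here is a minimal, self-contained sketch of the two placements 
being debated -- it is not the DAGScheduler code, and the listener bus, the event 
classes, and cancelTasks below are simplified stand-ins -- showing why posting the 
job-level notification inside the per-stage loop would duplicate it:
```
object CancellationNotificationSketch {
  sealed trait Event
  case class StageCancelled(stageId: Int) extends Event
  case class JobCancelled(jobId: Int) extends Event

  class ListenerBus { def post(e: Event): Unit = println(s"posted: $e") }

  // Stand-in for a backend that may or may not implement killTask.
  def cancelTasks(stageId: Int, killSupported: Boolean): Unit =
    if (!killSupported) throw new UnsupportedOperationException(s"no killTask for stage $stageId")

  def failJob(jobId: Int, stages: Seq[Int], killSupported: Boolean, bus: ListenerBus): Unit = {
    stages.foreach { stageId =>
      try {
        cancelTasks(stageId, killSupported)
        bus.post(StageCancelled(stageId)) // per-stage event only when cancellation worked
      } catch {
        case e: UnsupportedOperationException =>
          println(s"Could not cancel tasks for stage $stageId: ${e.getMessage}")
      }
    }
    // Posted once per job, outside the per-stage try block; moving it inside
    // the loop would emit one JobCancelled per independent stage.
    bus.post(JobCancelled(jobId))
  }

  def main(args: Array[String]): Unit =
    failJob(jobId = 1, stages = Seq(0, 1), killSupported = false, bus = new ListenerBus)
}
```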


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-20 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r14032954
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -313,6 +314,47 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
 assertDataStructuresEmpty
   }
 
+  test("job cancellation no-kill backend") {
+// make sure that the DAGScheduler doesn't crash when the TaskScheduler
+// doesn't implement killTask()
+val noKillTaskScheduler = new TaskScheduler() {
+  override def rootPool: Pool = null
+  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def start() = {}
+  override def stop() = {}
+  override def submitTasks(taskSet: TaskSet) = {
+// normally done by TaskSetManager
+taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
--- End diff --

Sure, doing nothing is easy.


On Fri, Jun 20, 2014 at 10:47 AM, Kay Ousterhout 
wrote:

> In core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
>
> > @@ -313,6 +314,47 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
> >  assertDataStructuresEmpty
> >}
> >
> > +  test("job cancellation no-kill backend") {
> > +// make sure that the DAGScheduler doesn't crash when the 
TaskScheduler
> > +// doesn't implement killTask()
> > +val noKillTaskScheduler = new TaskScheduler() {
> > +  override def rootPool: Pool = null
> > +  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
> > +  override def start() = {}
> > +  override def stop() = {}
> > +  override def submitTasks(taskSet: TaskSet) = {
> > +// normally done by TaskSetManager
> > +taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
>
> are these lines necessary (can you just do nothing here?)
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/686/files#r14031997>.
>


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-20 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/686#issuecomment-46684950
  
ping: This should go into 1.0.1
@pwendell


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Branch 1.0 Add ZLIBCompressionCodec code

2014-06-18 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1115#issuecomment-46479466
  
Yes, this PR is not in a useful state right now.  It's hard to even find 
the proposed changes because of all the clutter of unnecessary commits, but it 
looks to me as though you are creating new API.  If that is the case, then the 
rebase definitely needs to be against master, since branch-1.0 is a maintenance 
branch for bug-fixes, not new API.  Additionally, you should create a JIRA 
issue explaining what this PR is trying to address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2060][SQL] Querying JSON Datasets with ...

2014-06-18 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/999#issuecomment-46436035
  
Hmmm, that doesn't precisely match my recollection or understanding.  
Certainly we discussed that alpha components aren't required to maintain a 
stable API, but I don't recall an explicit decision that changes to alpha 
components would routinely be merged back into maintenance releases.  I could 
be mistaken, and merging new alpha API into maintenance branches may be the 
right strategy, but this did take me a little by surprise. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2060][SQL] Querying JSON Datasets with ...

2014-06-17 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/999#issuecomment-46389597
  
Is that the basic strategy we are going to use with AlphaComponents -- 
merging new APIs at both the minor and maintenance levels?  I don't know that I 
have any objection to that, but I don't recall any discussion directly on 
point, and this is the first such addition that has been made to branch-1.0 
while I was paying attention.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2158 Clean up core/stdout file from File...

2014-06-16 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1100#issuecomment-46265806
  
jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2158 Clean up core/stdout file from File...

2014-06-16 Thread markhamstra
GitHub user markhamstra opened a pull request:

https://github.com/apache/spark/pull/1100

SPARK-2158 Clean up core/stdout file from FileAppenderSuite 

@tdas

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markhamstra/spark SPARK-2158

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1100


commit d95b49c830b5b2ed99842845260a050b0e519d80
Author: Mark Hamstra 
Date:   2014-06-16T23:12:39Z

Cleanup 'stdout' file within FileAppenderSuite




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1715: Ensure actor is self-contained in ...

2014-06-14 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/637#issuecomment-46104906
  
It means that the check for binary compatibility after your patch is 
applied has failed because the checker thinks that there previously was a 
default/automatic setter method for the dagScheduler var in SparkContext and 
that there isn't one anymore after applying your patch.

See SPARK-2069   
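
A minimal sketch of why the checker sees a setter at all (SparkContextLike here 
is a hypothetical stand-in, not the real SparkContext): a public `var` compiles 
to a getter plus a generated setter, and removing or narrowing the var removes 
that setter from the public binary API, which is exactly what a 
binary-compatibility checker such as MiMa flags:
```
class SparkContextLike {
  // A public var: the compiler emits a getter `dagScheduler` and a setter
  // that appears in bytecode as `dagScheduler_$eq`.
  var dagScheduler: AnyRef = null
}

object SetterDemo {
  def main(args: Array[String]): Unit = {
    classOf[SparkContextLike].getMethods
      .filter(_.getName.startsWith("dagScheduler"))
      .foreach(m => println(m.getName))
    // Prints both methods.  Turning the var into a val, or making it
    // private[spark], drops the generated setter from the public API.
  }
}
```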


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SQL] Update SparkSQL and ScalaTest in branch-...

2014-06-13 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/1078#issuecomment-46071589
  
FYI, bumping all the way to the current ScalaTest 2.2.0 also works.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [Spark 2060][SQL] Querying JSON Datasets with ...

2014-06-07 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/999#discussion_r13520869
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/json/JsonTable.scala 
---
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.json
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.expressions.{Alias, 
AttributeReference, GetField}
+
+import com.fasterxml.jackson.databind.ObjectMapper
+
+import scala.collection.JavaConversions._
+import scala.math.BigDecimal
+import org.apache.spark.sql.catalyst.expressions.GetField
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.GetField
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.types.StructField
+import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.ArrayType
+import org.apache.spark.sql.catalyst.expressions.GetField
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.catalyst.expressions.Alias
+
+sealed trait SchemaResolutionMode
+
+case object EAGER_SCHEMA_RESOLUTION extends SchemaResolutionMode
+case class EAGER_SCHEMA_RESOLUTION_WITH_SAMPLING(val fraction: Double) 
extends SchemaResolutionMode
+case object LAZY_SCHEMA_RESOLUTION extends SchemaResolutionMode
+
+/**
+ * :: Experimental ::
+ * Converts a JSON file to a SparkSQL logical query plan.  This 
implementation is only designed to
+ * work on JSON files that have mostly uniform schema.  The conversion 
suffers from the following
+ * limitation:
+ *  - The data is optionally sampled to determine all of the possible 
fields. Any fields that do
+ *not appear in this sample will not be included in the final output.
+ */
+@Experimental
+object JsonTable extends Serializable with Logging {
+  def inferSchema(
+  json: RDD[String], sampleSchema: Option[Double] = None): LogicalPlan 
= {
+val schemaData = sampleSchema.map(json.sample(false, _, 
1)).getOrElse(json)
+val allKeys = 
parseJson(schemaData).map(getAllKeysWithValueTypes).reduce(_ ++ _)
+
+// Resolve type conflicts
+val resolved = allKeys.groupBy {
+  case (key, dataType) => key
+}.map {
+  // Now, keys and types are organized in the format of
+  // key -> Set(type1, type2, ...).
+  case (key, typeSet) => {
+val fieldName = key.substring(1, key.length - 1).split("`.`").toSeq
+val dataType = typeSet.map {
+  case (_, dataType) => dataType
+}.reduce((type1: DataType, type2: DataType) => 
getCompatibleType(type1, type2))
+
+// Finally, we replace all NullType to StringType. We do not need 
to take care
+// StructType because all fields with a StructType are represented 
by a placeholder
+// StructType(Nil).
+dataType match {
+  case NullType => (fieldName, StringType)
+  case ArrayType(NullType) => (fieldName, ArrayType(StringType))
+  case other => (fieldName, other)
+}
+  }
+}
+
+def m

[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD

2014-06-04 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/369#discussion_r13423508
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag](
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
   /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[K](
+  f: (T) ⇒ K,
+  ascending: Boolean = true,
+  numPartitions: Int = this.partitions.size)
+  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
--- End diff --

https://issues.apache.org/jira/browse/SPARK-1540

https://github.com/apache/spark/commit/640f9a0efefd42cff86aecd4878a3a57f5ae85fa


On Wed, Jun 4, 2014 at 8:14 PM, Wenchen Fan 
wrote:

> In core/src/main/scala/org/apache/spark/rdd/RDD.scala:
>
> > @@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag](
> >def ++(other: RDD[T]): RDD[T] = this.union(other)
> >
> >/**
> > +   * Return this RDD sorted by the given key function.
> > +   */
> > +  def sortBy[K](
> > +  f: (T) ⇒ K,
> > +  ascending: Boolean = true,
> > +  numPartitions: Int = this.partitions.size)
> > +  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
>
> @markhamstra <https://github.com/markhamstra> I checked RDD.scala and
> found the style you talked about. But one thing I don't understand is:def
> countByValue()(implicit ord: Ordering[T] = null). I can't find the use of
> implicit Ordering[T] anywhere inside the method, and I compiled spark
> successfully with this implicit Ordering[T] removed. Did I miss something
> here?
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/369/files#r13422474>.
>


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD

2014-06-04 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/369#discussion_r13419732
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag](
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
   /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[K](
+  f: (T) ⇒ K,
+  ascending: Boolean = true,
+  numPartitions: Int = this.partitions.size)
+  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
--- End diff --

If I understand what you are claiming, then I think you are mistaken.  
There is only one implicit parameter list, though, so I don't think you can 
supply an argument for only one of the implicits:
```
scala> def foo[A: ClassTag](implicit ord: Ordering[A]) = 
(implicitly[ClassTag[A]], ord)
foo: [A](implicit evidence$1: scala.reflect.ClassTag[A], implicit ord: 
scala.math.Ordering[A])(scala.reflect.ClassTag[A], scala.math.Ordering[A])

scala> foo[Int]
res1: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = 
(Int,scala.math.Ordering$Int$@5e10a811)

scala> foo[Int](scala.reflect.classTag[Int], Ordering[Int])
res2: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = 
(Int,scala.math.Ordering$Int$@5e10a811)

scala> foo[Int](scala.reflect.classTag[Int], Ordering[Int].reverse)
res3: (scala.reflect.ClassTag[Int], scala.math.Ordering[Int]) = 
(Int,scala.math.Ordering$$anon$4@7293296)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-06-04 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/686#issuecomment-45113730
  
@rxin merge to 1.0.1 and 1.1.0?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1063 Add .sortBy(f) method on RDD

2014-06-04 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/369#discussion_r13387198
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -423,6 +423,18 @@ abstract class RDD[T: ClassTag](
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
   /**
+   * Return this RDD sorted by the given key function.
+   */
+  def sortBy[K](
+  f: (T) ⇒ K,
+  ascending: Boolean = true,
+  numPartitions: Int = this.partitions.size)
+  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
--- End diff --

That substitution certainly could be made since it is functionally 
equivalent.  Whether it should be made is mostly a question of style.  Since 
`ord` and `ctag` are not used explicitly here, there is no real need to 
enumerate them as named implicit parameters; but the rest of RDD.scala doesn't use 
the context bound sugar, so there is some value in consistently maintaining 
that style. 
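
To make the style point concrete, the two spellings are interchangeable.  A 
standalone sketch over Seq rather than RDD (the ClassTag is carried along only 
to mirror the RDD.sortBy signature and is unused here):
```
import scala.reflect.ClassTag

object SortByStyles {
  // Style used in the PR: the implicits enumerated explicitly.
  def sortByExplicit[T, K](xs: Seq[T])(f: T => K)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): Seq[T] =
    xs.sortBy(f)(ord)

  // Context-bound sugar; the compiler desugars [K: Ordering: ClassTag]
  // into the same implicit parameter list as above.
  def sortBySugar[T, K: Ordering: ClassTag](xs: Seq[T])(f: T => K): Seq[T] =
    xs.sortBy(f)

  def main(args: Array[String]): Unit = {
    println(sortByExplicit(Seq(3, 1, 2))(identity))        // List(1, 2, 3)
    println(sortBySugar(Seq("bb", "a", "ccc"))(_.length))  // List(a, bb, ccc)
  }
}
```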


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1997] update breeze to version 0.8.1

2014-06-02 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/940#issuecomment-44914945
  
Neither does Spark 1.0.0.

We've offered no guarantee that any Spark 1.x will work with Scala 2.11.  
If it turns out that we can't cross-compile for Scala 2.10 and 2.11 with 
essentially the same source code and dependencies while preserving our 
intention to maintain binary compatibility, then that may be the occasion that 
forces the beginning of Spark 2.0 development.

In any event, breaking our intent to preserve binary compatibility is no small 
matter and shouldn't slip into a PR without considerable discussion.  I'm not 
certain that that is what is happening with this PR, but I do think that it is 
critical that we understand how strong our checks for binary compatibility are 
and what bounds there are on the dependency upgrades we can make while 
preserving that compatibility.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP] update breeze to version 0.8.1

2014-06-02 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/940#issuecomment-44889156
  
We shouldn't, so I think we maintain source compatibility without any 
trouble.  Are the MIMA checks good enough to catch binary incompatibility when 
we make significant changes to library dependencies?  Not that I am asserting 
that this Breeze change is particularly significant, since I haven't looked at 
the diff from 0.7.  I'm more interested in the general case of what needs to be 
checked or done when we want to update dependencies while staying within the 
bounds of 1.x. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP] update breeze to version 0.8.1

2014-06-02 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/940#issuecomment-44861308
  
What is the reason for this change, and how does it affect our intention to 
maintain binary compatibility?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-27 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r13106282
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1055,10 +1055,16 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+} finally {
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
--- End diff --

Upon further review, I can't see any use (other than misleading the user) 
for posting stage cancellation to the SparkListenerBus when the 
SchedulerBackend does not support cancellation, so we won't do that.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1938] [SQL] ApproxCountDistinctMergeFun...

2014-05-27 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/893#issuecomment-44309000
  
@ash211 Makes sense to me -- which doesn't necessarily mean a lot in this 
unfamiliar area of the code.  It looks to me like the dataType for each of 
CountDistinct, ApproxCountDistinctMerge and ApproxCountDistinct should be 
LongType.

And while we are in this file, AggregateFunction#eval(input: Row): Any 
doesn't actually override anything and looks to me to be superfluous.
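
To illustrate the dataType point with a toy model (these classes are stand-ins, 
not the actual Catalyst expressions): whatever the type of the column being 
counted, a distinct count is a count, so the declared dataType should be LongType.
```
object AggregateTypeSketch {
  sealed trait DataType
  case object LongType extends DataType
  case object StringType extends DataType

  trait AggregateFunctionLike {
    def dataType: DataType
    def eval(input: Seq[Any]): Any
  }

  // Counts distinct values; the input column could be strings, ints, etc.,
  // but the result is always a Long, so dataType is LongType.
  class CountDistinctLike extends AggregateFunctionLike {
    private val seen = scala.collection.mutable.Set.empty[Any]
    override def dataType: DataType = LongType
    override def eval(input: Seq[Any]): Any = { seen ++= input; seen.size.toLong }
  }

  def main(args: Array[String]): Unit = {
    val agg = new CountDistinctLike
    println(agg.eval(Seq("a", "a", "b", "c")))  // 3
    println(agg.dataType)                       // LongType
  }
}
```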


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1868: Users should be allowed to cogroup...

2014-05-20 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/813#issuecomment-43663175
  
To throw another wrench into the Union analogy, there is also the 
little-used SparkContext#union, which has signatures for both Seq[RDD[T]] and 
varargs RDD[T].
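
For reference, the shape of those two signatures, sketched over plain Lists so 
it runs standalone (List[T] standing in for RDD[T]):
```
object UnionOverloads {
  // Seq overload, analogous to SparkContext#union(rdds: Seq[RDD[T]]).
  def union[T](xs: Seq[List[T]]): List[T] = xs.foldLeft(List.empty[T])(_ ++ _)

  // Varargs convenience overload that delegates to the Seq version.
  def union[T](first: List[T], rest: List[T]*): List[T] = union(first +: rest)

  def main(args: Array[String]): Unit = {
    println(union(Seq(List(1, 2), List(3))))  // List(1, 2, 3)
    println(union(List(1, 2), List(3)))       // List(1, 2, 3)
  }
}
```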


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-15 Thread markhamstra
GitHub user markhamstra opened a pull request:

https://github.com/apache/spark/pull/686

[SPARK-1749] Job cancellation when SchedulerBackend does not implement 
killTask

It turns out that having the DAGScheduler tell the taskScheduler to 
cancelTasks when the backend does not implement killTask (e.g. Mesos) is not 
such a good idea.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markhamstra/spark SPARK-1749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #686


commit 57fe87e75912cc9cd7ae9e5a0ed3f4236d46b80f
Author: Mark Hamstra 
Date:   2014-05-07T22:56:27Z

Catch UnsupportedOperationException when DAGScheduler tries to
cancel a job on a SchedulerBackend that does not implement killTask




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r12497895
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1148,7 +1154,11 @@ private[scheduler] class 
DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
   case x: Exception =>
 logError("eventProcesserActor failed due to the error %s; shutting 
down SparkContext"
   .format(x.getMessage))
-dagScheduler.doCancelAllJobs()
+try {
+  dagScheduler.doCancelAllJobs()
+} catch {
+  case t: Throwable => logError("DAGScheduler failed to cancel all 
jobs.", t)
--- End diff --

https://github.com/apache/spark/pull/715?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-15 Thread markhamstra
Github user markhamstra commented on the pull request:

https://github.com/apache/spark/pull/686#issuecomment-42495440
  
The INFO log should include the information that tasks were not cancelled.  
Where/how else do you want to see notification of those facts?  Is adding more 
Listener events something we still want to contemplate for 1.0.0, or should 
something like that go into 1.1? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r12498098
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1148,7 +1154,11 @@ private[scheduler] class 
DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
   case x: Exception =>
 logError("eventProcesserActor failed due to the error %s; shutting 
down SparkContext"
   .format(x.getMessage))
-dagScheduler.doCancelAllJobs()
+try {
+  dagScheduler.doCancelAllJobs()
+} catch {
+  case t: Throwable => logError("DAGScheduler failed to cancel all 
jobs.", t)
--- End diff --

@aarondav So shall I just back off to `case e: Exception =>` here and let 
Throwable be picked up in a larger refactoring of Akka exception handling?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1749] Job cancellation when SchedulerBa...

2014-05-15 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/686#discussion_r12409225
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1055,10 +1055,16 @@ class DAGScheduler(
   // This is the only job that uses this stage, so fail the stage 
if it is running.
   val stage = stageIdToStage(stageId)
   if (runningStages.contains(stage)) {
-taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-val stageInfo = stageToInfos(stage)
-stageInfo.stageFailed(failureReason)
-
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
+} finally {
+  val stageInfo = stageToInfos(stage)
+  stageInfo.stageFailed(failureReason)
--- End diff --

Good question.  I'm mostly just trying to keep the DAGScheduler in a 
consistent state even when the backend doesn't support killing tasks, and I'll 
admit to working quickly while trying to get this significant bug fix into 
1.0.0, not having fully thought this part through.  If you can't see any use 
for the finally block except when taskScheduler.cancelTasks succeeds, then we 
can drop the finally. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

