[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331043
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -355,14 +351,13 @@ class DAGScheduler(
   logDebug("Removing running stage %d".format(stageId))
   runningStages -= stage
 }
-stageToInfos -= stage
 for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
   shuffleToMapStage.remove(k)
 }
-if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
+if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) {
   logDebug("Removing pending status for stage %d".format(stageId))
 }
-pendingTasks -= stage
+stage.pendingTasks = None
--- End diff --

Is it necessary to clear this?  Aren't all references to the Stage removed 
here anyway?  Is the issue that you want this cleared in case the stage is 
resubmitted?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331051
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -225,9 +374,18 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   return ResultWithDroppedBlocks(success = false, droppedBlocks)
 }
 
-if (maxMemory - currentMemory < space) {
+// Take into account the amount of memory currently occupied by unrolling blocks
+val freeSpace =
+  if (!unrolling) {
+val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
+unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum }
--- End diff --

If you remove the separate locking on unrollMemoryMap you can get rid of 
the synchronized here




[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1188#discussion_r15331036
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---
@@ -99,19 +99,30 @@ private[ui] class StageTableBase(
 {s.name}
   </a>
 
+val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
 val details = if (s.details.nonEmpty) {
   <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
 class="expand-details">
-+show details
-  </span>
-  <pre class="stage-details collapsed">{s.details}</pre>
++call stack
+  </span> ++
+  <div class="stage-details collapsed">
+{if (cachedRddInfos.nonEmpty) {
+  <strong>Persisted RDD:</strong> ++
+  cachedRddInfos.map { i =>
+<a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>
+  {i.name}
+</a>
+  }
+}}
+<pre>{s.details}</pre>
+  </div>
 }
 
 val stageDataOption = listener.stageIdToData.get(s.stageId)
 // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
 if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
   val desc = stageDataOption.get.description
-  <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+  <div><em>{desc}</em></div><div>{killLink} {nameLink} {details}</div>
--- End diff --

Is this a separate fix here? (looks like we didn't correctly add details in 
this case)




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331070
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -215,7 +361,10 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
*
* Return whether there is enough free space, along with the blocks 
dropped in the process.
*/
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): 
ResultWithDroppedBlocks = {
+  private def ensureFreeSpace(
+  blockIdToAdd: BlockId,
+  space: Long,
+  unrolling: Boolean = false): ResultWithDroppedBlocks = {
--- End diff --

Why is there a boolean parameter for unrolling? Some threads might be 
unrolling stuff while others are doing put() for other reasons, so we need to 
account for the unroll memory the same way both times to avoid overcommitting 
it.
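
A minimal, self-contained sketch of the uniform accounting being asked for here (the field names echo the PR, but this toy class is illustrative rather than the real MemoryStore):
```scala
import scala.collection.mutable

// Illustrative only: every caller sees the same picture of free memory, with the
// bytes reserved for in-progress unrolls subtracted, whether it is a put() or an unroll.
class ToyMemoryStore(maxMemory: Long) {
  private var currentMemory = 0L
  // threadId -> bytes currently reserved for unrolling by that thread
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()

  private def freeMemory: Long = maxMemory - currentMemory

  def ensureFreeSpace(space: Long): Boolean = unrollMemoryMap.synchronized {
    val reservedForUnrolling = unrollMemoryMap.values.sum
    val actuallyFree = freeMemory - reservedForUnrolling
    actuallyFree >= space  // a real store would try dropping blocks before giving up
  }
}
```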




[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread nevillelyh
Github user nevillelyh commented on a diff in the pull request:

https://github.com/apache/spark/pull/1188#discussion_r15331106
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---
@@ -99,19 +99,30 @@ private[ui] class StageTableBase(
 {s.name}
   </a>
 
+val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
 val details = if (s.details.nonEmpty) {
   <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
 class="expand-details">
-+show details
-  </span>
-  <pre class="stage-details collapsed">{s.details}</pre>
++call stack
+  </span> ++
+  <div class="stage-details collapsed">
+{if (cachedRddInfos.nonEmpty) {
+  <strong>Persisted RDD:</strong> ++
+  cachedRddInfos.map { i =>
+<a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>
+  {i.name}
+</a>
+  }
+}}
+<pre>{s.details}</pre>
+  </div>
 }
 
 val stageDataOption = listener.stageIdToData.get(s.stageId)
 // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
 if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
   val desc = stageDataOption.get.description
-  <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+  <div><em>{desc}</em></div><div>{killLink} {nameLink} {details}</div>
--- End diff --

Yeah I reused SPARK-2035, the ticket that introduced the details tab.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331108
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -215,7 +361,10 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
*
* Return whether there is enough free space, along with the blocks 
dropped in the process.
*/
-  private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): 
ResultWithDroppedBlocks = {
+  private def ensureFreeSpace(
+  blockIdToAdd: BlockId,
+  space: Long,
+  unrolling: Boolean = false): ResultWithDroppedBlocks = {
--- End diff --

If you want a different calculation of how much space to ensure, it would 
be better to just pass a different value of space when we're unrolling. That 
way this method always does the same thing.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331132
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 ---
@@ -29,6 +29,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
 
   override def eval(input: Row): Any = {
 children.size match {
+  case 0 => function.asInstanceOf[() => Any]()
--- End diff --

this is for another time, but if you add an explicit init to expressions, 
we can move all of these branches from the inner loop (once per row) directly 
to the outer loop (once per partition).
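
A rough sketch of the idea with invented names (this is not the actual Catalyst API): resolve the UDF's arity once per partition, then reuse the specialized closure for every row.
```scala
object UdfSpecialization {
  // Hypothetical helper: the match on arity happens once, e.g. from an init()
  // called per partition, instead of once per row inside eval().
  def makeEvaluator(function: AnyRef, arity: Int): Seq[Any] => Any = arity match {
    case 0 =>
      val f = function.asInstanceOf[() => Any]
      _ => f()
    case 1 =>
      val f = function.asInstanceOf[Any => Any]
      args => f(args(0))
    case 2 =>
      val f = function.asInstanceOf[(Any, Any) => Any]
      args => f(args(0), args(1))
    case n =>
      sys.error(s"UDFs with $n arguments are not handled in this sketch")
  }
  // Per partition: val eval = makeEvaluator(function, children.size)
  // Per row:       eval(children.map(_.eval(input)))
}
```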




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331160
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
--- End diff --

we should comment on the behavior of this cache so readers don't have to go 
read Guava documentation.
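
Something along these lines would probably do; the builder chain is as quoted later in this thread, and the eviction wording reflects Guava's documented size-based (approximately LRU) behavior:
```scala
/**
 * A cache of generated classes, keyed by the canonicalized input expressions.
 * Classes are compiled lazily by `create` (under `globalLock`) the first time a
 * key is requested, and at most 1000 of them are retained, with approximately
 * least-recently-used entries evicted beyond that size.
 */
protected val cache = CacheBuilder.newBuilder()
  .maximumSize(1000)
  .build(
    new CacheLoader[InType, OutType]() {
      override def load(in: InType): OutType = globalLock.synchronized {
        create(in)
      }
    })
```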




[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-24 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/1056#issuecomment-49970751
  
I don't entirely understand the advantage of having a separate 
PartialTaskMetrics.  Ultimately every field of TaskMetrics except for maybe 
shuffleFinishTime will be able to be updated incrementally.  Any time the 
driver is dealing with a TaskMetrics, the context makes it pretty clear 
whether it's full or not.  We do already have code that touches the TaskMetrics 
from multiple threads.  For example, the ShuffleReadMetrics gets updated like 
this.  I think you're right that I've missed some race conditions - we probably 
need to clone the metrics in a synchronized block before sending them off, but 
I'm not convinced that a superclass makes these easier to fix.
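
A hedged sketch of the cloning step mentioned above (the types and method names here are illustrative, not the real TaskMetrics API):
```scala
// Stand-in for the mutable metrics object.
class TaskMetricsSketch {
  var executorRunTime: Long = 0L
  var shuffleBytesRead: Long = 0L
  def snapshot(): TaskMetricsSketch = {
    val m = new TaskMetricsSketch
    m.executorRunTime = executorRunTime
    m.shuffleBytesRead = shuffleBytesRead
    m
  }
}

object MetricsReporting {
  // The task thread mutates `metrics` under its lock; the reporting thread copies it
  // under the same lock before sending, so the driver never sees a half-updated object.
  def sendUpdate(metrics: TaskMetricsSketch, send: TaskMetricsSketch => Unit): Unit = {
    val copy = metrics.synchronized { metrics.snapshot() }
    send(copy)
  }
}
```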




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331171
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either an array with the contents of the entire 
block or an iterator
+   * containing the values of the block (if the array would have exceeded 
available memory).
+   */
+  def unrollSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+// Number of elements unrolled so far
+var elementsUnrolled = 0
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-thread memory to request for unrolling blocks (bytes). 
Exposed for testing.
+val initialMemoryThreshold = 
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+// How often to check whether we need to request more memory. Exposed 
for testing.
+val memoryCheckPeriod = 
conf.getLong("spark.storage.unrollCheckPeriod", 16)
+// Memory currently reserved by this thread (bytes)
+var memoryThreshold = initialMemoryThreshold
+// Memory to request as a multiple of current vector size
+val memoryGrowthFactor = 1.5
+
+val threadId = Thread.currentThread().getId
+val unrollMemoryMap = SparkEnv.get.unrollMemoryMap
+var vector = new SizeTrackingVector[Any]
+
+// Request memory for our vector and return whether request is granted.
+// This involves synchronizing across all threads and can be expensive 
if called frequently.
+def requestMemory(memoryToRequest: Long): Boolean = {
+  unrollMemoryMap.synchronized {
+val previouslyOccupiedMemory = 
unrollMemoryMap.get(threadId).getOrElse(0L)
+val otherThreadsMemory = unrollMemoryMap.values.sum - 
previouslyOccupiedMemory
+val availableMemory = freeMemory - otherThreadsMemory
+val granted = availableMemory > memoryToRequest
+if (granted) { unrollMemoryMap(threadId) = memoryToRequest }
+granted
+  }
+}
+
+// Request enough memory to begin unrolling
+keepUnrolling = requestMemory(memoryThreshold)
+
+// Unroll this block safely, checking whether we have exceeded our 
threshold periodically
+try {
+  while (values.hasNext && keepUnrolling) {
+vector += values.next()
+if (elementsUnrolled % memoryCheckPeriod == 0) {
+  // If our vector's size has exceeded the threshold, request more 
memory
+  val currentSize = vector.estimateSize()
+  if (currentSize >= memoryThreshold) {
+val amountToRequest = (currentSize * memoryGrowthFactor).toLong
+// Hold the put lock, in case another thread concurrently puts 
a block that
+// takes up the free space we just ensured for unrolling here
+putLock.synchronized {
+  if (!requestMemory(amountToRequest)) {
+// If the first request is not granted, try again after 
ensuring free space
+// If there is still not enough space, give up and drop 
the partition
+val result = ensureFreeSpace(blockId, globalUnrollMemory, 
unrolling = true)
--- End diff --

Instead of trying to ensureFreeSpace on all of the globalUnrollMemory at 
once, let's maybe ensure only enough for the amount we want to request.

If you change ensureFreeSpace to not do accounting differently when we're 
unrolling (as per my comment below), I think this will become something like 
this:
```
val maxWeCanFree = globalUnrollMemory - unrollMemory // (the sum of your map)
if (maxWeCanFree > extraAmountWeNeed) {
  ensureFreeSpace(blockId, extraAmountWeNeed)
}
```
Basically this means we can only request extra space until 20% of the 
memory pool is allocated to unrolling. If we can fit our next request within 
that 20%, we go ahead and do it. Otherwise we don't.
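
To make the arithmetic concrete (toy numbers; the 20% figure comes from the comment above, nothing in the hunk fixes it):
```scala
// A 1000 MB store with a 20% unroll cap of 200 MB.
val globalUnrollMemory = 200L  // MB: cap on memory reserved for unrolling
val unrollMemory = 150L        // MB: already reserved across all threads (sum of the map)
val extraAmountWeNeed = 40L    // MB: how much more this thread wants

val maxWeCanFree = globalUnrollMemory - unrollMemory  // 50 MB of the cap still unused
if (maxWeCanFree > extraAmountWeNeed) {
  // 50 > 40: the request still fits under the cap, so ensure the space and grow
} else {
  // the cap is exhausted; give up and drop the partition instead
}
```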



[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331172
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -395,16 +389,9 @@ class DAGScheduler(
 activeJobs -= job
 
 if (resultStage.isEmpty) {
--- End diff --

Great question...I think we just didn't notice that.  I think it's safe to 
remove it, and you can also remove it as a parameter to 
failJobAndIndependentStages




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-49970876
  
QA results for PR 1165:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
  case class Sample(size: Long, numUpdates: Long)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17099/consoleFull




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331224
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either an array with the contents of the entire 
block or an iterator
+   * containing the values of the block (if the array would have exceeded 
available memory).
+   */
+  def unrollSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+// Number of elements unrolled so far
+var elementsUnrolled = 0
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-thread memory to request for unrolling blocks (bytes). 
Exposed for testing.
+val initialMemoryThreshold = 
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+// How often to check whether we need to request more memory. Exposed 
for testing.
+val memoryCheckPeriod = 
conf.getLong("spark.storage.unrollCheckPeriod", 16)
--- End diff --

Can we just keep this at 16? It will be slower if we make it configurable, 
because the compiler won't be able to optimize mod 16, and it also seems like 
something we don't need to add a config setting for.
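
For what it's worth, the trade-off looks roughly like this (illustrative sketch, not the PR's code):
```scala
object UnrollCheck {
  // With a compile-time constant that is a power of two, the JIT can turn the modulo
  // into a bitwise AND; a value read from SparkConf gives it no such guarantee.
  private val MemoryCheckPeriod = 16

  def shouldCheck(elementsUnrolled: Long): Boolean =
    elementsUnrolled % MemoryCheckPeriod == 0  // effectively (elementsUnrolled & 15) == 0
}
```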




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331245
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unroll the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unrolling the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unrolling blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either an array with the contents of the entire 
block or an iterator
+   * containing the values of the block (if the array would have exceeded 
available memory).
+   */
+  def unrollSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+// Number of elements unrolled so far
+var elementsUnrolled = 0
+// Whether there is still enough memory for us to continue unrolling 
this block
+var keepUnrolling = true
+// Initial per-thread memory to request for unrolling blocks (bytes). 
Exposed for testing.
+val initialMemoryThreshold = 
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+// How often to check whether we need to request more memory. Exposed 
for testing.
+val memoryCheckPeriod = 
conf.getLong("spark.storage.unrollCheckPeriod", 16)
--- End diff --

BTW this would also make the tests slightly more realistic in that there 
will be fewer differences between them and real-world operation.




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331280
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -395,16 +389,9 @@ class DAGScheduler(
 activeJobs -= job
 
 if (resultStage.isEmpty) {
-  // Clean up result stages.
-  val resultStagesForJob = resultStageToJob.keySet.filter(
-stage => resultStageToJob(stage).jobId == job.jobId)
-  if (resultStagesForJob.size != 1) {
-logWarning(
-  s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)")
-  }
-  resultStageToJob --= resultStagesForJob
+  job.finalStage.resultOfJob = None
 } else {
-  resultStageToJob -= resultStage.get
+  resultStage.get.resultOfJob = None
--- End diff --

And as a result of the above, you can consolidate this if/else into one 
thing
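
Concretely, the two branches collapse into a single statement, something like this (a sketch against the identifiers in this hunk):
```scala
// Clear resultOfJob on whichever ResultStage the job points at: resultStage if
// one was found, otherwise the job's own finalStage.
resultStage.getOrElse(job.finalStage).resultOfJob = None
```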




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331310
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -355,14 +351,13 @@ class DAGScheduler(
   logDebug("Removing running stage %d".format(stageId))
   runningStages -= stage
 }
-stageToInfos -= stage
 for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
   shuffleToMapStage.remove(k)
 }
-if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
+if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) {
   logDebug("Removing pending status for stage %d".format(stageId))
 }
-pendingTasks -= stage
+stage.pendingTasks = None
--- End diff --

Ok don't worry about it, as discussed.   I want to remove this pendingTasks 
anyway, which is redundant with information stored by the TSM




[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1051#issuecomment-49971143
  
QA tests have started for PR 1051. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17104/consoleFull




[GitHub] spark pull request: [SPARK-2410][SQL] Cherry picked Hive Thrift/JD...

2014-07-24 Thread liancheng
Github user liancheng commented on the pull request:

https://github.com/apache/spark/pull/1399#issuecomment-49971137
  
Removed the WIP tag. It's ready to go.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r15331347
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -87,9 +97,47 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   values: Iterator[Any],
   level: StorageLevel,
   returnValues: Boolean): PutResult = {
-val valueEntries = new ArrayBuffer[Any]()
-valueEntries ++= values
-putValues(blockId, valueEntries, level, returnValues)
+putValues(blockId, values, level, returnValues, allowPersistToDisk = 
true)
+  }
+
+  /**
+   * Attempt to put the given block in memory store.
+   *
+   * There may not be enough space to fully unroll the iterator in memory, 
in which case we
+   * optionally drop the values to disk if
+   *   (1) the block's storage level specifies useDisk, and
+   *   (2) `allowPersistToDisk` is true.
+   *
+   * One scenario in which `allowPersistToDisk` is false is when the 
BlockManager reads a block
+   * back from disk and attempts to cache it in memory. In this case, we 
should not persist the
+   * block back on disk again, as it is already in disk store.
+   */
+  private[storage] def putValues(
--- End diff --

Nit: it's kind of weird that this is called putValues when it takes an 
Iterator, and BlockManager has something called putIterator. Maybe this should 
also be putIterator.




[GitHub] spark pull request: SPARK-2310. Support arbitrary Spark properties...

2014-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1253




[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15331341
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

The shuffle output files written by SortShuffleWriter do not get cleaned up. We might 
need to add a map that tracks registered shuffle handles, and then remove the data 
files in the unregisterShuffle method. At present this is not implemented in 
HashShuffleManager either, but HashShuffleManager depends on shuffleBlockManager and 
the files get cleaned there.

I have a PR (#1241) that generalizes shuffleBlockManager and hides it behind 
shuffleManager; when blockManager removes a shuffle, it will call into this 
unregisterShuffle method. I will rebase it once this PR is merged.

Should we fix this issue in my PR, or add the store/clean shuffleHandle 
logic here in this PR first?
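
One way the bookkeeping could look, as a sketch only; it assumes the manager can reach a DiskBlockManager, and none of these members exist in the file as quoted:
```scala
import scala.collection.concurrent.TrieMap

// Hypothetical: remember each registered shuffle so unregisterShuffle can delete
// the per-map output files that SortShuffleWriter produced.
private val shuffleIdToNumMaps = TrieMap.empty[Int, Int]

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  shuffleIdToNumMaps.put(shuffleId, numMaps)
  new BaseShuffleHandle(shuffleId, numMaps, dependency)
}

override def unregisterShuffle(shuffleId: Int): Unit = {
  shuffleIdToNumMaps.remove(shuffleId).foreach { numMaps =>
    (0 until numMaps).foreach { mapId =>
      val file = diskBlockManager.getFile(ShuffleBlockId(shuffleId, mapId, 0))
      if (file.exists()) { file.delete() }
    }
  }
}
```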




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331362
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
--- End diff --

nit: space before =

Also it would be great to add inline doc on what these apply methods are doing, 
since apply is used quite a lot in Catalyst with different semantics.




[GitHub] spark pull request: [SPARK-2260] Fix standalone-cluster mode, whic...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1538#issuecomment-49971356
  
QA results for PR 1538:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17100/consoleFull




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331481
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
+import scala.collection.mutable.HashSet
--- End diff --

ha this is WIP. 




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331465
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
+import scala.collection.mutable.HashSet
--- End diff --

Mr. Xin your imports are ordered incorrectly! I expected better of you!
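
For reference, the convention is java/javax first, then scala, then third-party libraries, then org.apache.spark, with blank lines between groups, so the new import would move up:
```scala
import scala.collection.mutable.HashSet

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite
```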




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331488
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -56,6 +58,16 @@ private[spark] class Stage(
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
+
+  /** List of jobs that this stage belong to. */
--- End diff --

as long as we're nit-ing... belong -> belongs




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331519
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -56,6 +58,16 @@ private[spark] class Stage(
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
+
+  /** List of jobs that this stage belong to. */
+  val jobIds = new HashSet[Int]
+  /** For stages that are the final (consists of only ResultTasks), link 
to the ActiveJob. */
+  var resultOfJob: Option[ActiveJob] = None
+  var pendingTasks: Option[HashSet[Task[_]]] = None
--- End diff --

Why use an option here, as opposed to just a (possibly empty) hash set?  
The option makes many of the accesses more clumsy.
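
The plain-set version being suggested would read roughly like this (a sketch against the identifiers in this PR):
```scala
// In Stage: always present, possibly empty.
val pendingTasks = new HashSet[Task[_]]

// In DAGScheduler.removeStage: no Option unwrapping needed.
if (stage.pendingTasks.nonEmpty) {
  logDebug("Removing pending status for stage %d".format(stageId))
}
stage.pendingTasks.clear()
```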




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread kayousterhout
Github user kayousterhout commented on the pull request:

https://github.com/apache/spark/pull/1561#issuecomment-49971683
  
Love the cleanup here!!!




[GitHub] spark pull request: [SPARK-2410][SQL] Cherry picked Hive Thrift/JD...

2014-07-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1399#issuecomment-49971698
  
@liancheng I just merged a patch related to SparkSubmit... can you rebase?




[GitHub] spark pull request: SPARK-2310. Support arbitrary Spark properties...

2014-07-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1253#issuecomment-49971888
  
@sryza I merged this just now because another patch was going to change 
this code and I wanted to avoid you having to rebase again. That said, I found 
an issue with this after merging. Would you be able to fix this?

https://issues.apache.org/jira/browse/SPARK-2664




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331775
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -355,14 +351,13 @@ class DAGScheduler(
   logDebug("Removing running stage %d".format(stageId))
   runningStages -= stage
 }
-stageToInfos -= stage
 for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
   shuffleToMapStage.remove(k)
 }
-if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
+if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) {
   logDebug("Removing pending status for stage %d".format(stageId))
 }
-pendingTasks -= stage
+stage.pendingTasks = None
--- End diff --

The old pendingTasks needed the clearing step so that it wouldn't continue 
to have removed stages mapping to empty sets of remaining tasks.  I agree that 
a similar step shouldn't be necessary now that pendingTasks is encapsulated in 
a Stage that is going away.  In fact, in the event of resubmission don't we 
want this to _not_ be None, else stage.pendingTasks.get will fail?




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15331819
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
+import scala.collection.mutable.HashSet
--- End diff --

I'll have to remember that one!




[GitHub] spark pull request: Build should not run hive tests by default.

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1565#issuecomment-49972683
  
QA results for PR 1565:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17095/consoleFull




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15332026
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
@@ -56,6 +58,16 @@ private[spark] class Stage(
   val numPartitions = rdd.partitions.size
   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
   var numAvailableOutputs = 0
+
+  /** List of jobs that this stage belong to. */
+  val jobIds = new HashSet[Int]
+  /** For stages that are the final (consists of only ResultTasks), link 
to the ActiveJob. */
+  var resultOfJob: Option[ActiveJob] = None
+  var pendingTasks: Option[HashSet[Task[_]]] = None
--- End diff --

And potentially error prone.  I don't think there is anyplace left where 
the distinction between None and Some(emptySet) is useful except for the check 
in removeStage 
(https://github.com/rxin/spark/blob/dagSchedulerHashMaps/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357),
 and that's not really even useful anymore.  On the other hand, having a None 
where we expect at least Some(emptySet) can be a problem.

I think we're better off without the Option. 




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15332107
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed 
set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned 
[[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid 
boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], 
Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): 
Seq[Expression] =
+in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make Mutablility optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+val tupleLength = ru.Literal(Constant(expressions.length))
+val lengthDef = q"final val length = $tupleLength"
+
+/* TODO: Configurable...
+val nullFunctions =
+  q"""
+private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
+final def setNullAt(i: Int) = nullSet.set(i)
+final def isNullAt(i: Int) = nullSet.get(i)
+  """
+ */
+
+val nullFunctions =
+  q"""
+private[this] var nullBits = new Array[Boolean](${expressions.size})
+final def setNullAt(i: Int) = { nullBits(i) = true }
+final def isNullAt(i: Int) = nullBits(i)
+  """.children
+
+val tupleElements = expressions.zipWithIndex.flatMap {
+  case (e, i) =>
+val elementName = newTermName(s"c$i")
+val evaluatedExpression = expressionEvaluator(e)
--- End diff --

where is expressionEvaluator defined?




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15332175
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
--- End diff --

Actually, after reading through the code more, I'd argue we should rename 
this function from `apply` to something more informative. Unless it is 
semantically very explicit and obvious (e.g. array), the use of `apply` makes 
it harder to understand the code around its usage.
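
To illustrate (a made-up sketch, not the actual Catalyst API), compare how the 
two read at the call site:

```scala
// Made-up example, only to show call-site readability.
object GenerateSomething {
  type InType = Seq[String]
  type OutType = InType => String

  // Reads as GenerateSomething(exprs) -- the reader has to know the convention.
  def apply(expressions: InType): OutType = generate(expressions)

  // Reads as GenerateSomething.generate(exprs) -- self-documenting.
  def generate(expressions: InType): OutType =
    input => (expressions ++ input).mkString(", ")
}
```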


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1188#discussion_r15332185
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---
@@ -99,19 +99,30 @@ private[ui] class StageTableBase(
 {s.name}
   /a
 
+val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
 val details = if (s.details.nonEmpty) {
   span 
onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
 class=expand-details
-+show details
-  /span
-  pre class=stage-details collapsed{s.details}/pre
++call stack
+  /span ++
+  div class=stage-details collapsed
+{if (cachedRddInfos.nonEmpty) {
+  strongPersisted RDD:/strong ++
+  cachedRddInfos.map { i =>
+a 
href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}
--- End diff --

I tested this locally and it would be good to pull this onto one line, 
because otherwise the hyperlink renders with a space at the beginning. If you 
need to violate the style guidelines you can disable scalastyle using 
`// scalastyle:off` and `// scalastyle:on` (see where we do this elsewhere in 
the UI code); a rough sketch of the shape is below.
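
For reference, a rough sketch of that shape (`RddInfoStub` and `prependBaseUri` 
are stand-ins here, just to keep the snippet self-contained; only the one-line 
`<a>` element and the scalastyle markers matter):

```scala
import scala.xml.Node

object RddLinkSketch {
  // Stand-ins for RDDInfo and UIUtils.prependBaseUri.
  case class RddInfoStub(id: Int, name: String)
  def prependBaseUri(basePath: String): String = basePath

  def rddLinks(basePath: String, cachedRddInfos: Seq[RddInfoStub]): Seq[Node] = {
    // scalastyle:off line.size.limit
    cachedRddInfos.map { i =>
      <a href={"%s/storage/rdd?id=%d".format(prependBaseUri(basePath), i.id)}>{i.name}</a>
    }
    // scalastyle:on line.size.limit
  }
}
```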


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1188#discussion_r15332227
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---
@@ -99,19 +99,30 @@ private[ui] class StageTableBase(
 {s.name}
   /a
 
+val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
 val details = if (s.details.nonEmpty) {
   span 
onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
 class=expand-details
-+show details
-  /span
-  pre class=stage-details collapsed{s.details}/pre
++call stack
+  /span ++
+  div class=stage-details collapsed
+{if (cachedRddInfos.nonEmpty) {
+  strongPersisted RDD:/strong ++
--- End diff --

I prefer this being normal text instead of bold. It already pops out 
because the guidelines are all grey.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1188#issuecomment-49973457
  
Built and tested it locally. I was actually proposing pulling the "Persisted 
RDD" part outside of the toggle component. I think it's fine either way, but if 
we decide to put it in the toggle box, I agree the name should be more 
generically "details". If we decide to pull it outside of the toggle box, then 
we can leave the box as "stack trace". I slightly prefer leaving it out, but 
maybe if the name is really long that will be annoying? I'm worried people will 
never see it if it's hidden in the toggle.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1051#issuecomment-49973607
  
QA results for PR 1051:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17104/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15332387
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Good question, but files on disk actually get cleaned by the BlockManager 
directly if cleaning is turned on, and sort-based shuffle is set up to only use 
files. ShuffleBlockManager had extra state in memory in the form of metadata; 
that's why it needs its own cleaner. But I don't think there's an issue here.

It will be important to do something for unregisterShuffle, but right now 
nothing calls that. That's part of the API because I wanted to move the 
MapOutputTracker behind the ShuffleManager as well. But any patches to move 
more stuff behind it are welcome, and I agree the ShuffleBlockManager should be 
specific to hash-based shuffle.
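
Just to make the eventual direction concrete, here is a rough sketch (not part 
of this PR; the per-shuffle `numMaps` bookkeeping is assumed) of what a 
sort-based unregisterShuffle could do, since all of its state is plain files:

```scala
package org.apache.spark.shuffle.sort

import org.apache.spark.storage.{DiskBlockManager, ShuffleBlockId}

private[spark] object SortShuffleCleanupSketch {
  // Hypothetical: delete the per-map output files of one shuffle. Assumes the
  // caller still knows how many map tasks the shuffle had.
  def unregisterShuffle(shuffleId: Int, numMaps: Int, diskBlockManager: DiskBlockManager): Unit = {
    for (mapId <- 0 until numMaps) {
      val file = diskBlockManager.getFile(ShuffleBlockId(shuffleId, mapId, 0))
      if (file.exists()) {
        file.delete()
      }
    }
  }
}
```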


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI

2014-07-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1188#issuecomment-49973700
  
One more thing - after playing with it in my browser I think just saying 
"RDD: XX" is sufficient. Not sure "Persisted" adds much, and it will save space. 
Still prefer it non-bold.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1499#issuecomment-49973755
  
Added one more commit that fixes the type of ShuffledRDD, because in this 
new shuffle it's not possible to return a custom Product2 the way it's written 
now, and in the old one it wasn't possible anyway with aggregation. See 
https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55 
for details. @rxin you should look at this one.

With this commit there's now a version of ShuffleSuite that passes using 
SortBasedShuffle.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...

2014-07-24 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/1568

[SPARK-2652] [PySpark] Turning some default configs for PySpark

Add several default configs for PySpark, related to serialization in JVM.

spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.referenceTracking =  False
spark.serializer.objectStreamReset = 1

This will help to reduce the memory usage during RDD.partitionBy()
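
On the JVM side this is roughly equivalent to the following sketch (using 
setIfMissing so any values the user has already configured still win):

```scala
import org.apache.spark.SparkConf

// Sketch only: mirrors the defaults listed above without overriding user settings.
val conf = new SparkConf()
  .setIfMissing("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setIfMissing("spark.kryo.referenceTracking", "false")
  .setIfMissing("spark.serializer.objectStreamReset", "1")
```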

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark conf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1568


commit c04a83dc311c5e073c8227665ac318647be9a0e4
Author: Davies Liu davies@gmail.com
Date:   2014-07-23T20:25:11Z

some default configs for PySpark




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1210#issuecomment-49973939
  
QA results for PR 1210:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17098/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Update HiveMetastoreCatalog.scala

2014-07-24 Thread baishuo
GitHub user baishuo opened a pull request:

https://github.com/apache/spark/pull/1569

Update HiveMetastoreCatalog.scala

I think it's better to define hiveQlTable as a val
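
To illustrate the difference (a toy sketch, not the actual HiveMetastoreCatalog 
code): a def re-runs the lookup on every access, while a lazy val computes it 
once and caches it.

```scala
// Toy sketch: `fetchTable` stands in for the metastore lookup.
class TableHolderSketch(fetchTable: () => String) {
  def tableAsDef: String = fetchTable()       // re-evaluated on every access
  lazy val tableAsVal: String = fetchTable()  // evaluated once, then cached
}
```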

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/baishuo/spark patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1569


commit a7b32a28a59886dfac45331d781a548fc18b098f
Author: baishuo(白硕) vc_j...@hotmail.com
Date:   2014-07-24T07:01:33Z

Update HiveMetastoreCatalog.scala

I think it's better to define hiveQlTable as a val




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1568#issuecomment-49974134
  
QA tests have started for PR 1568. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17107/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2260] Fix standalone-cluster mode, whic...

2014-07-24 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1538#discussion_r15332601
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala ---
@@ -62,20 +62,23 @@ object CommandUtils extends Logging {
 val joined = 
command.libraryPathEntries.mkString(File.pathSeparator)
 Seq(s"-Djava.library.path=$joined")
   } else {
- Seq()
+Seq()
   }
 
 val permGenOpt = Seq("-XX:MaxPermSize=128m")
 
+// Convert Spark properties to java system properties
+val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" }
--- End diff --

Yeah, I'd prefer not to change the executor code path here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Update HiveMetastoreCatalog.scala

2014-07-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1569#issuecomment-49974351
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1210#issuecomment-49974441
  
QA tests have started for PR 1210. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17108/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd

2014-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1519


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd

2014-07-24 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1519#issuecomment-49974606
  
Thanks Adrian, merged this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd

2014-07-24 Thread adrian-wang
Github user adrian-wang commented on the pull request:

https://github.com/apache/spark/pull/1519#issuecomment-49975077
  
Thanks Matei!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...

2014-07-24 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/1051#discussion_r15333083
  
--- Diff: python/pyspark/conf.py ---
@@ -99,6 +99,12 @@ def set(self, key, value):
 self._jconf.set(key, unicode(value))
 return self
 
+def setIfMissing(self, key, value):
+"""Set a configuration property, if not already set."""
+if self.get(key) == None:
--- End diff --

We may have to make SparkConf iterable for this to work. It does not work 
as is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1460#discussion_r15333099
  
--- Diff: python/pyspark/shuffle.py ---
@@ -0,0 +1,436 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the License); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an AS IS BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import platform
+import shutil
+import warnings
+
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+
+try:
+import psutil
+
+def get_used_memory():
+ """ Return the used memory in MB """
+process = psutil.Process(os.getpid())
+if hasattr(process, "memory_info"):
+info = process.memory_info()
+else:
+info = process.get_memory_info()
+return info.rss >> 20
+except ImportError:
+
+def get_used_memory():
+ """ Return the used memory in MB """
+if platform.system() == 'Linux':
+for line in open('/proc/self/status'):
+if line.startswith('VmRSS:'):
+return int(line.split()[1]) >> 10
+else:
+warnings.warn("Please install psutil to have better "
+"support with spilling")
+if platform.system() == "Darwin":
+import resource
+rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+return rss >> 20
+# TODO: support windows
+return 0
+
+
+class Aggregator(object):
+
+"""
+Aggregator has three functions to merge values into combiner.
+
+createCombiner:  (value) -> combiner
+mergeValue:  (combine, value) -> combiner
+mergeCombiners:  (combiner, combiner) -> combiner
+"""
+
+def __init__(self, createCombiner, mergeValue, mergeCombiners):
+self.createCombiner = createCombiner
+self.mergeValue = mergeValue
+self.mergeCombiners = mergeCombiners
+
+
+class SimpleAggregator(Aggregator):
+
+"""
+SimpleAggregator is useful for the cases where the combiners have
+the same type as the values
+"""
+
+def __init__(self, combiner):
+Aggregator.__init__(self, lambda x: x, combiner, combiner)
+
+
+class Merger(object):
+
+"""
+Merge shuffled data together by aggregator
+"""
+
+def __init__(self, aggregator):
+self.agg = aggregator
+
+def mergeValues(self, iterator):
+ """ Combine the items by creator and combiner """
+raise NotImplementedError
+
+def mergeCombiners(self, iterator):
+ """ Merge the combined items by mergeCombiner """
+raise NotImplementedError
+
+def iteritems(self):
+ """ Return the merged items as iterator """
+raise NotImplementedError
+
+
+class InMemoryMerger(Merger):
+
+"""
+In memory merger based on in-memory dict.
+"""
+
+def __init__(self, aggregator):
+Merger.__init__(self, aggregator)
+self.data = {}
+
+def mergeValues(self, iterator):
+ """ Combine the items by creator and combiner """
+# speed up attributes lookup
+d, creator = self.data, self.agg.createCombiner
+comb = self.agg.mergeValue
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else creator(v)
+
+def mergeCombiners(self, iterator):
+ """ Merge the combined items by mergeCombiner """
+# speed up attributes lookup
+d, comb = self.data, self.agg.mergeCombiners
+for k, v in iterator:
+d[k] = comb(d[k], v) if k in d else v
+
+def iteritems(self):
+ """ Return the merged items as iterator """
+return self.data.iteritems()
+
+
+class ExternalMerger(Merger):
+
+
+External merger will dump the aggregated data into disks when
+memory usage goes above the limit, then merge them together.
+
+This 

[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-24 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1056#issuecomment-49975385
  
I was actually a bit confused why `updateShuffleReadMetrics` is 
synchronized. Can that be called from multiple threads as-is? I wasn't aware of 
cases where we had multi-threaded access to `TaskMetrics` prior to this patch.

To your point, there are really only two fields here that are not just 
commutative counters - `hostname` and `shuffleFinishTime`. Is there a race 
where `hostname` gets sent over as null in a heartbeat and it triggers an NPE 
downstream? Maybe we should set `shuffleFinishTime` to -1 by default to 
indicate it's not populated.

My motivation for adding extra typing was to make it explicit for people 
extending TaskMetrics in the future. We already have some proposals for 
augmenting the `TaskMetrics` class and some of them add more complicated 
fields. Perhaps an alternative is just to document clearly in the class that we 
might take a snapshot of the TaskMetrics at any time and send it to consumers 
of the SparkListener interface.

Over time, I'd actually hope these get re-implemented as accumulators...
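
As a concrete sketch of the sentinel idea (simplified, not the real TaskMetrics 
fields), a consumer of a heartbeat snapshot could then tell "not populated yet" 
apart from a real timestamp:

```scala
// Simplified sketch, not the actual class.
class ShuffleMetricsSnapshotSketch {
  // -1 means the shuffle had not finished when this snapshot was taken.
  var shuffleFinishTime: Long = -1L

  def shuffleFinished: Boolean = shuffleFinishTime >= 0
}
```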




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle

2014-07-24 Thread colorant
Github user colorant commented on a diff in the pull request:

https://github.com/apache/spark/pull/1499#discussion_r15333148
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{DataInputStream, FileInputStream}
+
+import org.apache.spark.shuffle._
+import org.apache.spark.{TaskContext, ShuffleDependency}
+import org.apache.spark.shuffle.hash.HashShuffleReader
+import org.apache.spark.storage.{DiskBlockManager, FileSegment, 
ShuffleBlockId}
+
+private[spark] class SortShuffleManager extends ShuffleManager {
+  /**
+   * Register a shuffle with the manager and obtain a handle for it to 
pass to tasks.
+   */
+  override def registerShuffle[K, V, C](
+  shuffleId: Int,
+  numMaps: Int,
+  dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
+new BaseShuffleHandle(shuffleId, numMaps, dependency)
+  }
+
+  /**
+   * Get a reader for a range of reduce partitions (startPartition to 
endPartition-1, inclusive).
+   * Called on executors by reduce tasks.
+   */
+  override def getReader[K, C](
+  handle: ShuffleHandle,
+  startPartition: Int,
+  endPartition: Int,
+  context: TaskContext): ShuffleReader[K, C] = {
+// We currently use the same block store shuffle fetcher as the 
hash-based shuffle.
+new HashShuffleReader(
+  handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, 
endPartition, context)
+  }
+
+  /** Get a writer for a given partition. Called on executors by map 
tasks. */
+  override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: 
TaskContext)
+  : ShuffleWriter[K, V] = {
+new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], 
mapId, context)
+  }
+
+  /** Remove a shuffle's metadata from the ShuffleManager. */
+  override def unregisterShuffle(shuffleId: Int): Unit = {}
--- End diff --

Maybe I'm missing some code, but from what I understand, when cleaning is 
turned on: if cleanup happens by timestamp, the BlockManager won't remove the 
shuffle data; the ShuffleBlockManager does that work by itself. And if cleanup 
happens automatically, when doCleanShuffle is called it goes through the current 
ShuffleBlockManager interface. In neither case would the files generated by 
SortShuffleWriter be cleaned, right? Or do you mean that, as long as there is no 
metadata in memory, it's OK for the shuffle files on disk not to be removed 
until the application exits?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...

2014-07-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/1056#discussion_r15333186
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -348,4 +352,46 @@ private[spark] class Executor(
   }
 }
   }
+
+  def stop() {
+isStopped = true
+threadPool.shutdown()
+  }
+
+  def startDriverHeartbeater() {
+val interval = conf.getInt("spark.executor.heartbeatInterval", 2000)
+val timeout = AkkaUtils.lookupTimeout(conf)
+val retryAttempts = AkkaUtils.numRetries(conf)
+val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
+val heartbeatReceiverRef = 
AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
+
+val t = new Thread() {
+  override def run() {
+// Sleep a random interval so the heartbeats don't end up in sync
+Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+
+while (!isStopped) {
+  val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+  for (taskRunner <- runningTasks.values()) {
+Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
+  tasksMetrics += ((taskRunner.taskId, metrics))
+}
+  }
+
+  val message = Heartbeat(executorId, tasksMetrics.toArray,
--- End diff --

I'm getting 101 :(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2492][Streaming] kafkaReceiver minor ch...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1420#issuecomment-49975753
  
QA tests have started for PR 1420. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17109/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1512
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1552
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1579
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...

2014-07-24 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/1051#discussion_r15333403
  
--- Diff: python/pyspark/conf.py ---
@@ -99,6 +99,12 @@ def set(self, key, value):
 self._jconf.set(key, unicode(value))
 return self
 
+def setIfMissing(self, key, value):
+"""Set a configuration property, if not already set."""
+if self.get(key) == None:
--- End diff --

Ah, yes. But `self.get(key) is None` sounds more Pythonic :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333476
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333558
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: Build should not run hive tests by default.

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1565#issuecomment-49976502
  
QA results for PR 1565:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17102/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333592
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ---
@@ -47,4 +47,30 @@ package org.apache.spark.sql.catalyst
  * ==Evaluation==
  * The result of expressions can be evaluated using the 
`Expression.apply(Row)` method.
  */
-package object expressions
+package object expressions  {
+
+  /**
+   * Converts a [[Row]] to another Row given a sequence of expression that 
define each column of the
+   * new row. If the schema of the input row is specified, then the given 
expression will be bound
+   * to that schema.
+   */
+  abstract class Projection extends (Row => Row)
+
+  /**
+   * Converts a [[Row]] to another Row given a sequence of expression that 
define each column of the
+   * new row. If the schema of the input row is specified, then the given 
expression will be bound
+   * to that schema.
+   *
+   * In contrast to a normal projection, a MutableProjection reuses the 
same underlying row object
+   * each time an input row is added.  This significantly reduces the cost 
of calculating the
+   * projection, but means that it is not safe to hold on to a reference 
to a [[Row]] after `next()`
+   * has been called on the [[Iterator]] that produced it. Instead, the 
user must call `Row.copy()`
+   * and hold on to the returned [[Row]] before calling `next()`.
+   */
+  abstract class MutableProjection extends Projection {
+def currentValue: Row
+
+/** Updates the target of this projection to a new MutableRow */
--- End diff --

maybe ```Uses the given row to store the output of the projection.``` ?
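For readers following the thread, here is a minimal sketch of the reuse-and-copy contract described in the MutableProjection scaladoc above; the `collectProjected` helper and its parameter names are made up for illustration and are not part of this patch:

```scala
import org.apache.spark.sql.catalyst.expressions.{MutableProjection, Row}

// Every call to project(row) returns the same underlying mutable row, so each
// result must be copied before the next input row is consumed from the iterator.
def collectProjected(project: MutableProjection, input: Iterator[Row]): Seq[Row] =
  input.map(row => project(row).copy()).toSeq
```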




[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1051#issuecomment-49976557
  
QA tests have started for PR 1051. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17111/consoleFull




[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1460#issuecomment-49976553
  
QA tests have started for PR 1460. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333688
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+
+/**
+ * Overrides our expression evaluation tests to use code generation for 
evaluation.
+ */
+class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+  expression: Expression,
+  expected: Any,
+  inputRow: Row = EmptyRow): Unit = {
+val plan = try {
+  GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
+} catch {
+  case e: Throwable =>
+val evaluated = GenerateProjection.expressionEvaluator(expression)
+fail(
+  s"""
+|Code generation of $expression failed:
+|${evaluated.code.mkString("\n")}
+|$e
+  """.stripMargin)
+}
+
+val actual  = plan(inputRow).apply(0)
+if(actual != expected) {
+  val input = if(inputRow == EmptyRow) "" else s", input: $inputRow"
+  fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+}
+  }
+
+
+  test("multithreaded eval") {
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+val futures = (1 to 20).map { _ =>
+  future {
+GeneratePredicate(EqualTo(Literal(1), Literal(1)))
+GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
+  }
+}
+
+futures.foreach(Await.result(_, 10.seconds))
+  }
+}
+
+/**
+ * Overrides our expression evaluation tests to use generated code on 
mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
--- End diff --

Best to take this out into a separate file, since that's the common 
standard across all Spark modules (each test suite has its own file)




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333694
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+
+/**
+ * Overrides our expression evaluation tests to use code generation for 
evaluation.
+ */
+class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+  expression: Expression,
+  expected: Any,
+  inputRow: Row = EmptyRow): Unit = {
+val plan = try {
+  GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
+} catch {
+  case e: Throwable =>
+val evaluated = GenerateProjection.expressionEvaluator(expression)
+fail(
+  s"""
+|Code generation of $expression failed:
+|${evaluated.code.mkString("\n")}
+|$e
+  """.stripMargin)
+}
+
+val actual  = plan(inputRow).apply(0)
+if(actual != expected) {
+  val input = if(inputRow == EmptyRow) "" else s", input: $inputRow"
+  fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+}
+  }
+
+
+  test("multithreaded eval") {
+import scala.concurrent._
+import ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+val futures = (1 to 20).map { _ =>
+  future {
+GeneratePredicate(EqualTo(Literal(1), Literal(1)))
+GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
+  }
+}
+
+futures.foreach(Await.result(_, 10.seconds))
+  }
+}
+
+/**
+ * Overrides our expression evaluation tests to use generated code on 
mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+  expression: Expression,
+  expected: Any,
+  inputRow: Row = EmptyRow): Unit = {
+lazy val evaluated = GenerateProjection.expressionEvaluator(expression)
+
+val plan = try {
+  GenerateProjection(Alias(expression, s"Optimized($expression)")() :: Nil)
+} catch {
+  case e: Throwable =>
+fail(
+  s"""
+|Code generation of $expression failed:
+|${evaluated.code.mkString("\n")}
+|$e
+  """.stripMargin)
+}
+
+val actual = plan(inputRow)
+val expectedRow = new GenericRow(Array[Any](expected))
+if (actual.hashCode() != expectedRow.hashCode()) {
+  fail(
+s"""
+  |Mismatched hashCodes for values: $actual, $expectedRow
+  |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
+  |${evaluated.code.mkString("\n")}
+""".stripMargin)
+}
+if (actual != expectedRow) {
+  val input = if(inputRow == EmptyRow) "" else s", input: $inputRow"
+  fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+}
+  }
+}
--- End diff --

nitpick - add a new line




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333762
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---
@@ -47,23 +47,29 @@ case class Generate(
 }
   }
 
-  override def output =
+  // This must be a val since the generator output expr ids are not 
preserved by serialization.
+  override val output =
 if (join) child.output ++ generatorOutput else generatorOutput
 
+  val boundGenerator = BindReferences.bindReference(generator, 
child.output)
+
+  /** Codegenned rows are not serializable... */
--- End diff --

TODO?




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333803
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1568#issuecomment-49977116
  
QA results for PR 1568:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17107/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333832
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: Build should not run hive tests by default.

2014-07-24 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/1565#issuecomment-49977140
  
I feel that making those two examples compile optionally (by isolating them) is
better than isolating all the hive tests like this.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333900
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333888
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333936
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -18,22 +18,53 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
 import org.apache.spark.sql.catalyst.plans.physical._
 
+
+object SparkPlan {
+  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
+abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with 
Serializable {
  self: Product =>
 
+  /**
+   * A handle to the SQL Context that was used to create this plan.   
Since many operators need
+   * access to the sqlContext for RDD operations or configuration this 
field is automatically
+   * populated by the query planning infrastructure.
+   */
+  @transient
+  protected val sqlContext = SparkPlan.currentContext.get()
+
+  protected def sparkContext = sqlContext.sparkContext
+
+  def logger = log
+
+  val codegenEnabled: Boolean = if(sqlContext != null) {
--- End diff --

when is context null?
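For what it's worth, a rough sketch of the ThreadLocal handoff the scaladoc above describes; `withPlanningContext` is a hypothetical helper used only to illustrate the pattern and is not code from this patch:

```scala
// Hypothetical illustration: the planning infrastructure sets the ThreadLocal
// before constructing physical operators, and each SparkPlan constructor reads it.
def withPlanningContext[A](ctx: SQLContext)(constructPlan: => A): A = {
  SparkPlan.currentContext.set(ctx)
  try constructPlan
  finally SparkPlan.currentContext.remove()
}
```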




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -51,8 +82,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging {
*/
   def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
-  protected def buildRow(values: Seq[Any]): Row =
-new GenericRow(values.toArray)
+  protected def newProjection(
+  expressions: Seq[Expression], inputSchema: Seq[Attribute]): 
Projection = {
+log.debug(
+  s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+if (codegenEnabled) {
+  GenerateProjection(expressions, inputSchema)
+} else {
+  new InterpretedProjection(expressions, inputSchema)
+}
+  }
+
+  protected def newMutableProjection(
+  expressions: Seq[Expression],
+  inputSchema: Seq[Attribute]): () => MutableProjection = {
+log.debug(
+  s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+if(codegenEnabled) {
+  GenerateMutableProjection(expressions, inputSchema)
--- End diff --

This is another use of apply that is very confusing, because it is not 
obvious GenerateMutableProjection is a closure.
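To spell out the types involved, based only on the signatures visible in this diff (the helper and value names below are made up):

```scala
// Sketch only: GenerateMutableProjection.apply does not return a projection; it
// returns a () => MutableProjection closure that has to be invoked separately.
def buildProjection(expressions: Seq[Expression], inputSchema: Seq[Attribute]): MutableProjection = {
  val makeProjection: () => MutableProjection = GenerateMutableProjection(expressions, inputSchema)
  makeProjection()  // the extra call here is what makes the apply() call site read oddly
}
```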




[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1210#issuecomment-49977441
  
QA results for PR 1210:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17108/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333978
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -192,9 +187,9 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 val relation =
   ParquetRelation.create(path, child, 
sparkContext.hadoopConfiguration)
 // Note: overwrite=false because otherwise the metadata we just 
created will be deleted
-InsertIntoParquetTable(relation, planLater(child), 
overwrite=false)(sqlContext) :: Nil
+InsertIntoParquetTable(relation, planLater(child), 
overwrite=false) :: Nil
--- End diff --

nit: space after/before =
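i.e., presumably something like:

```scala
InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
```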




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334051
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -300,8 +298,16 @@ case class LeftSemiJoinBNL(
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends 
BinaryNode {
   def output = left.output ++ right.output
 
-  def execute() = 
left.execute().map(_.copy()).cartesian(right.execute().map(_.copy())).map {
-case (l: Row, r: Row) => buildRow(l ++ r)
+  def execute() = {
+val leftResults = left.execute().map(_.copy())
+val rightResults = right.execute().map(_.copy())
+
+leftResults.cartesian(rightResults).mapPartitions { iter =>
+  val joinedRow = new JoinedRow
+  iter.map {
+case (l: Row, r: Row) => joinedRow(l, r)
--- End diff --

maybe perf doesn't matter too much here since it is a cartesian product 
already, but you can remove the pattern matching to improve perf.
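Something along these lines, for example (a sketch of the suggestion, not necessarily the final code):

```scala
leftResults.cartesian(rightResults).mapPartitions { iter =>
  val joinedRow = new JoinedRow
  // Read the tuple fields directly instead of pattern matching on (l: Row, r: Row).
  iter.map(r => joinedRow(r._1, r._2))
}
```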




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334075
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.spark.sql.execution.SparkPlan
--- End diff --

import order




[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15334329
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -341,8 +336,9 @@ class DAGScheduler(
 if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
   logError(No stages registered for job  + job.jobId)
 } else {
-  stageIdToJobIds.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
-case (stageId, jobSet) =>
+  stageIdToStage.filter(s => registeredStages.get.contains(s._1)).foreach {
--- End diff --

i'd need to do more hash lookups if it is filterKeys




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334407
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the expression.
+   * @param nullTerm A term that holds a boolean value representing whether the expression
+   *                 evaluated to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation.
+   *                      Not valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that
+   * 
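
The pattern above is a compile-once, cache-by-canonical-form generator: inputs are
canonicalized so cosmetically different expressions share one cache entry, and the expensive
create() step only runs on a cache miss. A minimal, self-contained sketch of that caching
pattern (the object name and the String/Integer stand-in types are illustrative only, not part
of the patch):

```
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Sketch of the cache-by-canonical-form pattern; String/Integer stand in for InType/OutType.
object CachingSketch {
  // Normalize inputs so cosmetically different inputs share one cache entry.
  private def canonicalize(in: String): String = in.trim.toLowerCase

  // The expensive step, analogous to CodeGenerator.create (e.g. toolbox compilation).
  private def create(in: String): Integer = in.hashCode

  private val cache: LoadingCache[String, Integer] = CacheBuilder.newBuilder()
    .maximumSize(1000)
    .build(new CacheLoader[String, Integer]() {
      override def load(in: String): Integer = create(in)
    })

  // Repeated calls with equivalent inputs hit the cache instead of recompiling.
  def apply(in: String): Integer = cache.get(canonicalize(in))
}
```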

[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1561#issuecomment-49978964
  
Thanks. Pushed a version that should have addressed most of the comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1561#discussion_r15334478
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -728,8 +697,6 @@ class DAGScheduler(
   private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
 // Get our pending tasks and remember them in our pendingTasks entry
-val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
--- End diff --

do i need to call ```stage.pendingTasks = new HashSet``` here? 
@kayousterhout @markhamstra 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2492][Streaming] kafkaReceiver minor ch...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1420#issuecomment-49979126
  
QA results for PR 1420:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17109/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests

2014-07-24 Thread chenghao-intel
GitHub user chenghao-intel opened a pull request:

https://github.com/apache/spark/pull/1570

[SPARK-2665] [SQL] Add EqualNS & Unit Tests

Hive supports the operator <=>, which returns the same result as the EQUAL(=)
operator for non-null operands, but returns TRUE if both operands are NULL and FALSE
if one of them is NULL.
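
For readers unfamiliar with the operator, a minimal Scala sketch of the null-safe equality
semantics described above, using Option to stand in for SQL NULL (illustrative only, not the
Catalyst implementation):

```
// Null-safe equality: NULL <=> NULL is TRUE, NULL <=> x is FALSE,
// otherwise it behaves like ordinary equality.
def nullSafeEquals[T](left: Option[T], right: Option[T]): Boolean =
  (left, right) match {
    case (None, None)          => true
    case (None, _) | (_, None) => false
    case (Some(l), Some(r))    => l == r
  }

// nullSafeEquals(Some(1), Some(1))  // true
// nullSafeEquals[Int](None, None)   // true
// nullSafeEquals(Some(1), None)     // false
```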

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenghao-intel/spark equalns

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1570


commit 7af4b0b3894fbd416ba2ecacbe2a705219f52619
Author: Cheng Hao hao.ch...@intel.com
Date:   2014-07-24T08:03:01Z

Add EqualNS & Unit Tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1561#issuecomment-49979231
  
QA tests have started for PR 1561. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17112/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests

2014-07-24 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1570#issuecomment-49979539
  
This is really nice - passes a lot more tests. I guess we will eventually 
run into the problem that the unit tests without parallelization will take too 
long...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests

2014-07-24 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1570#issuecomment-49979557
  
Do you mind looking into how we can parallelize the hive compatibility 
tests?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1570#issuecomment-49979649
  
QA tests have started for PR 1570. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17113/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2663] [SQL] Support the Grouping Set

2014-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1567#issuecomment-49979679
  
QA tests have started for PR 1567. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17114/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests

2014-07-24 Thread chenghao-intel
Github user chenghao-intel commented on the pull request:

https://github.com/apache/spark/pull/1570#issuecomment-49979808
  
Yes, I will think about how to parallelize those tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2657 Use more compact data structures th...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1555#discussion_r15334913
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
+ * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
+ * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
+ * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
+ * entries than that. This makes it more efficient for operations like groupBy where we expect
+ * some keys to have very few elements.
+ */
+private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+  // First two elements
+  private var element0: T = _
+  private var element1: T = _
+
+  // Number of elements, including our two in the main object
+  private var curSize = 0
+
+  // Array for extra elements
+  private var otherElements: Array[AnyRef] = null
+
+  def apply(position: Int): T = {
+    if (position < 0 || position >= curSize) {
+      throw new IndexOutOfBoundsException
+    }
+    if (position == 0) {
+      element0
+    } else if (position == 1) {
+      element1
+    } else {
+      otherElements(position - 2).asInstanceOf[T]
+    }
+  }
+
+  def update(position: Int, value: T): Unit = {
--- End diff --

I guess it's only used internally. Maybe make it private?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
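
A minimal, illustrative sketch of the "keep the first two elements in fields" idea discussed
in the diff above (a simplification under stated assumptions, not the actual CompactBuffer
implementation):

```
// Stores up to two elements directly in fields; spills to an Array[AnyRef]
// only when a third element is appended.
class TinyBuffer[T] {
  private var element0: T = _
  private var element1: T = _
  private var curSize = 0
  private var otherElements: Array[AnyRef] = null

  def +=(value: T): this.type = {
    curSize match {
      case 0 => element0 = value
      case 1 => element1 = value
      case n =>
        // Allocate (or grow) the spill-over array only when needed.
        if (otherElements == null) {
          otherElements = new Array[AnyRef](8)
        } else if (n - 2 >= otherElements.length) {
          val bigger = new Array[AnyRef](otherElements.length * 2)
          System.arraycopy(otherElements, 0, bigger, 0, otherElements.length)
          otherElements = bigger
        }
        otherElements(n - 2) = value.asInstanceOf[AnyRef]
    }
    curSize += 1
    this
  }

  def apply(position: Int): T = {
    if (position < 0 || position >= curSize) throw new IndexOutOfBoundsException
    if (position == 0) element0
    else if (position == 1) element1
    else otherElements(position - 2).asInstanceOf[T]
  }

  def length: Int = curSize
}
```

Usage would look like: val buf = new TinyBuffer[String]; buf += "a"; buf += "b"; buf(1) returns
"b", and only a third append allocates the backing array.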

