spark git commit: [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e2546c227 -> aede729a9


[SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python

Could jkbradley and davies review it?

- Create a wrapper class `LDAModelWrapper` for `LDAModel`, because we can't 
handle the return value of `describeTopics` in Scala from PySpark directly; 
`Array[(Array[Int], Array[Double])]` is too complicated to convert.
- Add `loadLDAModel` to `PythonMLlibAPI`, since `LDAModel` in Scala is an 
abstract class and we need to call `load` on `DistributedLDAModel`.

[[SPARK-8467] Add LDAModel.describeTopics() in Python - ASF 
JIRA](https://issues.apache.org/jira/browse/SPARK-8467)
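
A minimal usage sketch (editorial, not from the patch): it assumes an existing 
SparkContext `sc`, and the tiny corpus and parameter values are illustrative.

```python
from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors

# Tiny illustrative corpus: (docId, termCountVector) pairs.
corpus = sc.parallelize([
    [0, Vectors.dense([1.0, 2.0, 6.0, 0.0, 2.0])],
    [1, Vectors.dense([1.0, 3.0, 0.0, 1.0, 3.0])],
])
model = LDA.train(corpus, k=2, seed=1)

# New in this patch: each topic is a (term indices, term weights) pair.
for terms, weights in model.describeTopics(maxTermsPerTopic=3):
    print(terms, weights)
```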

Author: Yu ISHIKAWA 

Closes #8643 from yu-iskw/SPARK-8467-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aede729a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aede729a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aede729a

Branch: refs/heads/branch-1.6
Commit: aede729a9463e4450fc5e403cbc35bcd13ba40a3
Parents: e2546c2
Author: Yu ISHIKAWA 
Authored: Fri Nov 6 22:56:29 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 22:58:06 2015 -0800

--
 .../mllib/api/python/LDAModelWrapper.scala  | 46 
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 +-
 python/pyspark/mllib/clustering.py  | 33 +++---
 3 files changed, 75 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aede729a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
new file mode 100644
index 000..63282ee
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.api.python
+
+import scala.collection.JavaConverters
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.LDAModel
+import org.apache.spark.mllib.linalg.Matrix
+
+/**
+ * Wrapper around LDAModel to provide helper methods in Python
+ */
+private[python] class LDAModelWrapper(model: LDAModel) {
+
+  def topicsMatrix(): Matrix = model.topicsMatrix
+
+  def vocabSize(): Int = model.vocabSize
+
+  def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize)
+
+  def describeTopics(maxTermsPerTopic: Int): Array[Byte] = {
+    val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) =>
+      val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
+      val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
+      Array[Any](jTerms, jTermWeights)
+    }
+    SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava)
+  }
+
+  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/aede729a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 40c4180..54b03a9 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -517,7 +517,7 @@ private[python] class PythonMLLibAPI extends Serializable {
   topicConcentration: Double,
   seed: java.lang.Long,
   checkpointInterval: Int,
-  optimizer: String): LDAModel = {
+  optimizer: String): LDAModelWrapper = {
 val algo = new LDA()
   .setK(k)
   .setMaxIterations(maxIterations)
@@ -535,7 +535,16 @@ private[python] class PythonML

spark git commit: [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 7f741905b -> 2ff0e79a8


[SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python

Could jkbradley and davies review it?

- Create a wrapper class `LDAModelWrapper` for `LDAModel`, because we can't 
handle the return value of `describeTopics` in Scala from PySpark directly; 
`Array[(Array[Int], Array[Double])]` is too complicated to convert.
- Add `loadLDAModel` to `PythonMLlibAPI`, since `LDAModel` in Scala is an 
abstract class and we need to call `load` on `DistributedLDAModel`.

[[SPARK-8467] Add LDAModel.describeTopics() in Python - ASF 
JIRA](https://issues.apache.org/jira/browse/SPARK-8467)

Author: Yu ISHIKAWA 

Closes #8643 from yu-iskw/SPARK-8467-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ff0e79a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ff0e79a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ff0e79a

Branch: refs/heads/master
Commit: 2ff0e79a8647cca5c9c57f613a07e739ac4f677e
Parents: 7f74190
Author: Yu ISHIKAWA 
Authored: Fri Nov 6 22:56:29 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 22:56:29 2015 -0800

--
 .../mllib/api/python/LDAModelWrapper.scala  | 46 
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 +-
 python/pyspark/mllib/clustering.py  | 33 +++---
 3 files changed, 75 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ff0e79a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
new file mode 100644
index 000..63282ee
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.mllib.api.python
+
+import scala.collection.JavaConverters
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.clustering.LDAModel
+import org.apache.spark.mllib.linalg.Matrix
+
+/**
+ * Wrapper around LDAModel to provide helper methods in Python
+ */
+private[python] class LDAModelWrapper(model: LDAModel) {
+
+  def topicsMatrix(): Matrix = model.topicsMatrix
+
+  def vocabSize(): Int = model.vocabSize
+
+  def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize)
+
+  def describeTopics(maxTermsPerTopic: Int): Array[Byte] = {
+    val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) =>
+      val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
+      val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
+      Array[Any](jTerms, jTermWeights)
+    }
+    SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava)
+  }
+
+  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2ff0e79a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 40c4180..54b03a9 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -517,7 +517,7 @@ private[python] class PythonMLLibAPI extends Serializable {
   topicConcentration: Double,
   seed: java.lang.Long,
   checkpointInterval: Int,
-  optimizer: String): LDAModel = {
+  optimizer: String): LDAModelWrapper = {
 val algo = new LDA()
   .setK(k)
   .setMaxIterations(maxIterations)
@@ -535,7 +535,16 @@ private[python] class PythonMLLibAPI e

spark git commit: [SPARK-11112] DAG visualization: display RDD callsite

2015-11-06 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 30b706b7b -> 7f741905b


[SPARK-11112] DAG visualization: display RDD callsite

Screenshot: https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png
mateiz sarutak

Author: Andrew Or 

Closes #9398 from andrewor14/rdd-callsite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f741905
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f741905
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f741905

Branch: refs/heads/master
Commit: 7f741905b06ed6d3dfbff6db41a3355dab71aa3c
Parents: 30b706b
Author: Andrew Or 
Authored: Sat Nov 7 05:35:53 2015 +0100
Committer: Andrew Or 
Committed: Sat Nov 7 05:35:53 2015 +0100

--
 .../apache/spark/ui/static/spark-dag-viz.css|  4 +++
 .../org/apache/spark/storage/RDDInfo.scala  | 16 +++--
 .../spark/ui/scope/RDDOperationGraph.scala  | 10 +++---
 .../org/apache/spark/util/JsonProtocol.scala| 17 -
 .../scala/org/apache/spark/util/Utils.scala |  1 +
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 14 
 .../apache/spark/util/JsonProtocolSuite.scala   | 37 
 7 files changed, 79 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
--
diff --git 
a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css 
b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
index 3b4ae2e..9cc5c79 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css
@@ -122,3 +122,7 @@
   stroke: #52C366;
   stroke-width: 2px;
 }
+
+.tooltip-inner {
+  white-space: pre-wrap;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala 
b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 9606262..3fa209b 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDDOperationScope, RDD}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
 
 @DeveloperApi
 class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
 val numPartitions: Int,
 var storageLevel: StorageLevel,
 val parentIds: Seq[Int],
+val callSite: CallSite,
 val scope: Option[RDDOperationScope] = None)
   extends Ordered[RDDInfo] {
 
+  def this(
+  id: Int,
+  name: String,
+  numPartitions: Int,
+  storageLevel: StorageLevel,
+  parentIds: Seq[Int],
+  scope: Option[RDDOperationScope] = None) {
+    this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
+  }
+
   var numCachedPartitions = 0
   var memSize = 0L
   var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
   def fromRdd(rdd: RDD[_]): RDDInfo = {
 val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
 val parentIds = rdd.dependencies.map(_.rdd.id)
-    new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
+    new RDDInfo(rdd.id, rddName, rdd.partitions.length,
+      rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala 
b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 81f168a..2427456 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.CallSite
 
 /**
  * A representation of a generic cluster graph used for storing information on 
RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
 rootCluster: RDDOperationCluster)
 
 /** A node in an RDDOperationGraph. This represents an RDD. */
-private[ui] case class RDDOperationNode(id: Int, name: Strin

[2/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

2015-11-06 Thread joshrosen
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

In order to lay the groundwork for proper off-heap memory support in SQL / 
Tungsten, we need to extend our MemoryManager to perform bookkeeping for 
off-heap memory.

## User-facing changes

This PR introduces a new configuration, `spark.memory.offHeapSize` (name 
subject to change), which specifies the absolute amount of off-heap memory that 
Spark and Spark SQL can use. If Tungsten is configured to use off-heap 
execution memory for allocating data pages, then all data page allocations must 
fit within this size limit.
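
A minimal sketch (editorial, assuming the configuration keeps the key name used 
in this PR, which is explicitly subject to change) of setting the new off-heap 
cap from PySpark; the app name and the 512 MB figure are illustrative:

```python
from pyspark import SparkConf, SparkContext

# Cap the off-heap memory Spark may use at 512 MB.
conf = (SparkConf()
        .setAppName("offheap-demo")
        .set("spark.memory.offHeapSize", str(512 * 1024 * 1024)))
sc = SparkContext(conf=conf)
```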

## Internals changes

This PR contains a lot of internal refactoring of the MemoryManager. The key 
change at the heart of this patch is the introduction of a `MemoryPool` class 
(name subject to change) to manage the bookkeeping for a particular category of 
memory (storage, on-heap execution, and off-heap execution). These MemoryPools 
are not fixed-size; they can be dynamically grown and shrunk according to the 
MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, 
proportional to the legacy `[storage|shuffle].memoryFraction`. In the new 
UnifiedMemoryManager, the sizes of these pools are dynamically adjusted 
according to its policies.

There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage 
memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager 
creates two execution pools, one for on-heap memory and one for off-heap. 
Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their 
pooled memory across running tasks (in other words, the 
ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed 
into these ExecutionMemoryPool instances).

I think that this design is substantially easier to understand and reason about 
than the previous design, where most of these responsibilities were handled by 
MemoryManager and its subclasses. To see this, take a look at how simple the 
logic in `UnifiedMemoryManager` has become: it's now very easy to see when 
memory is dynamically shifted between storage and execution.
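
For intuition only, a toy Python sketch of the pool bookkeeping idea described 
above (not the Scala `MemoryPool` implementation; all names here are 
hypothetical): each pool tracks its granted size and its used bytes, and the 
manager can shift capacity between pools by shrinking one and growing another.

```python
class Pool(object):
    def __init__(self, size):
        self.size = size        # bytes this pool may hand out
        self.used = 0           # bytes currently acquired from the pool

    def acquire(self, num_bytes):
        """Grant up to num_bytes, bounded by what is still free in the pool."""
        granted = min(num_bytes, self.size - self.used)
        self.used += granted
        return granted

    def release(self, num_bytes):
        self.used -= min(num_bytes, self.used)

    def grow(self, delta):
        self.size += delta

    def shrink(self, delta):
        # A pool cannot be shrunk below what is currently in use.
        assert delta <= self.size - self.used
        self.size -= delta
```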

## TODOs

- [x] Fix a handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
  - [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.

Author: Josh Rosen 

Closes #9344 from JoshRosen/offheap-memory-accounting.

(cherry picked from commit 30b706b7b36482921ec04145a0121ca147984fa8)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2546c22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2546c22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2546c22

Branch: refs/heads/branch-1.6
Commit: e2546c227b7eca092c6a7cf5f307d152f4dbe800
Parents: 68c1d9f
Author: Josh Rosen 
Authored: Fri Nov 6 18:17:34 2015 -0800
Committer: Josh Rosen 
Committed: Fri Nov 6 18:17:55 2015 -0800

--
 .../org/apache/spark/memory/MemoryConsumer.java |   7 +-
 .../org/apache/spark/memory/MemoryMode.java |  26 ++
 .../apache/spark/memory/TaskMemoryManager.java  |  72 --
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/memory/ExecutionMemoryPool.scala  | 153 
 .../org/apache/spark/memory/MemoryManager.scala | 246 ++-
 .../org/apache/spark/memory/MemoryPool.scala|  71 ++
 .../spark/memory/StaticMemoryManager.scala  |  75 +-
 .../apache/spark/memory/StorageMemoryPool.scala | 138 +++
 .../spark/memory/UnifiedMemoryManager.scala | 138 ++-
 .../scala/org/apache/spark/memory/package.scala |  75 ++
 .../spark/util/collection/Spillable.scala   |   8 +-
 .../spark/memory/TaskMemoryManagerSuite.java|   8 +-
 .../apache/spark/memory/TestMemoryConsumer.java |  10 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |   2 +-
 .../map/AbstractBytesToBytesMapSuite.java   |   4 +-
 .../spark/memory/MemoryManagerSuite.scala   | 104 +---
 .../spark/memory/StaticMemoryManagerSuite.scala |  39 +--
 .../apache/spark/memory/TestMemoryManager.scala |  20 +-
 .../memory/UnifiedMemoryManagerSuite.scala  |  93 +++
 .../spark/storage/BlockManagerSuite.scala   |   2 +-
 21 files changed, 828 insertions(+), 465 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72..36138cc 100644
--- a/core/src/main/java/org/apache/spar

[1/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

2015-11-06 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 68c1d9fa6 -> e2546c227


http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450..54cb28c 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
 
-
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
   maxExecutionMem: Long,
   maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
 val mm = new StaticMemoryManager(
-  conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = 
maxStorageMem, numCores = 1)
+  conf,
+  maxOnHeapExecutionMemory = maxExecutionMem,
+  maxStorageMemory = maxStorageMem,
+  numCores = 1)
 val ms = makeMemoryStore(mm)
 (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = 
{
+  override protected def createMemoryManager(
+  maxOnHeapExecutionMemory: Long,
+  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
 new StaticMemoryManager(
-  conf,
-  maxExecutionMemory = maxMemory,
+  conf.clone
+.set("spark.memory.fraction", "1")
+.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+  maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
   maxStorageMemory = 0,
   numCores = 1)
   }
 
   test("basic execution memory") {
 val maxExecutionMem = 1000L
+val taskAttemptId = 0L
 val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
 assert(mm.executionMemoryUsed === 0L)
-assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
 assert(mm.executionMemoryUsed === 10L)
-assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 // Acquire up to the max
-assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
 assert(mm.executionMemoryUsed === maxExecutionMem)
-assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
 assert(mm.executionMemoryUsed === maxExecutionMem)
-mm.releaseExecutionMemory(800L)
+mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
 assert(mm.executionMemoryUsed === 200L)
 // Acquire after release
-assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
 assert(mm.executionMemoryUsed === 201L)
 // Release beyond what was acquired
-mm.releaseExecutionMemory(maxExecutionMem)
+mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, 
MemoryMode.ON_HEAP)
 assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
   test("execution and storage isolation") {
 val maxExecutionMem = 200L
 val maxStorageMem = 1000L
+val taskAttemptId = 0L
 val dummyBlock = TestBlockId("ain't nobody love like you do")
 val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
 // Only execution memory should increase
-assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 assert(mm.storageMemoryUsed === 0L)
 assert(mm.executionMemoryUsed === 100L)
-assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 assert(mm.storageMemoryUsed === 0L)
 assert(mm.executionMemoryUsed === 200L)
 // Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
 assert(mm.storageMemoryUsed === 50L)
 assert(mm.executionMemoryUsed === 200L)
 // Only execution memory should be released
-mm.releaseExecutionMemory(133L)
+

[1/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

2015-11-06 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 105732dcc -> 30b706b7b


http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450..54cb28c 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, 
TestBlockId}
 
-
 class StaticMemoryManagerSuite extends MemoryManagerSuite {
   private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
   private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
   maxExecutionMem: Long,
   maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
 val mm = new StaticMemoryManager(
-  conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = 
maxStorageMem, numCores = 1)
+  conf,
+  maxOnHeapExecutionMemory = maxExecutionMem,
+  maxStorageMemory = maxStorageMem,
+  numCores = 1)
 val ms = makeMemoryStore(mm)
 (mm, ms)
   }
 
-  override protected def createMemoryManager(maxMemory: Long): MemoryManager = 
{
+  override protected def createMemoryManager(
+  maxOnHeapExecutionMemory: Long,
+  maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
 new StaticMemoryManager(
-  conf,
-  maxExecutionMemory = maxMemory,
+  conf.clone
+.set("spark.memory.fraction", "1")
+.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+  maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
   maxStorageMemory = 0,
   numCores = 1)
   }
 
   test("basic execution memory") {
 val maxExecutionMem = 1000L
+val taskAttemptId = 0L
 val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
 assert(mm.executionMemoryUsed === 0L)
-assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 10L)
 assert(mm.executionMemoryUsed === 10L)
-assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 // Acquire up to the max
-assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 890L)
 assert(mm.executionMemoryUsed === maxExecutionMem)
-assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 0L)
 assert(mm.executionMemoryUsed === maxExecutionMem)
-mm.releaseExecutionMemory(800L)
+mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
 assert(mm.executionMemoryUsed === 200L)
 // Acquire after release
-assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 1L)
 assert(mm.executionMemoryUsed === 201L)
 // Release beyond what was acquired
-mm.releaseExecutionMemory(maxExecutionMem)
+mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, 
MemoryMode.ON_HEAP)
 assert(mm.executionMemoryUsed === 0L)
   }
 
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite 
{
   test("execution and storage isolation") {
 val maxExecutionMem = 200L
 val maxStorageMem = 1000L
+val taskAttemptId = 0L
 val dummyBlock = TestBlockId("ain't nobody love like you do")
 val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
 // Only execution memory should increase
-assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 assert(mm.storageMemoryUsed === 0L)
 assert(mm.executionMemoryUsed === 100L)
-assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) 
=== 100L)
 assert(mm.storageMemoryUsed === 0L)
 assert(mm.executionMemoryUsed === 200L)
 // Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
 assert(mm.storageMemoryUsed === 50L)
 assert(mm.executionMemoryUsed === 200L)
 // Only execution memory should be released
-mm.releaseExecutionMemory(133L)
+mm.r

[2/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

2015-11-06 Thread joshrosen
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager

In order to lay the groundwork for proper off-heap memory support in SQL / 
Tungsten, we need to extend our MemoryManager to perform bookkeeping for 
off-heap memory.

## User-facing changes

This PR introduces a new configuration, `spark.memory.offHeapSize` (name 
subject to change), which specifies the absolute amount of off-heap memory that 
Spark and Spark SQL can use. If Tungsten is configured to use off-heap 
execution memory for allocating data pages, then all data page allocations must 
fit within this size limit.

## Internals changes

This PR contains a lot of internal refactoring of the MemoryManager. The key 
change at the heart of this patch is the introduction of a `MemoryPool` class 
(name subject to change) to manage the bookkeeping for a particular category of 
memory (storage, on-heap execution, and off-heap execution). These MemoryPools 
are not fixed-size; they can be dynamically grown and shrunk according to the 
MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, 
proportional to the legacy `[storage|shuffle].memoryFraction`. In the new 
UnifiedMemoryManager, the sizes of these pools are dynamically adjusted 
according to its policies.

There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage 
memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager 
creates two execution pools, one for on-heap memory and one for off-heap. 
Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their 
pooled memory across running tasks (in other words, the 
ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed 
into these ExecutionMemoryPool instances).

I think that this design is substantially easier to understand and reason about 
than the previous design, where most of these responsibilities were handled by 
MemoryManager and its subclasses. To see this, take a look at how simple the 
logic in `UnifiedMemoryManager` has become: it's now very easy to see when 
memory is dynamically shifted between storage and execution.

## TODOs

- [x] Fix a handful of test failures in the MemoryManagerSuites.
- [x] Fix remaining TODO comments in code.
- [ ] Document new configuration.
- [x] Fix commented-out tests / asserts:
  - [x] UnifiedMemoryManagerSuite.
- [x] Write tests that exercise the new off-heap memory management policies.

Author: Josh Rosen 

Closes #9344 from JoshRosen/offheap-memory-accounting.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30b706b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30b706b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30b706b7

Branch: refs/heads/master
Commit: 30b706b7b36482921ec04145a0121ca147984fa8
Parents: 105732d
Author: Josh Rosen 
Authored: Fri Nov 6 18:17:34 2015 -0800
Committer: Josh Rosen 
Committed: Fri Nov 6 18:17:34 2015 -0800

--
 .../org/apache/spark/memory/MemoryConsumer.java |   7 +-
 .../org/apache/spark/memory/MemoryMode.java |  26 ++
 .../apache/spark/memory/TaskMemoryManager.java  |  72 --
 .../main/scala/org/apache/spark/SparkEnv.scala  |   2 +-
 .../spark/memory/ExecutionMemoryPool.scala  | 153 
 .../org/apache/spark/memory/MemoryManager.scala | 246 ++-
 .../org/apache/spark/memory/MemoryPool.scala|  71 ++
 .../spark/memory/StaticMemoryManager.scala  |  75 +-
 .../apache/spark/memory/StorageMemoryPool.scala | 138 +++
 .../spark/memory/UnifiedMemoryManager.scala | 138 ++-
 .../scala/org/apache/spark/memory/package.scala |  75 ++
 .../spark/util/collection/Spillable.scala   |   8 +-
 .../spark/memory/TaskMemoryManagerSuite.java|   8 +-
 .../apache/spark/memory/TestMemoryConsumer.java |  10 +-
 .../shuffle/sort/UnsafeShuffleWriterSuite.java  |   2 +-
 .../map/AbstractBytesToBytesMapSuite.java   |   4 +-
 .../spark/memory/MemoryManagerSuite.scala   | 104 +---
 .../spark/memory/StaticMemoryManagerSuite.scala |  39 +--
 .../apache/spark/memory/TestMemoryManager.scala |  20 +-
 .../memory/UnifiedMemoryManagerSuite.scala  |  93 +++
 .../spark/storage/BlockManagerSuite.scala   |   2 +-
 21 files changed, 828 insertions(+), 465 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72..36138cc 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ 

spark git commit: [HOTFIX] Fix python tests after #9527

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9bf77d5c3 -> 68c1d9fa6


[HOTFIX] Fix python tests after #9527

#9527 missed updating the python tests.

Author: Michael Armbrust 

Closes #9533 from marmbrus/hotfixTextValue.

(cherry picked from commit 105732dcc6b651b9779f4a5773a759c5b4fbd21d)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68c1d9fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68c1d9fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68c1d9fa

Branch: refs/heads/branch-1.6
Commit: 68c1d9fa6e90636a6b6d2a7b4d2a7e35c936e852
Parents: 9bf77d5
Author: Michael Armbrust 
Authored: Fri Nov 6 17:22:30 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 17:22:39 2015 -0800

--
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68c1d9fa/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 97bd90c..927f407 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -203,7 +203,7 @@ class DataFrameReader(object):
 
 >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt')
 >>> df.collect()
-[Row(text=u'hello'), Row(text=u'this')]
+[Row(value=u'hello'), Row(value=u'this')]
 """
 return self._df(self._jreader.text(path))
 





spark git commit: [HOTFIX] Fix python tests after #9527

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 1c80d66e5 -> 105732dcc


[HOTFIX] Fix python tests after #9527

#9527 missed updating the python tests.

Author: Michael Armbrust 

Closes #9533 from marmbrus/hotfixTextValue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/105732dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/105732dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/105732dc

Branch: refs/heads/master
Commit: 105732dcc6b651b9779f4a5773a759c5b4fbd21d
Parents: 1c80d66
Author: Michael Armbrust 
Authored: Fri Nov 6 17:22:30 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 17:22:30 2015 -0800

--
 python/pyspark/sql/readwriter.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/105732dc/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 97bd90c..927f407 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -203,7 +203,7 @@ class DataFrameReader(object):
 
 >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt')
 >>> df.collect()
-[Row(text=u'hello'), Row(text=u'this')]
+[Row(value=u'hello'), Row(value=u'this')]
 """
 return self._df(self._jreader.text(path))
 





spark git commit: [SPARK-11546] Thrift server makes too many logs about result schema

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 162a7704c -> 9bf77d5c3


[SPARK-11546] Thrift server makes too many logs about result schema

SparkExecuteStatementOperation logs the result schema on every getNextRowSet() 
call, which by default happens every 1000 rows, overwhelming the whole log file.
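
The fix caches the schema in a Scala lazy val so it is computed and logged only 
once. An illustrative Python analogue of that compute-and-log-once pattern (not 
the patch itself; all names here are hypothetical):

```python
import logging

log = logging.getLogger(__name__)

class StatementOperation(object):
    def __init__(self, analyzed_output):
        self._analyzed_output = analyzed_output
        self._result_schema = None              # filled in on first access

    @property
    def result_schema(self):
        # Compute and log only once, instead of on every getNextRowSet() call.
        if self._result_schema is None:
            log.info("Result Schema: %s", self._analyzed_output)
            self._result_schema = list(self._analyzed_output)
        return self._result_schema
```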

Author: navis.ryu 

Closes #9514 from navis/SPARK-11546.

(cherry picked from commit 1c80d66e52c0bcc4e5adda78b3d8e5bf55e4f128)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bf77d5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bf77d5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bf77d5c

Branch: refs/heads/branch-1.6
Commit: 9bf77d5c37c7f26450a68341ce52c7c9fed3c21f
Parents: 162a770
Author: navis.ryu 
Authored: Fri Nov 6 17:13:46 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 17:13:57 2015 -0800

--
 .../SparkExecuteStatementOperation.scala| 24 +++-
 1 file changed, 13 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9bf77d5c/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 719b03e..82fef92 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -53,6 +53,18 @@ private[hive] class SparkExecuteStatementOperation(
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
+  private lazy val resultSchema: TableSchema = {
+if (result == null || result.queryExecution.analyzed.output.size == 0) {
+  new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
+} else {
+  logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+  val schema = result.queryExecution.analyzed.output.map { attr =>
+new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+  }
+  new TableSchema(schema.asJava)
+}
+  }
+
   def close(): Unit = {
 // RDDs will be cleaned automatically upon garbage collection.
 hiveContext.sparkContext.clearJobGroup()
@@ -120,17 +132,7 @@ private[hive] class SparkExecuteStatementOperation(
 }
   }
 
-  def getResultSetSchema: TableSchema = {
-if (result == null || result.queryExecution.analyzed.output.size == 0) {
-  new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
-} else {
-  logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-  val schema = result.queryExecution.analyzed.output.map { attr =>
-new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
-  }
-  new TableSchema(schema.asJava)
-}
-  }
+  def getResultSetSchema: TableSchema = resultSchema
 
   override def run(): Unit = {
 setState(OperationState.PENDING)





spark git commit: [SPARK-11546] Thrift server makes too many logs about result schema

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 6d0ead322 -> 1c80d66e5


[SPARK-11546] Thrift server makes too many logs about result schema

SparkExecuteStatementOperation logs the result schema on every getNextRowSet() 
call, which by default happens every 1000 rows, overwhelming the whole log file.

Author: navis.ryu 

Closes #9514 from navis/SPARK-11546.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c80d66e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c80d66e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c80d66e

Branch: refs/heads/master
Commit: 1c80d66e52c0bcc4e5adda78b3d8e5bf55e4f128
Parents: 6d0ead3
Author: navis.ryu 
Authored: Fri Nov 6 17:13:46 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 17:13:46 2015 -0800

--
 .../SparkExecuteStatementOperation.scala| 24 +++-
 1 file changed, 13 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c80d66e/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 719b03e..82fef92 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -53,6 +53,18 @@ private[hive] class SparkExecuteStatementOperation(
   private var dataTypes: Array[DataType] = _
   private var statementId: String = _
 
+  private lazy val resultSchema: TableSchema = {
+if (result == null || result.queryExecution.analyzed.output.size == 0) {
+  new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
+} else {
+  logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+  val schema = result.queryExecution.analyzed.output.map { attr =>
+new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+  }
+  new TableSchema(schema.asJava)
+}
+  }
+
   def close(): Unit = {
 // RDDs will be cleaned automatically upon garbage collection.
 hiveContext.sparkContext.clearJobGroup()
@@ -120,17 +132,7 @@ private[hive] class SparkExecuteStatementOperation(
 }
   }
 
-  def getResultSetSchema: TableSchema = {
-if (result == null || result.queryExecution.analyzed.output.size == 0) {
-  new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
-} else {
-  logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-  val schema = result.queryExecution.analyzed.output.map { attr =>
-new FieldSchema(attr.name, 
HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
-  }
-  new TableSchema(schema.asJava)
-}
-  }
+  def getResultSetSchema: TableSchema = resultSchema
 
   override def run(): Unit = {
 setState(OperationState.PENDING)





spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 40a5db561 -> 162a7704c


[SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule

This is the second PR for SPARK-9241; it adds support for multiple DISTINCT 
columns to the new aggregation code path.

This PR solves the multiple DISTINCT column problem by rewriting these 
Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA 
ticket](https://issues.apache.org/jira/browse/SPARK-9241) for more information. 
The advantages over the competing [first 
PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating too much 
memory. However, the rewrite multiplies the number of input rows by the number 
of distinct clauses (plus one), and puts a lot more memory pressure on the 
aggregation code path itself.

The location of this Rule is a bit funny, and should probably change when the 
old aggregation path is changed.

cc yhuai - Could you also tell me where to add tests for this?
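
For illustration (editorial, not part of the patch), a PySpark query of the 
shape this rewrite targets; the column names and data are hypothetical and an 
existing SparkContext `sc` is assumed:

```python
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([(1, "x"), (1, "y"), (2, "x")], ["a", "b"])

# Two DISTINCT aggregates in one query: the rule rewrites this Aggregate into
# an Expand-Aggregate-Aggregate plan so TungstenAggregate can execute it.
df.agg(F.countDistinct("a"), F.countDistinct("b")).show()
```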

Author: Herman van Hovell 

Closes #9406 from hvanhovell/SPARK-9241-rewriter.

(cherry picked from commit 6d0ead322e72303c6444c6ac641378a4690cde96)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162a7704
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162a7704
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162a7704

Branch: refs/heads/branch-1.6
Commit: 162a7704c90452d77b8a51fc5f5e3ec9bf14aaac
Parents: 40a5db5
Author: Herman van Hovell 
Authored: Fri Nov 6 16:04:20 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 16:04:32 2015 -0800

--
 .../catalyst/expressions/aggregate/Count.scala  |   2 +
 .../catalyst/expressions/aggregate/Utils.scala  | 186 ++-
 .../expressions/aggregate/interfaces.scala  |   6 +
 .../sql/catalyst/optimizer/Optimizer.scala  |   6 +-
 .../catalyst/plans/logical/basicOperators.scala |  80 
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 6 files changed, 238 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/162a7704/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
index 54df96c..ec0c8b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
@@ -49,4 +49,6 @@ case class Count(child: Expression) extends 
DeclarativeAggregate {
   )
 
   override val evaluateExpression = Cast(count, LongType)
+
+  override def defaultResult: Option[Literal] = Option(Literal(0L))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/162a7704/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index 644c621..39010c3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.types.{StructType, MapType, ArrayType}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType}
 
 /**
  * Utility functions used by the query planner to convert our plan to new 
aggregation code path.
@@ -41,7 +42,7 @@ object Utils {
 
   private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match {
 case p: Aggregate if supportsGroupingKeySchema(p) =>
-      val converted = p.transformExpressionsDown {
+      val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown {
 case expressions.Average(child) =>
   aggregate.AggregateExpression2(
 aggregateFunction = aggregate.Avera

spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 1ab72b086 -> 6d0ead322


[SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule

This is the second PR for SPARK-9241; it adds support for multiple DISTINCT 
columns to the new aggregation code path.

This PR solves the multiple DISTINCT column problem by rewriting these 
Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA 
ticket](https://issues.apache.org/jira/browse/SPARK-9241) for more information. 
The advantages over the competing [first 
PR](https://github.com/apache/spark/pull/9280) are:
- This can use the faster TungstenAggregate code path.
- It is impossible to OOM due to an ```OpenHashSet``` allocating too much 
memory. However, the rewrite multiplies the number of input rows by the number 
of distinct clauses (plus one), and puts a lot more memory pressure on the 
aggregation code path itself.

The location of this Rule is a bit funny, and should probably change when the 
old aggregation path is changed.

cc yhuai - Could you also tell me where to add tests for this?

Author: Herman van Hovell 

Closes #9406 from hvanhovell/SPARK-9241-rewriter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d0ead32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d0ead32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d0ead32

Branch: refs/heads/master
Commit: 6d0ead322e72303c6444c6ac641378a4690cde96
Parents: 1ab72b0
Author: Herman van Hovell 
Authored: Fri Nov 6 16:04:20 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 16:04:20 2015 -0800

--
 .../catalyst/expressions/aggregate/Count.scala  |   2 +
 .../catalyst/expressions/aggregate/Utils.scala  | 186 ++-
 .../expressions/aggregate/interfaces.scala  |   6 +
 .../sql/catalyst/optimizer/Optimizer.scala  |   6 +-
 .../catalyst/plans/logical/basicOperators.scala |  80 
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 6 files changed, 238 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6d0ead32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
index 54df96c..ec0c8b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala
@@ -49,4 +49,6 @@ case class Count(child: Expression) extends 
DeclarativeAggregate {
   )
 
   override val evaluateExpression = Cast(count, LongType)
+
+  override def defaultResult: Option[Literal] = Option(Literal(0L))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6d0ead32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
index 644c621..39010c3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.types.{StructType, MapType, ArrayType}
+import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType}
 
 /**
  * Utility functions used by the query planner to convert our plan to new 
aggregation code path.
@@ -41,7 +42,7 @@ object Utils {
 
   private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match {
 case p: Aggregate if supportsGroupingKeySchema(p) =>
-      val converted = p.transformExpressionsDown {
+      val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown {
 case expressions.Average(child) =>
   aggregate.AggregateExpression2(
 aggregateFunction = aggregate.Average(child),
@@ -144,7 +145,8 @@ object Utils {
 aggregateFunction = aggregate.VarianceSamp(child),
 

spark git commit: [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortW…

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 02748c953 -> 40a5db561


[SPARK-11410] [PYSPARK] Add python bindings for repartition and sortWithinPartitions.

Author: Nong Li 

Closes #9504 from nongli/spark-11410.

(cherry picked from commit 1ab72b08601a1c8a674bdd3fab84d9804899b2c7)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40a5db56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40a5db56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40a5db56

Branch: refs/heads/branch-1.6
Commit: 40a5db56169b87eecf574cf8e15e89caf4836ee4
Parents: 02748c9
Author: Nong Li 
Authored: Fri Nov 6 15:48:20 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 15:48:45 2015 -0800

--
 python/pyspark/sql/dataframe.py | 117 ++-
 1 file changed, 101 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40a5db56/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 765a451..b97c94d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -423,6 +423,67 @@ class DataFrame(object):
 return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
 
 @since(1.3)
+def repartition(self, numPartitions, *cols):
+"""
+Returns a new :class:`DataFrame` partitioned by the given partitioning 
expressions. The
+resulting DataFrame is hash partitioned.
+
+``numPartitions`` can be an int to specify the target number of 
partitions or a Column.
+If it is a Column, it will be used as the first partitioning column. 
If not specified,
+the default number of partitions is used.
+
+.. versionchanged:: 1.6
+   Added optional arguments to specify the partitioning columns. Also 
made numPartitions
+   optional if partitioning columns are specified.
+
+>>> df.repartition(10).rdd.getNumPartitions()
+10
+>>> data = df.unionAll(df).repartition("age")
+>>> data.show()
++---+-+
+|age| name|
++---+-+
+|  2|Alice|
+|  2|Alice|
+|  5|  Bob|
+|  5|  Bob|
++---+-+
+>>> data = data.repartition(7, "age")
+>>> data.show()
++---+-+
+|age| name|
++---+-+
+|  5|  Bob|
+|  5|  Bob|
+|  2|Alice|
+|  2|Alice|
++---+-+
+>>> data.rdd.getNumPartitions()
+7
+>>> data = data.repartition("name", "age")
+>>> data.show()
++---+-+
+|age| name|
++---+-+
+|  5|  Bob|
+|  5|  Bob|
+|  2|Alice|
+|  2|Alice|
++---+-+
+"""
+if isinstance(numPartitions, int):
+if len(cols) == 0:
+return DataFrame(self._jdf.repartition(numPartitions), 
self.sql_ctx)
+else:
+return DataFrame(
+self._jdf.repartition(numPartitions, self._jcols(*cols)), 
self.sql_ctx)
+elif isinstance(numPartitions, (basestring, Column)):
+cols = (numPartitions, ) + cols
+return DataFrame(self._jdf.repartition(self._jcols(*cols)), 
self.sql_ctx)
+else:
+raise TypeError("numPartitions should be an int or Column")
+
+@since(1.3)
 def distinct(self):
 """Returns a new :class:`DataFrame` containing the distinct rows in 
this :class:`DataFrame`.
 
@@ -589,6 +650,26 @@ class DataFrame(object):
 jdf = self._jdf.join(other._jdf, on._jc, how)
 return DataFrame(jdf, self.sql_ctx)
 
+@since(1.6)
+def sortWithinPartitions(self, *cols, **kwargs):
+"""Returns a new :class:`DataFrame` with each partition sorted by the 
specified column(s).
+
+:param cols: list of :class:`Column` or column names to sort by.
+:param ascending: boolean or list of boolean (default True).
+Sort ascending vs. descending. Specify list for multiple sort 
orders.
+If a list is specified, length of the list must equal length of 
the `cols`.
+
+>>> df.sortWithinPartitions("age", ascending=False).show()
++---+-+
+|age| name|
++---+-+
+|  2|Alice|
+|  5|  Bob|
++---+-+
+"""
+jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs))
+return DataFrame(jdf, self.sql_ctx)
+
 @ignore_unicode_prefix
 @since(1.3)
 def sort(self, *cols, **kwargs):
@@ -613,22 +694,7 @@ class DataFrame(object):
 >>> df.orderBy(["age", "name"], ascending=[

spark git commit: [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortW…

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 7e9a9e603 -> 1ab72b086


[SPARK-11410] [PYSPARK] Add python bindings for repartition and sortW…

…ithinPartitions.

Author: Nong Li 

Closes #9504 from nongli/spark-11410.
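
For reference, a minimal PySpark usage sketch of the two bindings this patch adds. It assumes an existing SQLContext named `sqlContext`; the sample rows mirror the doctest data and are purely illustrative.

~~~python
# Sketch only: assumes a live SQLContext named `sqlContext`.
df = sqlContext.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])

# numPartitions may be an int, an int plus columns, or columns alone.
by_col = df.repartition("age")        # hash-partition by the age column
by_both = df.repartition(4, "age")    # 4 partitions, keyed by age

# Sort rows inside each partition without triggering a global sort.
local = by_both.sortWithinPartitions("age", ascending=False)
print(local.rdd.getNumPartitions())   # 4
~~~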


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ab72b08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ab72b08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ab72b08

Branch: refs/heads/master
Commit: 1ab72b08601a1c8a674bdd3fab84d9804899b2c7
Parents: 7e9a9e6
Author: Nong Li 
Authored: Fri Nov 6 15:48:20 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 15:48:20 2015 -0800

--
 python/pyspark/sql/dataframe.py | 117 ++-
 1 file changed, 101 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ab72b08/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 765a451..b97c94d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -423,6 +423,67 @@ class DataFrame(object):
 return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
 
 @since(1.3)
+    def repartition(self, numPartitions, *cols):
+        """
+        Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
+        resulting DataFrame is hash partitioned.
+
+        ``numPartitions`` can be an int to specify the target number of partitions or a Column.
+        If it is a Column, it will be used as the first partitioning column. If not specified,
+        the default number of partitions is used.
+
+        .. versionchanged:: 1.6
+           Added optional arguments to specify the partitioning columns. Also made numPartitions
+           optional if partitioning columns are specified.
+
+        >>> df.repartition(10).rdd.getNumPartitions()
+        10
+        >>> data = df.unionAll(df).repartition("age")
+        >>> data.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  2|Alice|
+        |  5|  Bob|
+        |  5|  Bob|
+        +---+-----+
+        >>> data = data.repartition(7, "age")
+        >>> data.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  5|  Bob|
+        |  2|Alice|
+        |  2|Alice|
+        +---+-----+
+        >>> data.rdd.getNumPartitions()
+        7
+        >>> data = data.repartition("name", "age")
+        >>> data.show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  5|  Bob|
+        |  5|  Bob|
+        |  2|Alice|
+        |  2|Alice|
+        +---+-----+
+        """
+        if isinstance(numPartitions, int):
+            if len(cols) == 0:
+                return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx)
+            else:
+                return DataFrame(
+                    self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx)
+        elif isinstance(numPartitions, (basestring, Column)):
+            cols = (numPartitions, ) + cols
+            return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx)
+        else:
+            raise TypeError("numPartitions should be an int or Column")
+
+    @since(1.3)
 def distinct(self):
 """Returns a new :class:`DataFrame` containing the distinct rows in 
this :class:`DataFrame`.
 
@@ -589,6 +650,26 @@ class DataFrame(object):
 jdf = self._jdf.join(other._jdf, on._jc, how)
 return DataFrame(jdf, self.sql_ctx)
 
+    @since(1.6)
+    def sortWithinPartitions(self, *cols, **kwargs):
+        """Returns a new :class:`DataFrame` with each partition sorted by the specified column(s).
+
+        :param cols: list of :class:`Column` or column names to sort by.
+        :param ascending: boolean or list of boolean (default True).
+            Sort ascending vs. descending. Specify list for multiple sort orders.
+            If a list is specified, length of the list must equal length of the `cols`.
+
+        >>> df.sortWithinPartitions("age", ascending=False).show()
+        +---+-----+
+        |age| name|
+        +---+-----+
+        |  2|Alice|
+        |  5|  Bob|
+        +---+-----+
+        """
+        jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs))
+        return DataFrame(jdf, self.sql_ctx)
+
 @ignore_unicode_prefix
 @since(1.3)
 def sort(self, *cols, **kwargs):
@@ -613,22 +694,7 @@ class DataFrame(object):
 >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
 [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
 """
-if n

spark git commit: [SPARK-11269][SQL] Java API support & test cases for Dataset

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b58f1ce5b -> 02748c953


[SPARK-11269][SQL] Java API support & test cases for Dataset

This simply brings https://github.com/apache/spark/pull/9358 up-to-date.

Author: Wenchen Fan 
Author: Reynold Xin 

Closes #9528 from rxin/dataset-java.

(cherry picked from commit 7e9a9e603abce8689938bdd62d04b29299644aa4)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02748c95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02748c95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02748c95

Branch: refs/heads/branch-1.6
Commit: 02748c953cdc85fda054e366b095225f906b4548
Parents: b58f1ce
Author: Wenchen Fan 
Authored: Fri Nov 6 15:37:07 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 15:37:16 2015 -0800

--
 .../spark/sql/catalyst/encoders/Encoder.scala   | 123 ++-
 .../sql/catalyst/expressions/objects.scala  |  21 ++
 .../scala/org/apache/spark/sql/Dataset.scala| 126 ++-
 .../org/apache/spark/sql/DatasetHolder.scala|   6 +-
 .../org/apache/spark/sql/GroupedDataset.scala   |  17 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   4 +
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 357 +++
 .../spark/sql/DatasetPrimitiveSuite.scala   |   2 +-
 8 files changed, 644 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/02748c95/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
index 329a132..f05e1828 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
-
-
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.types.{DataType, ObjectType, StructField, 
StructType}
+import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL 
representation.
@@ -37,3 +37,120 @@ trait Encoder[T] extends Serializable {
   /** A ClassTag that can be used to construct and Array to contain a 
collection of `T`. */
   def clsTag: ClassTag[T]
 }
+
+object Encoder {
+  import scala.reflect.runtime.universe._
+
+  def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
+  def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
+  def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
+  def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
+  def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
+  def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
+  def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
+  def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
+
+  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = 
{
+tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  }
+
+  def tuple[T1, T2, T3](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+  }
+
+  def tuple[T1, T2, T3, T4](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3],
+  enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+tuple(Seq(enc1, enc2, enc3, 
enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+  }
+
+  def tuple[T1, T2, T3, T4, T5](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3],
+  enc4: Encoder[T4],
+  enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
+tuple(Seq(enc1, enc2, enc3, enc4, 
enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
+  }
+
+  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] 
= {
+assert(encoders.length > 1)
+// make sure all encoders are resolved, i.e. `Attribute` has been resolved 
to `BoundReference`.
+
assert(encoders.forall(_.constructExpression.find(_.isInstanceOf[Attribute]).isEmpty))
+
+val schema = StructType(encoders.zipWithIndex.map {
+  case (e, i) => StructField(s"_${i + 1}", if

spark git commit: [SPARK-11269][SQL] Java API support & test cases for Dataset

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f6680cdc5 -> 7e9a9e603


[SPARK-11269][SQL] Java API support & test cases for Dataset

This simply brings https://github.com/apache/spark/pull/9358 up-to-date.

Author: Wenchen Fan 
Author: Reynold Xin 

Closes #9528 from rxin/dataset-java.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e9a9e60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e9a9e60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e9a9e60

Branch: refs/heads/master
Commit: 7e9a9e603abce8689938bdd62d04b29299644aa4
Parents: f6680cd
Author: Wenchen Fan 
Authored: Fri Nov 6 15:37:07 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 15:37:07 2015 -0800

--
 .../spark/sql/catalyst/encoders/Encoder.scala   | 123 ++-
 .../sql/catalyst/expressions/objects.scala  |  21 ++
 .../scala/org/apache/spark/sql/Dataset.scala| 126 ++-
 .../org/apache/spark/sql/DatasetHolder.scala|   6 +-
 .../org/apache/spark/sql/GroupedDataset.scala   |  17 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   4 +
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 357 +++
 .../spark/sql/DatasetPrimitiveSuite.scala   |   2 +-
 8 files changed, 644 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e9a9e60/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
index 329a132..f05e1828 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
-
-
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+import org.apache.spark.sql.types.{DataType, ObjectType, StructField, 
StructType}
+import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL 
representation.
@@ -37,3 +37,120 @@ trait Encoder[T] extends Serializable {
   /** A ClassTag that can be used to construct and Array to contain a 
collection of `T`. */
   def clsTag: ClassTag[T]
 }
+
+object Encoder {
+  import scala.reflect.runtime.universe._
+
+  def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
+  def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
+  def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
+  def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
+  def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
+  def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
+  def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
+  def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
+
+  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = 
{
+tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  }
+
+  def tuple[T1, T2, T3](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+  }
+
+  def tuple[T1, T2, T3, T4](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3],
+  enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+tuple(Seq(enc1, enc2, enc3, 
enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+  }
+
+  def tuple[T1, T2, T3, T4, T5](
+  enc1: Encoder[T1],
+  enc2: Encoder[T2],
+  enc3: Encoder[T3],
+  enc4: Encoder[T4],
+  enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
+tuple(Seq(enc1, enc2, enc3, enc4, 
enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
+  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
+  }
+
+  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] 
= {
+assert(encoders.length > 1)
+// make sure all encoders are resolved, i.e. `Attribute` has been resolved 
to `BoundReference`.
+
assert(encoders.forall(_.constructExpression.find(_.isInstanceOf[Attribute]).isEmpty))
+
+val schema = StructType(encoders.zipWithIndex.map {
+  case (e, i) => StructField(s"_${i + 1}", if (e.flat) 
e.schema.head.dataType else e.schema)
+})
+
+val cls = 
Utils.getContextOrSparkClassLoa

spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work

2015-11-06 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 dc058f2ff -> 8fb6696cd


[SPARK-11555] spark on yarn spark-class --num-workers doesn't work

I tested the various options for specifying the number of executors with both spark-submit and spark-class, in both client and cluster mode where each applied:

--num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, and the default when nothing is supplied.
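
As a rough illustration of the resulting precedence (not part of the patch, and assuming dynamic allocation is disabled): an explicit spark.executor.instances setting wins over the SPARK_EXECUTOR_INSTANCES environment variable, which in turn wins over the --num-executors/--num-workers value, which falls back to DEFAULT_NUMBER_EXECUTORS. A hypothetical PySpark snippet pinning the count programmatically:

~~~python
from pyspark import SparkConf, SparkContext

# Sketch only: an explicit spark.executor.instances overrides both the
# SPARK_EXECUTOR_INSTANCES environment variable and any --num-executors /
# --num-workers value supplied on the command line.
conf = (SparkConf()
        .setAppName("executor-count-demo")
        .set("spark.executor.instances", "4"))
sc = SparkContext(conf=conf)
print(conf.get("spark.executor.instances"))  # '4'
~~~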

Author: Thomas Graves 

Closes #9523 from tgravescs/SPARK-11555.

(cherry picked from commit f6680cdc5d2912dea9768ef5c3e2cc101b06daf8)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fb6696c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fb6696c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fb6696c

Branch: refs/heads/branch-1.5
Commit: 8fb6696cd6e066957e34e1123df5691d8e4682d2
Parents: dc058f2
Author: Thomas Graves 
Authored: Fri Nov 6 15:24:33 2015 -0800
Committer: Marcelo Vanzin 
Committed: Fri Nov 6 15:24:58 2015 -0800

--
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala  | 2 +-
 .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +--
 2 files changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8fb6696c/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1165061..a9f4374 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull

http://git-wip-us.apache.org/repos/asf/spark/blob/8fb6696c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 991c909..73023d7 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -355,8 +355,11 @@ object YarnSparkHadoopUtil {
   /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
+   * If not using dynamic allocation it gets the number of executors reqeusted by the user.
    */
-  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
       val initialNumExecutors =
@@ -369,7 +372,7 @@ object YarnSparkHadoopUtil {
       initialNumExecutors
     } else {
       val targetNumExecutors =
-        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
       // System property can override environment variable.
       conf.getInt("spark.executor.instances", targetNumExecutors)
     }





spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work

2015-11-06 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e7e3bfda3 -> b58f1ce5b


[SPARK-11555] spark on yarn spark-class --num-workers doesn't work

I tested the various options for specifying the number of executors with both spark-submit and spark-class, in both client and cluster mode where each applied:

--num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, and the default when nothing is supplied.

Author: Thomas Graves 

Closes #9523 from tgravescs/SPARK-11555.

(cherry picked from commit f6680cdc5d2912dea9768ef5c3e2cc101b06daf8)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b58f1ce5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b58f1ce5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b58f1ce5

Branch: refs/heads/branch-1.6
Commit: b58f1ce5b768f77b86626767f9366c00d2232b04
Parents: e7e3bfd
Author: Thomas Graves 
Authored: Fri Nov 6 15:24:33 2015 -0800
Committer: Marcelo Vanzin 
Committed: Fri Nov 6 15:24:44 2015 -0800

--
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala  | 2 +-
 .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +--
 2 files changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b58f1ce5/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1165061..a9f4374 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull

http://git-wip-us.apache.org/repos/asf/spark/blob/b58f1ce5/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 561ad79..a290ebe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -392,8 +392,11 @@ object YarnSparkHadoopUtil {
   /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
+   * If not using dynamic allocation it gets the number of executors reqeusted by the user.
    */
-  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
       val initialNumExecutors =
@@ -406,7 +409,7 @@ object YarnSparkHadoopUtil {
       initialNumExecutors
     } else {
       val targetNumExecutors =
-        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
       // System property can override environment variable.
       conf.getInt("spark.executor.instances", targetNumExecutors)
     }





spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work

2015-11-06 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master c447c9d54 -> f6680cdc5


[SPARK-11555] spark on yarn spark-class --num-workers doesn't work

I tested the various options for specifying the number of executors with both spark-submit and spark-class, in both client and cluster mode where each applied:

--num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, and the default when nothing is supplied.

Author: Thomas Graves 

Closes #9523 from tgravescs/SPARK-11555.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6680cdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6680cdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6680cdc

Branch: refs/heads/master
Commit: f6680cdc5d2912dea9768ef5c3e2cc101b06daf8
Parents: c447c9d
Author: Thomas Graves 
Authored: Fri Nov 6 15:24:33 2015 -0800
Committer: Marcelo Vanzin 
Committed: Fri Nov 6 15:24:33 2015 -0800

--
 .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala  | 2 +-
 .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +--
 2 files changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6680cdc/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1165061..a9f4374 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orNull
     // If dynamic allocation is enabled, start at the configured initial number of executors.
     // Default to minExecutors if no initialExecutors is set.
-    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+    numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors)
     principal = Option(principal)
       .orElse(sparkConf.getOption("spark.yarn.principal"))
       .orNull

http://git-wip-us.apache.org/repos/asf/spark/blob/f6680cdc/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
--
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 561ad79..a290ebe 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -392,8 +392,11 @@ object YarnSparkHadoopUtil {
   /**
    * Getting the initial target number of executors depends on whether dynamic allocation is
    * enabled.
+   * If not using dynamic allocation it gets the number of executors reqeusted by the user.
    */
-  def getInitialTargetExecutorNumber(conf: SparkConf): Int = {
+  def getInitialTargetExecutorNumber(
+      conf: SparkConf,
+      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
     if (Utils.isDynamicAllocationEnabled(conf)) {
       val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
       val initialNumExecutors =
@@ -406,7 +409,7 @@ object YarnSparkHadoopUtil {
       initialNumExecutors
     } else {
       val targetNumExecutors =
-        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS)
+        sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors)
       // System property can override environment variable.
       conf.getInt("spark.executor.instances", targetNumExecutors)
     }





spark git commit: [SPARK-11217][ML] save/load for non-meta estimators and transformers

2015-11-06 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 52e921c7c -> e7e3bfda3


[SPARK-11217][ML] save/load for non-meta estimators and transformers

This PR implements the default save/load for non-meta estimators and transformers using the JSON serialization of param values. The saved metadata includes:

* class name
* uid
* timestamp
* paramMap

The save/load interface is similar to DataFrames. We use the current active context by default, which should be sufficient for most use cases.

~~~scala
instance.save("path")
instance.write.context(sqlContext).overwrite().save("path")

Instance.load("path")
~~~

The param handling is different from the design doc. We didn't save default and user-set params separately, and when we load it back, all parameters are user-set. This does cause issues, but it would also cause other issues if we modified the default params.

TODOs:

* [x] Java test
* [ ] a follow-up PR to implement default save/load for all non-meta estimators and transformers

cc jkbradley

Author: Xiangrui Meng 

Closes #9454 from mengxr/SPARK-11217.

(cherry picked from commit c447c9d54603890db7399fb80adc9fae40b71f64)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7e3bfda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7e3bfda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7e3bfda

Branch: refs/heads/branch-1.6
Commit: e7e3bfda315eb7809c8bdffa61a30b70a680611c
Parents: 52e921c
Author: Xiangrui Meng 
Authored: Fri Nov 6 14:51:03 2015 -0800
Committer: Joseph K. Bradley 
Committed: Fri Nov 6 14:51:15 2015 -0800

--
 .../org/apache/spark/ml/feature/Binarizer.scala |  11 +-
 .../org/apache/spark/ml/param/params.scala  |   2 +-
 .../org/apache/spark/ml/util/ReadWrite.scala| 220 +++
 .../ml/util/JavaDefaultReadWriteSuite.java  |  74 +++
 .../spark/ml/feature/BinarizerSuite.scala   |  11 +-
 .../spark/ml/util/DefaultReadWriteTest.scala| 110 ++
 .../apache/spark/ml/util/TempDirectory.scala|  45 
 7 files changed, 469 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index edad754..e5c2557 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.BinaryAttribute
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
-import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
+import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, StructType}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DoubleType, StructType}
  */
 @Experimental
 final class Binarizer(override val uid: String)
-  extends Transformer with HasInputCol with HasOutputCol {
+  extends Transformer with Writable with HasInputCol with HasOutputCol {
 
   def this() = this(Identifiable.randomUID("binarizer"))
 
@@ -86,4 +86,11 @@ final class Binarizer(override val uid: String)
   }
 
   override def copy(extra: ParamMap): Binarizer = defaultCopy(extra)
+
+  override def write: Writer = new DefaultParamsWriter(this)
+}
+
+object Binarizer extends Readable[Binarizer] {
+
+  override def read: Reader[Binarizer] = new DefaultParamsReader[Binarizer]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala 
b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index 8361406..c932570 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -592,7 +592,7 @@ trait Params extends Identifiable with Serializable {
   /**
* Sets a parameter in the embedded param map.
*/
-  protected final def set[T](param: Param[T], value: T): this.type = {
+  final def set[T](param: Param[T], value: T): this.type = {
 set(param -> value)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWr

spark git commit: [SPARK-11217][ML] save/load for non-meta estimators and transformers

2015-11-06 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 3a652f691 -> c447c9d54


[SPARK-11217][ML] save/load for non-meta estimators and transformers

This PR implements the default save/load for non-meta estimators and transformers using the JSON serialization of param values. The saved metadata includes:

* class name
* uid
* timestamp
* paramMap

The save/load interface is similar to DataFrames. We use the current active context by default, which should be sufficient for most use cases.

~~~scala
instance.save("path")
instance.write.context(sqlContext).overwrite().save("path")

Instance.load("path")
~~~

The param handling is different from the design doc. We didn't save default and user-set params separately, and when we load it back, all parameters are user-set. This does cause issues, but it would also cause other issues if we modified the default params.

TODOs:

* [x] Java test
* [ ] a follow-up PR to implement default save/load for all non-meta estimators and transformers

cc jkbradley

Author: Xiangrui Meng 

Closes #9454 from mengxr/SPARK-11217.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c447c9d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c447c9d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c447c9d5

Branch: refs/heads/master
Commit: c447c9d54603890db7399fb80adc9fae40b71f64
Parents: 3a652f6
Author: Xiangrui Meng 
Authored: Fri Nov 6 14:51:03 2015 -0800
Committer: Joseph K. Bradley 
Committed: Fri Nov 6 14:51:03 2015 -0800

--
 .../org/apache/spark/ml/feature/Binarizer.scala |  11 +-
 .../org/apache/spark/ml/param/params.scala  |   2 +-
 .../org/apache/spark/ml/util/ReadWrite.scala| 220 +++
 .../ml/util/JavaDefaultReadWriteSuite.java  |  74 +++
 .../spark/ml/feature/BinarizerSuite.scala   |  11 +-
 .../spark/ml/util/DefaultReadWriteTest.scala| 110 ++
 .../apache/spark/ml/util/TempDirectory.scala|  45 
 7 files changed, 469 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index edad754..e5c2557 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.attribute.BinaryAttribute
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
-import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
+import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, StructType}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DoubleType, StructType}
  */
 @Experimental
 final class Binarizer(override val uid: String)
-  extends Transformer with HasInputCol with HasOutputCol {
+  extends Transformer with Writable with HasInputCol with HasOutputCol {
 
   def this() = this(Identifiable.randomUID("binarizer"))
 
@@ -86,4 +86,11 @@ final class Binarizer(override val uid: String)
   }
 
   override def copy(extra: ParamMap): Binarizer = defaultCopy(extra)
+
+  override def write: Writer = new DefaultParamsWriter(this)
+}
+
+object Binarizer extends Readable[Binarizer] {
+
+  override def read: Reader[Binarizer] = new DefaultParamsReader[Binarizer]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala 
b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index 8361406..c932570 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -592,7 +592,7 @@ trait Params extends Identifiable with Serializable {
   /**
* Sets a parameter in the embedded param map.
*/
-  protected final def set[T](param: Param[T], value: T): this.type = {
+  final def set[T](param: Param[T], value: T): this.type = {
 set(param -> value)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
new file mode 100644
index 000..e

spark git commit: [SPARK-11561][SQL] Rename text data source's column name to value.

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 efa1e4a25 -> 52e921c7c


[SPARK-11561][SQL] Rename text data source's column name to value.

Author: Reynold Xin 

Closes #9527 from rxin/SPARK-11561.

(cherry picked from commit 3a652f691b220fada0286f8d0a562c5657973d4d)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52e921c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52e921c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52e921c7

Branch: refs/heads/branch-1.6
Commit: 52e921c7cc6ba6dddb923221c42f52eb4cd98f0f
Parents: efa1e4a
Author: Reynold Xin 
Authored: Fri Nov 6 14:47:41 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 14:47:49 2015 -0800

--
 .../spark/sql/execution/datasources/text/DefaultSource.scala   | 6 ++
 .../spark/sql/execution/datasources/text/TextSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52e921c7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 52c4421..4b8b8e4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -30,14 +30,12 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, 
BufferHolder}
-import org.apache.spark.sql.columnar.MutableUnsafeRow
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -78,7 +76,7 @@ private[sql] class TextRelation(
   extends HadoopFsRelation(maybePartitionSpec) {
 
   /** Data schema is always a single column, named "text". */
-  override def dataSchema: StructType = new StructType().add("text", StringType)
+  override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */
   override val needConversion: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/52e921c7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0a2306c..914e516 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -65,7 +65,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   /** Verifies data and schema. */
   private def verifyFrame(df: DataFrame): Unit = {
 // schema
-assert(df.schema == new StructType().add("text", StringType))
+assert(df.schema == new StructType().add("value", StringType))
 
 // verify content
 val data = df.collect()





spark git commit: [SPARK-11561][SQL] Rename text data source's column name to value.

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f328fedaf -> 3a652f691


[SPARK-11561][SQL] Rename text data source's column name to value.

Author: Reynold Xin 

Closes #9527 from rxin/SPARK-11561.
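
For context, a hypothetical PySpark sketch of what the rename means for callers of the text data source (an existing `sqlContext` is assumed and the file path is illustrative):

~~~python
# Sketch only: the text source now exposes its single string column as "value".
df = sqlContext.read.text("examples/src/main/resources/people.txt")
df.printSchema()
# root
#  |-- value: string (nullable = true)
first_line = df.select("value").first()
~~~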


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a652f69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a652f69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a652f69

Branch: refs/heads/master
Commit: 3a652f691b220fada0286f8d0a562c5657973d4d
Parents: f328fed
Author: Reynold Xin 
Authored: Fri Nov 6 14:47:41 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 14:47:41 2015 -0800

--
 .../spark/sql/execution/datasources/text/DefaultSource.scala   | 6 ++
 .../spark/sql/execution/datasources/text/TextSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3a652f69/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 52c4421..4b8b8e4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -30,14 +30,12 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, 
BufferHolder}
-import org.apache.spark.sql.columnar.MutableUnsafeRow
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 
 /**
@@ -78,7 +76,7 @@ private[sql] class TextRelation(
   extends HadoopFsRelation(maybePartitionSpec) {
 
   /** Data schema is always a single column, named "text". */
-  override def dataSchema: StructType = new StructType().add("text", StringType)
+  override def dataSchema: StructType = new StructType().add("value", StringType)
 
   /** This is an internal data source that outputs internal row format. */
   override val needConversion: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/3a652f69/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0a2306c..914e516 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -65,7 +65,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   /** Verifies data and schema. */
   private def verifyFrame(df: DataFrame): Unit = {
 // schema
-assert(df.schema == new StructType().add("text", StringType))
+assert(df.schema == new StructType().add("value", StringType))
 
 // verify content
 val data = df.collect()





spark git commit: [SPARK-11450] [SQL] Add Unsafe Row processing to Expand

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7755b50b4 -> efa1e4a25


[SPARK-11450] [SQL] Add Unsafe Row processing to Expand

This PR enables the Expand operator to process and produce Unsafe Rows.

Author: Herman van Hovell 

Closes #9414 from hvanhovell/SPARK-11450.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efa1e4a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efa1e4a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efa1e4a2

Branch: refs/heads/branch-1.6
Commit: efa1e4a25cd6f4311faf9fc6c551ef1c71e2227b
Parents: 7755b50
Author: Herman van Hovell 
Authored: Fri Nov 6 12:21:53 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 12:25:10 2015 -0800

--
 .../sql/catalyst/expressions/Projection.scala   |  6 ++-
 .../org/apache/spark/sql/execution/Expand.scala | 19 ---
 .../spark/sql/execution/basicOperators.scala|  8 +--
 .../spark/sql/execution/ExpandSuite.scala   | 54 
 4 files changed, 73 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index a6fe730..79dabe8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -128,7 +128,11 @@ object UnsafeProjection {
* Returns an UnsafeProjection for given sequence of Expressions (bounded).
*/
   def create(exprs: Seq[Expression]): UnsafeProjection = {
-GenerateUnsafeProjection.generate(exprs)
+val unsafeExprs = exprs.map(_ transform {
+  case CreateStruct(children) => CreateStructUnsafe(children)
+  case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+})
+GenerateUnsafeProjection.generate(unsafeExprs)
   }
 
   def create(expr: Expression): UnsafeProjection = create(Seq(expr))

http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index a458881..55e9576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -41,14 +41,21 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
+  private[this] val projection = {
+if (outputsUnsafeRows) {
+  (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
+} else {
+  (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
+}
+  }
+
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
 child.execute().mapPartitions { iter =>
-  // TODO Move out projection objects creation and transfer to
-  // workers via closure. However we can't assume the Projection
-  // is serializable because of the code gen, so we have to
-  // create the projections within each of the partition processing.
-  val groups = projections.map(ee => newProjection(ee, 
child.output)).toArray
-
+  val groups = projections.map(projection).toArray
   new Iterator[InternalRow] {
 private[this] var result: InternalRow = _
 private[this] var idx = -1  // -1 means the initial state

http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d5a803f..799650a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -67,16 +67,10 @@ case class TungstenProject(projectList: 
Seq[NamedExpression], child: SparkPlan)
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
-  /** Rewrite the

spark git commit: [SPARK-11450] [SQL] Add Unsafe Row processing to Expand

2015-11-06 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 49f1a8203 -> f328fedaf


[SPARK-11450] [SQL] Add Unsafe Row processing to Expand

This PR enables the Expand operator to process and produce Unsafe Rows.

Author: Herman van Hovell 

Closes #9414 from hvanhovell/SPARK-11450.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f328feda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f328feda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f328feda

Branch: refs/heads/master
Commit: f328fedafd7bd084470a5e402de0429b5b7f8cd7
Parents: 49f1a82
Author: Herman van Hovell 
Authored: Fri Nov 6 12:21:53 2015 -0800
Committer: Michael Armbrust 
Committed: Fri Nov 6 12:21:53 2015 -0800

--
 .../sql/catalyst/expressions/Projection.scala   |  6 ++-
 .../org/apache/spark/sql/execution/Expand.scala | 19 ---
 .../spark/sql/execution/basicOperators.scala|  8 +--
 .../spark/sql/execution/ExpandSuite.scala   | 54 
 4 files changed, 73 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index a6fe730..79dabe8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -128,7 +128,11 @@ object UnsafeProjection {
* Returns an UnsafeProjection for given sequence of Expressions (bounded).
*/
   def create(exprs: Seq[Expression]): UnsafeProjection = {
-GenerateUnsafeProjection.generate(exprs)
+val unsafeExprs = exprs.map(_ transform {
+  case CreateStruct(children) => CreateStructUnsafe(children)
+  case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
+})
+GenerateUnsafeProjection.generate(unsafeExprs)
   }
 
   def create(expr: Expression): UnsafeProjection = create(Seq(expr))

http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index a458881..55e9576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -41,14 +41,21 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)
 
+  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
+  override def canProcessUnsafeRows: Boolean = true
+  override def canProcessSafeRows: Boolean = true
+
+  private[this] val projection = {
+if (outputsUnsafeRows) {
+  (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
+} else {
+  (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
+}
+  }
+
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, 
"execute") {
 child.execute().mapPartitions { iter =>
-  // TODO Move out projection objects creation and transfer to
-  // workers via closure. However we can't assume the Projection
-  // is serializable because of the code gen, so we have to
-  // create the projections within each of the partition processing.
-  val groups = projections.map(ee => newProjection(ee, 
child.output)).toArray
-
+  val groups = projections.map(projection).toArray
   new Iterator[InternalRow] {
 private[this] var result: InternalRow = _
 private[this] var idx = -1  // -1 means the initial state

http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index d5a803f..799650a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -67,16 +67,10 @@ case class TungstenProject(projectList: 
Seq[NamedExpression], child: SparkPlan)
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
-  /** Rewrite the project

spark git commit: [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 62bb29077 -> 49f1a8203


[SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits

https://issues.apache.org/jira/browse/SPARK-10116

This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`.

mengxr mkolod

Author: Imran Rashid 

Closes #8314 from squito/SPARK-10116.
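
To make the fix concrete, here is a small illustrative Python sketch of the same bit-packing idea: hash the seed bytes twice to get two 32-bit values and pack them into the high and low halves of a 64-bit result, so the upper bits are scrambled too. zlib.crc32 stands in for MurmurHash3 purely for illustration; this is not the code from the patch.

~~~python
import struct
import zlib

def hash_seed_64(seed):
    # Illustration only: zlib.crc32 stands in for MurmurHash3.
    data = struct.pack(">q", seed)
    low = zlib.crc32(data) & 0xFFFFFFFF          # scrambles the low 32 bits
    high = zlib.crc32(data, low) & 0xFFFFFFFF    # second hash, seeded by the first
    return (high << 32) | low                    # both halves now vary with the seed

print(hex(hash_seed_64(42)))
~~~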


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49f1a820
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49f1a820
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49f1a820

Branch: refs/heads/master
Commit: 49f1a820372d1cba41f3f00d07eb5728f2ed6705
Parents: 62bb290
Author: Imran Rashid 
Authored: Fri Nov 6 20:06:24 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 20:06:24 2015 +

--
 R/pkg/inst/tests/test_sparkSQL.R|  8 +--
 .../spark/util/random/XORShiftRandom.scala  |  6 ++-
 .../java/org/apache/spark/JavaAPISuite.java | 20 +---
 .../spark/rdd/PairRDDFunctionsSuite.scala   | 52 ++--
 .../spark/util/random/XORShiftRandomSuite.scala | 15 ++
 .../MultilayerPerceptronClassifierSuite.scala   |  5 +-
 .../apache/spark/ml/feature/Word2VecSuite.scala | 16 --
 .../mllib/clustering/StreamingKMeansSuite.scala | 13 +++--
 python/pyspark/ml/feature.py| 20 
 python/pyspark/ml/recommendation.py |  6 +--
 python/pyspark/mllib/recommendation.py  |  4 +-
 python/pyspark/sql/dataframe.py |  6 +--
 .../sql/catalyst/expressions/RandomSuite.scala  |  8 +--
 .../apache/spark/sql/JavaDataFrameSuite.java|  6 ++-
 .../apache/spark/sql/DataFrameStatSuite.scala   |  4 +-
 15 files changed, 128 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 816315b..92cff1f 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -875,9 +875,9 @@ test_that("column binary mathfunctions", {
   expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
   expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
   expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
-  expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01)
+  expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
   expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
-  expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01)
+  expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
 })
 
 test_that("string operators", {
@@ -1458,8 +1458,8 @@ test_that("sampleBy() on a DataFrame", {
   fractions <- list("0" = 0.1, "1" = 0.2)
   sample <- sampleBy(df, "key", fractions, 0)
   result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
-  expect_identical(as.list(result[1, ]), list(key = "0", count = 2))
-  expect_identical(as.list(result[2, ]), list(key = "1", count = 10))
+  expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
+  expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
 })
 
 test_that("SQL error message is returned from JVM", {

http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala 
b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
index 85fb923..e8cdb6e 100644
--- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
@@ -60,9 +60,11 @@ private[spark] class XORShiftRandom(init: Long) extends 
JavaRandom(init) {
 private[spark] object XORShiftRandom {
 
   /** Hash seeds to have 0/1 bits throughout. */
-  private def hashSeed(seed: Long): Long = {
+  private[random] def hashSeed(seed: Long): Long = {
     val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()
-    MurmurHash3.bytesHash(bytes)
+    val lowBits = MurmurHash3.bytesHash(bytes)
+    val highBits = MurmurHash3.bytesHash(bytes, lowBits)
+    (highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/core/src/test/java/org/apache/spark/JavaAPISuite.java
--
diff --git

spark git commit: [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e86499954 -> 7755b50b4


[SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits

https://issues.apache.org/jira/browse/SPARK-10116

This is really trivial; I just happened to notice it. If
`XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as
the comment implies), it needs to do something for the conversion to `long`: the
32-bit MurmurHash3 result is simply sign-extended, so the upper 32 bits are never
random.
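
For context, a minimal self-contained sketch of the before/after behaviour (the
object and method names are mine; the new logic simply mirrors the diff below):

    import java.nio.ByteBuffer
    import scala.util.hashing.MurmurHash3

    object HashSeedSketch {
      private def seedBytes(seed: Long): Array[Byte] =
        ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()

      // Old behaviour: bytesHash returns an Int, and widening it to Long just
      // sign-extends, so the upper 32 bits carry no randomness.
      def hashSeedOld(seed: Long): Long = MurmurHash3.bytesHash(seedBytes(seed))

      // New behaviour (as in the diff): derive two 32-bit hashes and pack them
      // into one 64-bit value.
      def hashSeedNew(seed: Long): Long = {
        val bytes = seedBytes(seed)
        val lowBits = MurmurHash3.bytesHash(bytes)
        val highBits = MurmurHash3.bytesHash(bytes, lowBits)
        (highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
      }

      def main(args: Array[String]): Unit = {
        println(f"old: ${hashSeedOld(42L)}%016x")  // upper 8 hex digits: all 0s or all Fs
        println(f"new: ${hashSeedNew(42L)}%016x")  // upper 8 hex digits: hashed as well
      }
    }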

mengxr mkolod

Author: Imran Rashid 

Closes #8314 from squito/SPARK-10116.

(cherry picked from commit 49f1a820372d1cba41f3f00d07eb5728f2ed6705)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7755b50b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7755b50b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7755b50b

Branch: refs/heads/branch-1.6
Commit: 7755b50b4352432c1ab9d600dd4f4b52727bfa40
Parents: e864999
Author: Imran Rashid 
Authored: Fri Nov 6 20:06:24 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 20:06:34 2015 +

--
 R/pkg/inst/tests/test_sparkSQL.R|  8 +--
 .../spark/util/random/XORShiftRandom.scala  |  6 ++-
 .../java/org/apache/spark/JavaAPISuite.java | 20 +---
 .../spark/rdd/PairRDDFunctionsSuite.scala   | 52 ++--
 .../spark/util/random/XORShiftRandomSuite.scala | 15 ++
 .../MultilayerPerceptronClassifierSuite.scala   |  5 +-
 .../apache/spark/ml/feature/Word2VecSuite.scala | 16 --
 .../mllib/clustering/StreamingKMeansSuite.scala | 13 +++--
 python/pyspark/ml/feature.py| 20 
 python/pyspark/ml/recommendation.py |  6 +--
 python/pyspark/mllib/recommendation.py  |  4 +-
 python/pyspark/sql/dataframe.py |  6 +--
 .../sql/catalyst/expressions/RandomSuite.scala  |  8 +--
 .../apache/spark/sql/JavaDataFrameSuite.java|  6 ++-
 .../apache/spark/sql/DataFrameStatSuite.scala   |  4 +-
 15 files changed, 128 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/R/pkg/inst/tests/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 816315b..92cff1f 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -875,9 +875,9 @@ test_that("column binary mathfunctions", {
   expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4)
   expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4)
   expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric")
-  expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01)
+  expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01)
   expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric")
-  expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01)
+  expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01)
 })
 
 test_that("string operators", {
@@ -1458,8 +1458,8 @@ test_that("sampleBy() on a DataFrame", {
   fractions <- list("0" = 0.1, "1" = 0.2)
   sample <- sampleBy(df, "key", fractions, 0)
   result <- collect(orderBy(count(groupBy(sample, "key")), "key"))
-  expect_identical(as.list(result[1, ]), list(key = "0", count = 2))
-  expect_identical(as.list(result[2, ]), list(key = "1", count = 10))
+  expect_identical(as.list(result[1, ]), list(key = "0", count = 3))
+  expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
 })
 
 test_that("SQL error message is returned from JVM", {

http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala 
b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
index 85fb923..e8cdb6e 100644
--- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala
@@ -60,9 +60,11 @@ private[spark] class XORShiftRandom(init: Long) extends 
JavaRandom(init) {
 private[spark] object XORShiftRandom {
 
   /** Hash seeds to have 0/1 bits throughout. */
-  private def hashSeed(seed: Long): Long = {
+  private[random] def hashSeed(seed: Long): Long = {
 val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()
-MurmurHash3.bytesHash(bytes)
+val lowBits = MurmurHash3.bytesHash(bytes)
+val highBits = MurmurHash3.bytesHash(bytes, lowBits)
+(highBits.toLong << 32) | (lowBits.toLong & 0xFFFFFFFFL)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/core/src/test/java/org/apache/s

spark git commit: Typo fixes + code readability improvements

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 8211aab07 -> 62bb29077


Typo fixes + code readability improvements

Author: Jacek Laskowski 

Closes #9501 from jaceklaskowski/typos-with-style.
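
One of the readability changes below replaces an isInstanceOf/asInstanceOf pair
with a pattern match. A tiny standalone sketch of that idiom (the Configurable
trait here is a local stand-in, not Hadoop's):

    trait Configurable { def setConf(conf: String): Unit }

    object ReadabilitySketch {
      // Before: explicit type test followed by a cast.
      def configureOld(x: AnyRef, conf: String): Unit = {
        if (x.isInstanceOf[Configurable]) {
          x.asInstanceOf[Configurable].setConf(conf)
        }
      }

      // After: a pattern match performs the test and the cast in one step.
      def configureNew(x: AnyRef, conf: String): Unit = x match {
        case c: Configurable => c.setConf(conf)
        case _ => // not configurable, nothing to do
      }
    }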


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62bb2907
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62bb2907
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62bb2907

Branch: refs/heads/master
Commit: 62bb290773c9f9fa53cbe6d4eedc6e153761a763
Parents: 8211aab
Author: Jacek Laskowski 
Authored: Fri Nov 6 20:05:18 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 20:05:18 2015 +

--
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala   | 14 ++
 .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +---
 .../org/apache/spark/scheduler/ShuffleMapTask.scala   | 10 +-
 .../scala/org/apache/spark/scheduler/TaskSet.scala|  2 +-
 4 files changed, 21 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62bb2907/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d841f05..0453614 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -88,8 +88,8 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, s: 
InputSplit)
  *
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. 
If the enclosed
- * variabe references an instance of JobConf, then that JobConf will be 
used for the Hadoop job.
- * Otherwise, a new JobConf will be created on each slave using the 
enclosed Configuration.
+ *   variable references an instance of JobConf, then that JobConf will be 
used for the Hadoop job.
+ *   Otherwise, a new JobConf will be created on each slave using the enclosed 
Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any 
JobConf that HadoopRDD
  * creates.
  * @param inputFormatClass Storage format of the data to be read.
@@ -123,7 +123,7 @@ class HadoopRDD[K, V](
   sc,
   sc.broadcast(new SerializableConfiguration(conf))
 .asInstanceOf[Broadcast[SerializableConfiguration]],
-  None /* initLocalJobConfFuncOpt */,
+  initLocalJobConfFuncOpt = None,
   inputFormatClass,
   keyClass,
   valueClass,
@@ -184,8 +184,9 @@ class HadoopRDD[K, V](
   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
 val newInputFormat = 
ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
   .asInstanceOf[InputFormat[K, V]]
-if (newInputFormat.isInstanceOf[Configurable]) {
-  newInputFormat.asInstanceOf[Configurable].setConf(conf)
+newInputFormat match {
+  case c: Configurable => c.setConf(conf)
+  case _ =>
 }
 newInputFormat
   }
@@ -195,9 +196,6 @@ class HadoopRDD[K, V](
 // add the credentials here as this can be called before SparkContext 
initialized
 SparkHadoopUtil.get.addCredentials(jobConf)
 val inputFormat = getInputFormat(jobConf)
-if (inputFormat.isInstanceOf[Configurable]) {
-  inputFormat.asInstanceOf[Configurable].setConf(jobConf)
-}
 val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
 val array = new Array[Partition](inputSplits.size)
 for (i <- 0 until inputSplits.size) {

http://git-wip-us.apache.org/repos/asf/spark/blob/62bb2907/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a1f0fd0..4a9518f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -541,8 +541,7 @@ class DAGScheduler(
   }
 
   /**
-   * Submit an action job to the scheduler and get a JobWaiter object back. 
The JobWaiter object
-   * can be used to block until the the job finishes executing or can be used 
to cancel the job.
+   * Submit an action job to the scheduler.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
@@ -551,6 +550,11 @@ class DAGScheduler(
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair 
scheduler pool name
+   

[3/4] spark git commit: [SPARK-11541][SQL] Break JdbcDialects.scala into multiple files and mark various dialects as private.

2015-11-06 Thread rxin
[SPARK-11541][SQL] Break JdbcDialects.scala into multiple files and mark 
various dialects as private.

Author: Reynold Xin 

Closes #9511 from rxin/SPARK-11541.

(cherry picked from commit bc5d6c03893a9bd340d6b94d3550e25648412241)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/641cdfd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/641cdfd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/641cdfd8

Branch: refs/heads/branch-1.6
Commit: 641cdfd869cf6096274f55adcccdf01caa9752b5
Parents: a015f81
Author: Reynold Xin 
Authored: Thu Nov 5 22:03:26 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 11:57:43 2015 -0800

--
 project/MimaExcludes.scala  |  19 +-
 .../org/apache/spark/sql/GroupedData.scala  |   2 +-
 .../spark/sql/jdbc/AggregatedDialect.scala  |  44 +
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala  |  32 
 .../apache/spark/sql/jdbc/DerbyDialect.scala|  44 +
 .../apache/spark/sql/jdbc/JdbcDialects.scala| 190 +--
 .../spark/sql/jdbc/MsSqlServerDialect.scala |  41 
 .../apache/spark/sql/jdbc/MySQLDialect.scala|  48 +
 .../apache/spark/sql/jdbc/OracleDialect.scala   |  45 +
 .../apache/spark/sql/jdbc/PostgresDialect.scala |  54 ++
 10 files changed, 332 insertions(+), 187 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/641cdfd8/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 40f5c9f..dacef91 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -116,7 +116,24 @@ object MimaExcludes {
   "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
   ) ++ Seq(
 // SPARK-11485
-
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df")
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),
+// SPARK-11541 mark various JDBC dialects as private
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productElement"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productArity"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.canEqual"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productIterator"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productPrefix"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.toString"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.hashCode"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.PostgresDialect$"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productElement"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productArity"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.canEqual"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productIterator"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productPrefix"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.toString"),
+
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.hashCode"),
+
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$")
   )
 case v if v.startsWith("1.5") =>
   Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/641cdfd8/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index 7cf66b6..f9eab5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.NumericType
 class GroupedData protected[sql](
 df: DataFrame,
 groupingExprs: Seq[Expression],
-private val groupType: GroupedData.GroupType) {
+groupType: GroupedData.GroupType) {
 
   private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
 val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColum

[1/4] spark git commit: [SPARK-11538][BUILD] Force guava 14 in sbt build.

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9d6238859 -> e86499954


[SPARK-11538][BUILD] Force guava 14 in sbt build.

sbt's version resolution code always picks the most recent version, and we
don't want that for guava.
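
A minimal build-definition sketch of the mechanism used (the dependencyOverrides
key is standard sbt; the fragment below is illustrative rather than the project's
actual build file):

    // sbt's default conflict resolution picks the newest requested version;
    // dependencyOverrides forces the pinned one regardless of transitive requests.
    dependencyOverrides += "com.google.guava" % "guava" % "14.0.1"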

Author: Marcelo Vanzin 

Closes #9508 from vanzin/SPARK-11538.

(cherry picked from commit 5e31db70bb783656ba042863fcd3c223e17a8f81)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/051b2ca3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/051b2ca3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/051b2ca3

Branch: refs/heads/branch-1.6
Commit: 051b2ca3a0aa18c5d805cbd183bca504865297c4
Parents: 9d62388
Author: Marcelo Vanzin 
Authored: Thu Nov 5 18:05:58 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 11:57:29 2015 -0800

--
 project/SparkBuild.scala | 11 ++-
 1 file changed, 10 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/051b2ca3/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 75c3693..b75ed13 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -207,7 +207,8 @@ object SparkBuild extends PomBuild {
   // Note ordering of these settings matter.
   /* Enable shared settings on all projects */
   (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, 
tools))
-.foreach(enable(sharedSettings ++ ExcludedDependencies.settings ++ 
Revolver.settings))
+.foreach(enable(sharedSettings ++ DependencyOverrides.settings ++
+  ExcludedDependencies.settings ++ Revolver.settings))
 
   /* Enable tests settings for all projects except examples, assembly and 
tools */
   (allProjects ++ 
optionallyEnabledProjects).foreach(enable(TestSettings.settings))
@@ -292,6 +293,14 @@ object Flume {
 }
 
 /**
+ * Overrides to work around sbt's dependency resolution being different from 
Maven's.
+ */
+object DependencyOverrides {
+  lazy val settings = Seq(
+dependencyOverrides += "com.google.guava" % "guava" % "14.0.1")
+}
+
+/**
   This excludes library dependencies in sbt, which are specified in maven but 
are
   not needed by sbt build.
   */





spark git commit: [SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from checkpoint

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 089ba81d1 -> 9d6238859


[SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from 
checkpoint

Currently the Yarn AM proxy filter configuration is recovered from the checkpoint
file when a Spark Streaming application is restarted, which leads to some unwanted
behaviors:

1. Wrong RM address if the RM is redeployed after a failure.
2. Wrong proxyBase, since the app id changes on restart and the old app id used for proxyBase is stale.

So instead of being recovered from the checkpoint file, these configurations
should be reloaded each time the app starts.

This problem only exists in Yarn cluster mode; in Yarn client mode these
configurations are updated with the RPC message `AddWebUIFilter`.
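
A minimal sketch of the reload step (the standalone method is mine for
illustration; `newReloadConf` is assumed to be a SparkConf freshly loaded from
system properties rather than recovered from the checkpoint):

    import org.apache.spark.SparkConf

    // Copy only the AmIpFilter parameters from the freshly loaded conf into the
    // conf recovered from the checkpoint, so the RM address and proxyBase are current.
    def recoverProxyFilterConf(newSparkConf: SparkConf, newReloadConf: SparkConf): SparkConf = {
      val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
      val filterPrefix = s"spark.$filter.param."
      newReloadConf.getAll.foreach { case (k, v) =>
        if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
          newSparkConf.set(k, v)
        }
      }
      newSparkConf
    }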

Please help to review tdas harishreedharan vanzin , thanks a lot.

Author: jerryshao 

Closes #9412 from jerryshao/SPARK-11457.

(cherry picked from commit 468ad0ae874d5cf55712ee976faf77f19c937ccb)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d623885
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d623885
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d623885

Branch: refs/heads/branch-1.6
Commit: 9d6238859e98651f32b9dd733a83c0e6f45978a7
Parents: 089ba81
Author: jerryshao 
Authored: Thu Nov 5 18:03:12 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 11:57:16 2015 -0800

--
 .../scala/org/apache/spark/streaming/Checkpoint.scala  | 13 -
 1 file changed, 12 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9d623885/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b7de6dd..0cd55d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -55,7 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: 
Time)
   "spark.driver.port",
   "spark.master",
   "spark.yarn.keytab",
-  "spark.yarn.principal")
+  "spark.yarn.principal",
+  "spark.ui.filters")
 
 val newSparkConf = new SparkConf(loadDefaults = 
false).setAll(sparkConfPairs)
   .remove("spark.driver.host")
@@ -66,6 +67,16 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: 
Time)
 newSparkConf.set(prop, value)
   }
 }
+
+// Add Yarn proxy filter specific configurations to the recovered SparkConf
+val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
+val filterPrefix = s"spark.$filter.param."
+newReloadConf.getAll.foreach { case (k, v) =>
+  if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) {
+newSparkConf.set(k, v)
+  }
+}
+
 newSparkConf
   }
 





[2/4] spark git commit: [SPARK-11540][SQL] API audit for QueryExecutionListener.

2015-11-06 Thread rxin
[SPARK-11540][SQL] API audit for QueryExecutionListener.

Author: Reynold Xin 

Closes #9509 from rxin/SPARK-11540.

(cherry picked from commit 3cc2c053b5d68c747a30bd58cf388b87b1922f13)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a015f814
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a015f814
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a015f814

Branch: refs/heads/branch-1.6
Commit: a015f8141f4123d0389594e890d25b1a0f0adc71
Parents: 051b2ca
Author: Reynold Xin 
Authored: Thu Nov 5 18:12:54 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 11:57:36 2015 -0800

--
 .../spark/sql/execution/QueryExecution.scala|  30 +++---
 .../spark/sql/util/QueryExecutionListener.scala | 101 ++-
 2 files changed, 72 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a015f814/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index fc91745..c2142d0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
@@ -25,31 +27,33 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 /**
  * The primary workflow for executing relational queries using Spark.  
Designed to allow easy
  * access to the intermediate phases of query execution for developers.
+ *
+ * While this is not a public class, we should avoid changing the function 
names for the sake of
+ * changing them, because a lot of developers use the feature for debugging.
  */
 class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
-  val analyzer = sqlContext.analyzer
-  val optimizer = sqlContext.optimizer
-  val planner = sqlContext.planner
-  val cacheManager = sqlContext.cacheManager
-  val prepareForExecution = sqlContext.prepareForExecution
 
-  def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+  @VisibleForTesting
+  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
+
+  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
 
-  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
   lazy val withCachedData: LogicalPlan = {
 assertAnalyzed()
-cacheManager.useCachedData(analyzed)
+sqlContext.cacheManager.useCachedData(analyzed)
   }
-  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+  lazy val optimizedPlan: LogicalPlan = 
sqlContext.optimizer.execute(withCachedData)
 
   // TODO: Don't just pick the first one...
   lazy val sparkPlan: SparkPlan = {
 SparkPlan.currentContext.set(sqlContext)
-planner.plan(optimizedPlan).next()
+sqlContext.planner.plan(optimizedPlan).next()
   }
+
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+  lazy val executedPlan: SparkPlan = 
sqlContext.prepareForExecution.execute(sparkPlan)
 
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
@@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val 
logical: LogicalPlan) {
   protected def stringOrError[A](f: => A): String =
 try f.toString catch { case e: Throwable => e.toString }
 
-  def simpleString: String =
+  def simpleString: String = {
 s"""== Physical Plan ==
|${stringOrError(executedPlan)}
   """.stripMargin.trim
-
+  }
 
   override def toString: String = {
 def output =

http://git-wip-us.apache.org/repos/asf/spark/blob/a015f814/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 909a8ab..ac432e2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -19,36 +19,38 @@ package org.apache.spark.sql.util
 
 import java.util.concurrent.locks.ReentrantReadWriteLock

[4/4] spark git commit: [SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit

2015-11-06 Thread rxin
[SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit

A cleanup for https://github.com/apache/spark/pull/9085.

`DecimalLit` is very similar to `FloatLit`, so we can keep just one of them.
Also added a low-level unit test in `SqlParserSuite`.
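
For context, a hedged sketch of the three literal shapes involved, using plain
regexes rather than the StdLexical parser in the diff; after this change both the
plain decimal and the scientific-notation forms tokenize as `DecimalLit`:

    object LiteralShapes {
      private val Scientific = """(?:\d+(?:\.\d*)?|\.\d+)[eE][+-]?\d+""".r
      private val Decimal    = """\d+\.\d*|\.\d+""".r
      private val Integral   = """\d+""".r

      // Anything with a fractional part or an exponent becomes a DecimalLit;
      // plain digit runs stay NumericLit.
      def classify(s: String): String = s match {
        case Scientific() => "DecimalLit (scientific notation)"
        case Decimal()    => "DecimalLit (plain decimal)"
        case Integral()   => "NumericLit (integral)"
        case _            => "not a numeric literal"
      }

      def main(args: Array[String]): Unit = {
        Seq("10", "1.5", ".5e3", "2.5E-4").foreach(s => println(s"$s -> ${classify(s)}"))
      }
    }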

Author: Wenchen Fan 

Closes #9482 from cloud-fan/parser.

(cherry picked from commit 253e87e8ab8717ffef40a6d0d376b1add155ef90)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8649995
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8649995
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8649995

Branch: refs/heads/branch-1.6
Commit: e8649995418fb9c3a172181f0d0fa6d49ba2f674
Parents: 641cdfd
Author: Wenchen Fan 
Authored: Fri Nov 6 06:38:49 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 11:57:50 2015 -0800

--
 .../sql/catalyst/AbstractSparkSQLParser.scala   | 23 +---
 .../apache/spark/sql/catalyst/SqlParser.scala   | 20 -
 .../spark/sql/catalyst/SqlParserSuite.scala | 21 ++
 3 files changed, 35 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8649995/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
index 04ac4f2..bdc52c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -78,10 +78,6 @@ private[sql] abstract class AbstractSparkSQLParser
 }
 
 class SqlLexical extends StdLexical {
-  case class FloatLit(chars: String) extends Token {
-override def toString: String = chars
-  }
-
   case class DecimalLit(chars: String) extends Token {
 override def toString: String = chars
   }
@@ -106,17 +102,16 @@ class SqlLexical extends StdLexical {
   }
 
   override lazy val token: Parser[Token] =
-( rep1(digit) ~ ('.' ~> digit.*).? ~ (exp ~> sign.? ~ rep1(digit)) ^^ {
-case i ~ None ~ (sig ~ rest) =>
-  DecimalLit(i.mkString + "e" + sig.mkString + rest.mkString)
-case i ~ Some(d) ~ (sig ~ rest) =>
-  DecimalLit(i.mkString + "." + d.mkString + "e" + sig.mkString + 
rest.mkString)
-  }
+( rep1(digit) ~ scientificNotation ^^ { case i ~ s => 
DecimalLit(i.mkString + s) }
+| '.' ~> (rep1(digit) ~ scientificNotation) ^^
+  { case i ~ s => DecimalLit("0." + i.mkString + s) }
+| rep1(digit) ~ ('.' ~> digit.*) ~ scientificNotation ^^
+  { case i1 ~ i2 ~ s => DecimalLit(i1.mkString + "." + i2.mkString + s) }
 | digit.* ~ identChar ~ (identChar | digit).* ^^
   { case first ~ middle ~ rest => processIdent((first ++ (middle :: 
rest)).mkString) }
 | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
 case i ~ None => NumericLit(i.mkString)
-case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+case i ~ Some(d) => DecimalLit(i.mkString + "." + d.mkString)
   }
 | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
   { case chars => StringLit(chars mkString "") }
@@ -133,8 +128,10 @@ class SqlLexical extends StdLexical {
 
   override def identChar: Parser[Elem] = letter | elem('_')
 
-  private lazy val sign: Parser[Elem] = elem("s", c => c == '+' || c == '-')
-  private lazy val exp: Parser[Elem] = elem("e", c => c == 'E' || c == 'e')
+  private lazy val scientificNotation: Parser[String] =
+(elem('e') | elem('E')) ~> (elem('+') | elem('-')).? ~ rep1(digit) ^^ {
+  case s ~ rest => "e" + s.mkString + rest.mkString
+}
 
   override def whitespace: Parser[Any] =
 ( whitespaceChar

http://git-wip-us.apache.org/repos/asf/spark/blob/e8649995/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 440e9e2..cd717c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -334,27 +334,15 @@ object SqlParser extends AbstractSparkSQLParser with 
DataTypeParser {
 
   protected lazy val numericLiteral: Parser[Literal] =
 ( integral  ^^ { case i => Literal(toNarrowestIntegerType(i)) }
-| sign.? ~ unsignedFloat ^^ {
-  case s ~ f => Literal(toDecimalOrDouble(s.getOrElse("") + f))
-}
-| sign.? ~ unsignedDecimal ^^ {

spark git commit: [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up)

2015-11-06 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fba48e733 -> 089ba81d1


[SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of 
post-shuffle partitions for aggregates and joins (follow-up)

https://issues.apache.org/jira/browse/SPARK-9858

This PR is the follow-up work of https://github.com/apache/spark/pull/9276. It 
addresses JoshRosen's comments.

Author: Yin Huai 

Closes #9453 from yhuai/numReducer-followUp.

(cherry picked from commit 8211aab0793cf64202b99be4f31bb8a9ae77050d)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/089ba81d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/089ba81d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/089ba81d

Branch: refs/heads/branch-1.6
Commit: 089ba81d10630dfb046a71e32c1d1804511f050f
Parents: fba48e7
Author: Yin Huai 
Authored: Fri Nov 6 11:13:51 2015 -0800
Committer: Yin Huai 
Committed: Fri Nov 6 11:13:58 2015 -0800

--
 .../catalyst/plans/physical/partitioning.scala  |   8 -
 .../apache/spark/sql/execution/Exchange.scala   |  40 +++--
 .../sql/execution/ExchangeCoordinator.scala |  31 ++--
 .../org/apache/spark/sql/CachedTableSuite.scala | 150 ++-
 4 files changed, 167 insertions(+), 62 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/089ba81d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 9312c81..86b9417 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -165,11 +165,6 @@ sealed trait Partitioning {
* produced by `A` could have also been produced by `B`.
*/
   def guarantees(other: Partitioning): Boolean = this == other
-
-  def withNumPartitions(newNumPartitions: Int): Partitioning = {
-throw new IllegalStateException(
-  s"It is not allowed to call withNumPartitions method of a 
${this.getClass.getSimpleName}")
-  }
 }
 
 object Partitioning {
@@ -254,9 +249,6 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
 case _ => false
   }
 
-  override def withNumPartitions(newNumPartitions: Int): HashPartitioning = {
-HashPartitioning(expressions, newNumPartitions)
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/089ba81d/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 0f72ec6..a4ce328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -242,7 +242,7 @@ case class Exchange(
 // update the number of post-shuffle partitions.
 specifiedPartitionStartIndices.foreach { indices =>
   assert(newPartitioning.isInstanceOf[HashPartitioning])
-  newPartitioning = newPartitioning.withNumPartitions(indices.length)
+  newPartitioning = UnknownPartitioning(indices.length)
 }
 new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
   }
@@ -262,7 +262,7 @@ case class Exchange(
 
 object Exchange {
   def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
-Exchange(newPartitioning, child, None: Option[ExchangeCoordinator])
+Exchange(newPartitioning, child, coordinator = None: 
Option[ExchangeCoordinator])
   }
 }
 
@@ -315,7 +315,7 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
 child.outputPartitioning match {
   case hash: HashPartitioning => true
   case collection: PartitioningCollection =>
-
collection.partitionings.exists(_.isInstanceOf[HashPartitioning])
+
collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
   case _ => false
 }
 }
@@ -416,28 +416,48 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
   // First check if the existing partitions of the children all match. 
This means they are
   // partitioned by the same partitioning into the same number of 
partitions. In that case,
   // don't try to make them match `defaultPartitions`, just use the 
existing partitioning.
-  // TODO: this s

spark git commit: [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up)

2015-11-06 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master c048929c6 -> 8211aab07


[SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of 
post-shuffle partitions for aggregates and joins (follow-up)

https://issues.apache.org/jira/browse/SPARK-9858

This PR is the follow-up work of https://github.com/apache/spark/pull/9276. It 
addresses JoshRosen's comments.

Author: Yin Huai 

Closes #9453 from yhuai/numReducer-followUp.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8211aab0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8211aab0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8211aab0

Branch: refs/heads/master
Commit: 8211aab0793cf64202b99be4f31bb8a9ae77050d
Parents: c048929
Author: Yin Huai 
Authored: Fri Nov 6 11:13:51 2015 -0800
Committer: Yin Huai 
Committed: Fri Nov 6 11:13:51 2015 -0800

--
 .../catalyst/plans/physical/partitioning.scala  |   8 -
 .../apache/spark/sql/execution/Exchange.scala   |  40 +++--
 .../sql/execution/ExchangeCoordinator.scala |  31 ++--
 .../org/apache/spark/sql/CachedTableSuite.scala | 150 ++-
 4 files changed, 167 insertions(+), 62 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8211aab0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 9312c81..86b9417 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -165,11 +165,6 @@ sealed trait Partitioning {
* produced by `A` could have also been produced by `B`.
*/
   def guarantees(other: Partitioning): Boolean = this == other
-
-  def withNumPartitions(newNumPartitions: Int): Partitioning = {
-throw new IllegalStateException(
-  s"It is not allowed to call withNumPartitions method of a 
${this.getClass.getSimpleName}")
-  }
 }
 
 object Partitioning {
@@ -254,9 +249,6 @@ case class HashPartitioning(expressions: Seq[Expression], 
numPartitions: Int)
 case _ => false
   }
 
-  override def withNumPartitions(newNumPartitions: Int): HashPartitioning = {
-HashPartitioning(expressions, newNumPartitions)
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8211aab0/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 0f72ec6..a4ce328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -242,7 +242,7 @@ case class Exchange(
 // update the number of post-shuffle partitions.
 specifiedPartitionStartIndices.foreach { indices =>
   assert(newPartitioning.isInstanceOf[HashPartitioning])
-  newPartitioning = newPartitioning.withNumPartitions(indices.length)
+  newPartitioning = UnknownPartitioning(indices.length)
 }
 new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
   }
@@ -262,7 +262,7 @@ case class Exchange(
 
 object Exchange {
   def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
-Exchange(newPartitioning, child, None: Option[ExchangeCoordinator])
+Exchange(newPartitioning, child, coordinator = None: 
Option[ExchangeCoordinator])
   }
 }
 
@@ -315,7 +315,7 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
 child.outputPartitioning match {
   case hash: HashPartitioning => true
   case collection: PartitioningCollection =>
-
collection.partitionings.exists(_.isInstanceOf[HashPartitioning])
+
collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
   case _ => false
 }
 }
@@ -416,28 +416,48 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
   // First check if the existing partitions of the children all match. 
This means they are
   // partitioned by the same partitioning into the same number of 
partitions. In that case,
   // don't try to make them match `defaultPartitions`, just use the 
existing partitioning.
-  // TODO: this should be a cost based decision. For example, a big 
relation should probably
-  // maintain its exi

spark git commit: [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399

2015-11-06 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 574141a29 -> c048929c6


[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399

This PR adds test cases covering various column pruning and filter push-down
scenarios.

Author: Cheng Lian 

Closes #9468 from liancheng/spark-10978.follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c048929c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c048929c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c048929c

Branch: refs/heads/master
Commit: c048929c6a9f7ce57f384037cd6c0bf5751c447a
Parents: 574141a
Author: Cheng Lian 
Authored: Fri Nov 6 11:11:36 2015 -0800
Committer: Yin Huai 
Committed: Fri Nov 6 11:11:36 2015 -0800

--
 .../spark/sql/sources/FilteredScanSuite.scala   |  21 +-
 .../SimpleTextHadoopFsRelationSuite.scala   | 335 +--
 .../spark/sql/sources/SimpleTextRelation.scala  |  11 +
 3 files changed, 321 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c048929c/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 7541e72..2cad964 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
 import scala.language.existentials
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-
+import org.apache.spark.unsafe.types.UTF8String
 
 class FilteredScanSource extends RelationProvider {
   override def createRelation(
@@ -130,7 +129,7 @@ object ColumnsRequired {
   var set: Set[String] = Set.empty
 }
 
-class FilteredScanSuite extends DataSourceTest with SharedSQLContext {
+class FilteredScanSuite extends DataSourceTest with SharedSQLContext with 
PredicateHelper {
   protected override lazy val sql = caseInsensitiveContext.sql _
 
   override def beforeAll(): Unit = {
@@ -144,9 +143,6 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext {
 |  to '10'
 |)
   """.stripMargin)
-
-// UDF for testing filter push-down
-caseInsensitiveContext.udf.register("udf_gt3", (_: Int) > 3)
   }
 
   sqlTest(
@@ -276,14 +272,15 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext {
   testPushDown("SELECT c FROM oneToTenFiltered WHERE c = 'aA'", 1, 
Set("c"))
   testPushDown("SELECT c FROM oneToTenFiltered WHERE c IN ('aA', 
'foo')", 1, Set("c"))
 
-  // Columns only referenced by UDF filter must be required, as UDF filters 
can't be pushed down.
-  testPushDown("SELECT c FROM oneToTenFiltered WHERE udf_gt3(A)", 10, Set("a", 
"c"))
+  // Filters referencing multiple columns are not convertible, all referenced 
columns must be
+  // required.
+  testPushDown("SELECT c FROM oneToTenFiltered WHERE A + b > 9", 10, Set("a", 
"b", "c"))
 
-  // A query with an unconvertible filter, an unhandled filter, and a handled 
filter.
+  // A query with an inconvertible filter, an unhandled filter, and a handled 
filter.
   testPushDown(
 """SELECT a
   |  FROM oneToTenFiltered
-  | WHERE udf_gt3(b)
+  | WHERE a + b > 9
   |   AND b < 16
   |   AND c IN ('bB', 'cC', 'dD', 'foo')
 """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))

http://git-wip-us.apache.org/repos/asf/spark/blob/c048929c/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d945408..9251a69 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -17,15 +17,21 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
 import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org

spark git commit: [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399

2015-11-06 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d69bc9e47 -> fba48e733


[SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399

This PR adds test cases covering various column pruning and filter push-down
scenarios.

Author: Cheng Lian 

Closes #9468 from liancheng/spark-10978.follow-up.

(cherry picked from commit c048929c6a9f7ce57f384037cd6c0bf5751c447a)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba48e73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba48e73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba48e73

Branch: refs/heads/branch-1.6
Commit: fba48e7332b58169bda215c2a261ae9195c67744
Parents: d69bc9e
Author: Cheng Lian 
Authored: Fri Nov 6 11:11:36 2015 -0800
Committer: Yin Huai 
Committed: Fri Nov 6 11:11:45 2015 -0800

--
 .../spark/sql/sources/FilteredScanSuite.scala   |  21 +-
 .../SimpleTextHadoopFsRelationSuite.scala   | 335 +--
 .../spark/sql/sources/SimpleTextRelation.scala  |  11 +
 3 files changed, 321 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fba48e73/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 7541e72..2cad964 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
 import scala.language.existentials
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-
+import org.apache.spark.unsafe.types.UTF8String
 
 class FilteredScanSource extends RelationProvider {
   override def createRelation(
@@ -130,7 +129,7 @@ object ColumnsRequired {
   var set: Set[String] = Set.empty
 }
 
-class FilteredScanSuite extends DataSourceTest with SharedSQLContext {
+class FilteredScanSuite extends DataSourceTest with SharedSQLContext with 
PredicateHelper {
   protected override lazy val sql = caseInsensitiveContext.sql _
 
   override def beforeAll(): Unit = {
@@ -144,9 +143,6 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext {
 |  to '10'
 |)
   """.stripMargin)
-
-// UDF for testing filter push-down
-caseInsensitiveContext.udf.register("udf_gt3", (_: Int) > 3)
   }
 
   sqlTest(
@@ -276,14 +272,15 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext {
   testPushDown("SELECT c FROM oneToTenFiltered WHERE c = 'aA'", 1, 
Set("c"))
   testPushDown("SELECT c FROM oneToTenFiltered WHERE c IN ('aA', 
'foo')", 1, Set("c"))
 
-  // Columns only referenced by UDF filter must be required, as UDF filters 
can't be pushed down.
-  testPushDown("SELECT c FROM oneToTenFiltered WHERE udf_gt3(A)", 10, Set("a", 
"c"))
+  // Filters referencing multiple columns are not convertible, all referenced 
columns must be
+  // required.
+  testPushDown("SELECT c FROM oneToTenFiltered WHERE A + b > 9", 10, Set("a", 
"b", "c"))
 
-  // A query with an unconvertible filter, an unhandled filter, and a handled 
filter.
+  // A query with an inconvertible filter, an unhandled filter, and a handled 
filter.
   testPushDown(
 """SELECT a
   |  FROM oneToTenFiltered
-  | WHERE udf_gt3(b)
+  | WHERE a + b > 9
   |   AND b < 16
   |   AND c IN ('bB', 'cC', 'dD', 'foo')
 """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b"))

http://git-wip-us.apache.org/repos/asf/spark/blob/fba48e73/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
index d945408..9251a69 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -17,15 +17,21 @@
 
 package org.apache.spark.sql.sources
 
+import java.io.File
+
 impo

spark git commit: [SPARK-9162] [SQL] Implement code generation for ScalaUDF

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 0a430f04e -> d69bc9e47


[SPARK-9162] [SQL] Implement code generation for ScalaUDF

JIRA: https://issues.apache.org/jira/browse/SPARK-9162

Currently ScalaUDF extends CodegenFallback and doesn't provide a code generation
implementation. This patch implements code generation for ScalaUDF.
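
For context, a small usage sketch of the kind of expression this affects (1.x-era
API; the UDF name, column, and table below are made up): registering a Scala
function yields a ScalaUDF node in the plan, which with this patch is compiled by
the code generator instead of taking the CodegenFallback path.

    import org.apache.spark.sql.SQLContext

    def example(sqlContext: SQLContext): Unit = {
      // The registered function becomes a ScalaUDF expression when referenced in SQL.
      sqlContext.udf.register("plusOne", (x: Int) => x + 1)
      sqlContext.sql("SELECT plusOne(value) FROM points").show()
    }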

Author: Liang-Chi Hsieh 

Closes #9270 from viirya/scalaudf-codegen.

(cherry picked from commit 574141a29835ce78d68c97bb54336cf4fd3c39d3)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d69bc9e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d69bc9e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d69bc9e4

Branch: refs/heads/branch-1.6
Commit: d69bc9e4754d9638ca525a0880118bcf5ab2ad05
Parents: 0a430f0
Author: Liang-Chi Hsieh 
Authored: Fri Nov 6 10:52:04 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 10:52:14 2015 -0800

--
 .../sql/catalyst/expressions/ScalaUDF.scala | 85 +++-
 .../scala/org/apache/spark/sql/UDFSuite.scala   | 41 ++
 2 files changed, 124 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d69bc9e4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 11c7950..3388cc2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -31,7 +31,7 @@ case class ScalaUDF(
 dataType: DataType,
 children: Seq[Expression],
 inputTypes: Seq[DataType] = Nil)
-  extends Expression with ImplicitCastInputTypes with CodegenFallback {
+  extends Expression with ImplicitCastInputTypes {
 
   override def nullable: Boolean = true
 
@@ -60,6 +60,10 @@ case class ScalaUDF(
 
   */
 
+  // Accessors used in genCode
+  def userDefinedFunc(): AnyRef = function
+  def getChildren(): Seq[Expression] = children
+
   private[this] val f = children.size match {
 case 0 =>
   val func = function.asInstanceOf[() => Any]
@@ -960,6 +964,83 @@ case class ScalaUDF(
   }
 
   // scalastyle:on
+
+  // Generate codes used to convert the arguments to Scala type for 
user-defined funtions
+  private[this] def genCodeForConverter(ctx: CodeGenContext, index: Int): 
String = {
+val converterClassName = classOf[Any => Any].getName
+val typeConvertersClassName = CatalystTypeConverters.getClass.getName + 
".MODULE$"
+val expressionClassName = classOf[Expression].getName
+val scalaUDFClassName = classOf[ScalaUDF].getName
+
+val converterTerm = ctx.freshName("converter")
+val expressionIdx = ctx.references.size - 1
+ctx.addMutableState(converterClassName, converterTerm,
+  s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
+
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
+  
s"expressions[$expressionIdx]).getChildren().apply($index))).dataType());")
+converterTerm
+  }
+
+  override def genCode(
+  ctx: CodeGenContext,
+  ev: GeneratedExpressionCode): String = {
+
+ctx.references += this
+
+val scalaUDFClassName = classOf[ScalaUDF].getName
+val converterClassName = classOf[Any => Any].getName
+val typeConvertersClassName = CatalystTypeConverters.getClass.getName + 
".MODULE$"
+val expressionClassName = classOf[Expression].getName
+
+// Generate codes used to convert the returned value of user-defined 
functions to Catalyst type
+val catalystConverterTerm = ctx.freshName("catalystConverter")
+val catalystConverterTermIdx = ctx.references.size - 1
+ctx.addMutableState(converterClassName, catalystConverterTerm,
+  s"this.$catalystConverterTerm = 
($converterClassName)$typeConvertersClassName" +
+s".createToCatalystConverter((($scalaUDFClassName)expressions" +
+  s"[$catalystConverterTermIdx]).dataType());")
+
+val resultTerm = ctx.freshName("result")
+
+// This must be called before children expressions' codegen
+// because ctx.references is used in genCodeForConverter
+val converterTerms = (0 until chil

spark git commit: [SPARK-9162] [SQL] Implement code generation for ScalaUDF

2015-11-06 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master cf69ce136 -> 574141a29


[SPARK-9162] [SQL] Implement code generation for ScalaUDF

JIRA: https://issues.apache.org/jira/browse/SPARK-9162

Currently ScalaUDF extends CodegenFallback and doesn't provide a code generation
implementation. This patch implements code generation for ScalaUDF.

Author: Liang-Chi Hsieh 

Closes #9270 from viirya/scalaudf-codegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/574141a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/574141a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/574141a2

Branch: refs/heads/master
Commit: 574141a29835ce78d68c97bb54336cf4fd3c39d3
Parents: cf69ce1
Author: Liang-Chi Hsieh 
Authored: Fri Nov 6 10:52:04 2015 -0800
Committer: Davies Liu 
Committed: Fri Nov 6 10:52:04 2015 -0800

--
 .../sql/catalyst/expressions/ScalaUDF.scala | 85 +++-
 .../scala/org/apache/spark/sql/UDFSuite.scala   | 41 ++
 2 files changed, 124 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/574141a2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 11c7950..3388cc2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -31,7 +31,7 @@ case class ScalaUDF(
 dataType: DataType,
 children: Seq[Expression],
 inputTypes: Seq[DataType] = Nil)
-  extends Expression with ImplicitCastInputTypes with CodegenFallback {
+  extends Expression with ImplicitCastInputTypes {
 
   override def nullable: Boolean = true
 
@@ -60,6 +60,10 @@ case class ScalaUDF(
 
   */
 
+  // Accessors used in genCode
+  def userDefinedFunc(): AnyRef = function
+  def getChildren(): Seq[Expression] = children
+
   private[this] val f = children.size match {
 case 0 =>
   val func = function.asInstanceOf[() => Any]
@@ -960,6 +964,83 @@ case class ScalaUDF(
   }
 
   // scalastyle:on
+
+  // Generate code used to convert the arguments to Scala type for user-defined functions
+  private[this] def genCodeForConverter(ctx: CodeGenContext, index: Int): String = {
+    val converterClassName = classOf[Any => Any].getName
+    val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$"
+    val expressionClassName = classOf[Expression].getName
+    val scalaUDFClassName = classOf[ScalaUDF].getName
+
+    val converterTerm = ctx.freshName("converter")
+    val expressionIdx = ctx.references.size - 1
+    ctx.addMutableState(converterClassName, converterTerm,
+      s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
+        s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
+          s"expressions[$expressionIdx]).getChildren().apply($index))).dataType());")
+    converterTerm
+  }
+
+  override def genCode(
+      ctx: CodeGenContext,
+      ev: GeneratedExpressionCode): String = {
+
+    ctx.references += this
+
+    val scalaUDFClassName = classOf[ScalaUDF].getName
+    val converterClassName = classOf[Any => Any].getName
+    val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$"
+    val expressionClassName = classOf[Expression].getName
+
+    // Generate code used to convert the returned value of user-defined functions to Catalyst type
+    val catalystConverterTerm = ctx.freshName("catalystConverter")
+    val catalystConverterTermIdx = ctx.references.size - 1
+    ctx.addMutableState(converterClassName, catalystConverterTerm,
+      s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
+        s".createToCatalystConverter((($scalaUDFClassName)expressions" +
+          s"[$catalystConverterTermIdx]).dataType());")
+
+    val resultTerm = ctx.freshName("result")
+
+    // This must be called before children expressions' codegen
+    // because ctx.references is used in genCodeForConverter
+    val converterTerms = (0 until children.size).map(genCodeForConverter(ctx, _))
+
+    // Initialize user-defined function
+    val funcClas
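
A minimal, hedged sketch (plain Scala, not the Java source that CodeGenContext actually emits) of the runtime flow the generated ScalaUDF code follows for a one-argument UDF: build a to-Scala converter per child expression, call the user function, then convert the result back with a to-Catalyst converter. The identity converters and the `_ + 1` UDF below are illustrative assumptions standing in for CatalystTypeConverters.createToScalaConverter / createToCatalystConverter.

object ScalaUDFCodegenSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the converters the generated code keeps as mutable state.
    val toScalaConverter: Any => Any    = identity   // per-child converter (genCodeForConverter)
    val toCatalystConverter: Any => Any = identity   // result converter (createToCatalystConverter)

    val userDefinedFunc: Int => Int = _ + 1          // an example registered UDF
    val catalystInput: Any = 41                      // value produced by the child expression

    val scalaInput = toScalaConverter(catalystInput).asInstanceOf[Int]
    val result = userDefinedFunc(scalaInput)
    println(toCatalystConverter(result))             // prints 42, handed back to Catalyst
  }
}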

spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 b8b1fbfc8 -> dc058f2ff


[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

Just ignored `InputDStream`s that have null `rememberDuration` in 
`DStreamGraph.getMaxInputStreamRememberDuration`.
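
The essence of the fix is to drop null `rememberDuration` values before calling `maxBy`. A minimal, self-contained sketch of that behavior (the `Duration` case class here is only a stand-in for Spark's streaming `Duration`):

object RememberDurationSketch {
  final case class Duration(milliseconds: Long)      // stand-in for Spark's Duration

  // Ignore null entries; without the filter, maxBy(_.milliseconds) would throw an NPE.
  def maxRememberDuration(durations: Seq[Duration]): Duration =
    durations.filter(_ != null).maxBy(_.milliseconds)

  def main(args: Array[String]): Unit = {
    val used = Duration(60000L)
    val unused: Duration = null                      // an InputDStream that is created but never used
    println(maxRememberDuration(Seq(used, unused)))  // prints Duration(60000)
  }
}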

Author: Shixiong Zhu 

Closes #9476 from zsxwing/SPARK-11511.

(cherry picked from commit cf69ce136590fea51843bc54f44f0f45c7d0ac36)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc058f2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc058f2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc058f2f

Branch: refs/heads/branch-1.5
Commit: dc058f2fff6ee378aeaccd756a272a5e44be1c6c
Parents: b8b1fbf
Author: Shixiong Zhu 
Authored: Fri Nov 6 14:51:53 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 14:52:21 2015 +

--
 .../org/apache/spark/streaming/DStreamGraph.scala   |  3 ++-
 .../spark/streaming/StreamingContextSuite.scala | 16 
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 40789c6..8138caa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -169,7 +169,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
    * safe remember duration which can be used to perform cleanup operations.
    */
   def getMaxInputStreamRememberDuration(): Duration = {
-    inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
+    // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
+    inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index c7a8771..860fac2 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
   "Please don't use queueStream when checkpointing is enabled."))
   }
 
+  test("Creating an InputDStream but not using it should not crash") {
+ssc = new StreamingContext(master, appName, batchDuration)
+val input1 = addInputStream(ssc)
+val input2 = addInputStream(ssc)
+val output = new TestOutputStream(input2)
+output.register()
+val batchCount = new BatchCounter(ssc)
+ssc.start()
+// Just wait for completing 2 batches to make sure it triggers
+// `DStream.getMaxInputStreamRememberDuration`
+batchCount.waitUntilBatchesCompleted(2, 1)
+// Throw the exception if crash
+ssc.awaitTerminationOrTimeout(1)
+ssc.stop()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
 val input = (1 to 100).map(i => 1 to i)
 val inputStream = new TestInputStream(s, input, 1)





spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1cfad7d55 -> 0a430f04e


[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

Just ignored `InputDStream`s that have null `rememberDuration` in 
`DStreamGraph.getMaxInputStreamRememberDuration`.

Author: Shixiong Zhu 

Closes #9476 from zsxwing/SPARK-11511.

(cherry picked from commit cf69ce136590fea51843bc54f44f0f45c7d0ac36)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a430f04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a430f04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a430f04

Branch: refs/heads/branch-1.6
Commit: 0a430f04eef3445fb0095adc806d91759eea5d32
Parents: 1cfad7d
Author: Shixiong Zhu 
Authored: Fri Nov 6 14:51:53 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 14:52:08 2015 +

--
 .../org/apache/spark/streaming/DStreamGraph.scala   |  3 ++-
 .../spark/streaming/StreamingContextSuite.scala | 16 
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a430f04/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 1b0b789..7829f5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
    * safe remember duration which can be used to perform cleanup operations.
    */
   def getMaxInputStreamRememberDuration(): Duration = {
-    inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
+    // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
+    inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/0a430f04/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index c7a8771..860fac2 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
   "Please don't use queueStream when checkpointing is enabled."))
   }
 
+  test("Creating an InputDStream but not using it should not crash") {
+ssc = new StreamingContext(master, appName, batchDuration)
+val input1 = addInputStream(ssc)
+val input2 = addInputStream(ssc)
+val output = new TestOutputStream(input2)
+output.register()
+val batchCount = new BatchCounter(ssc)
+ssc.start()
+// Just wait for completing 2 batches to make sure it triggers
+// `DStream.getMaxInputStreamRememberDuration`
+batchCount.waitUntilBatchesCompleted(2, 1)
+// Throw the exception if crash
+ssc.awaitTerminationOrTimeout(1)
+ssc.stop()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
 val input = (1 to 100).map(i => 1 to i)
 val inputStream = new TestInputStream(s, input, 1)





spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

2015-11-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 253e87e8a -> cf69ce136


[SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used

Just ignored `InputDStream`s that have null `rememberDuration` in 
`DStreamGraph.getMaxInputStreamRememberDuration`.

Author: Shixiong Zhu 

Closes #9476 from zsxwing/SPARK-11511.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf69ce13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf69ce13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf69ce13

Branch: refs/heads/master
Commit: cf69ce136590fea51843bc54f44f0f45c7d0ac36
Parents: 253e87e
Author: Shixiong Zhu 
Authored: Fri Nov 6 14:51:53 2015 +
Committer: Sean Owen 
Committed: Fri Nov 6 14:51:53 2015 +

--
 .../org/apache/spark/streaming/DStreamGraph.scala   |  3 ++-
 .../spark/streaming/StreamingContextSuite.scala | 16 
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cf69ce13/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 1b0b789..7829f5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
    * safe remember duration which can be used to perform cleanup operations.
    */
   def getMaxInputStreamRememberDuration(): Duration = {
-    inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds }
+    // If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them
+    inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds)
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/spark/blob/cf69ce13/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index c7a8771..860fac2 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
   "Please don't use queueStream when checkpointing is enabled."))
   }
 
+  test("Creating an InputDStream but not using it should not crash") {
+ssc = new StreamingContext(master, appName, batchDuration)
+val input1 = addInputStream(ssc)
+val input2 = addInputStream(ssc)
+val output = new TestOutputStream(input2)
+output.register()
+val batchCount = new BatchCounter(ssc)
+ssc.start()
+// Just wait for completing 2 batches to make sure it triggers
+// `DStream.getMaxInputStreamRememberDuration`
+batchCount.waitUntilBatchesCompleted(2, 1)
+// Throw the exception if crash
+ssc.awaitTerminationOrTimeout(1)
+ssc.stop()
+  }
+
   def addInputStream(s: StreamingContext): DStream[Int] = {
 val input = (1 to 100).map(i => 1 to i)
 val inputStream = new TestInputStream(s, input, 1)





spark git commit: [SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit

2015-11-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master bc5d6c038 -> 253e87e8a


[SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit

A cleanup for https://github.com/apache/spark/pull/9085.

The `DecimalLit` is very similar to `FloatLit`, so we can keep just one of them.
Also added a low-level unit test in `SqlParserSuite`.
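
For reference, the unified `DecimalLit` token now covers digits followed by an exponent, a leading-dot fraction with an exponent, and a digits-dot-digits form with an optional exponent, while plain integers stay `NumericLit`. Below is a rough, standalone sketch of those shapes using a plain regex instead of the `StdLexical`-based `SqlLexical`; the object and method names are illustrative only.

object DecimalLitSketch {
  // Alternatives mirror the token cases in the diff:
  //   digits + exponent          e.g. 12e10
  //   .digits + exponent         e.g. .5E-3  (lexed as 0.5e-3)
  //   digits.digits [+ exponent] e.g. 12.5 or 12.5e+2
  private val decimalLit =
    """(\d+[eE][+-]?\d+|\.\d+[eE][+-]?\d+|\d+\.\d*([eE][+-]?\d+)?)""".r

  def isDecimalLit(s: String): Boolean = decimalLit.pattern.matcher(s).matches()

  def main(args: Array[String]): Unit = {
    Seq("12e10", ".5E-3", "12.5", "12.5e+2", "12", "abc").foreach { s =>
      println(f"$s%-8s -> ${isDecimalLit(s)}")
    }
  }
}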

Author: Wenchen Fan 

Closes #9482 from cloud-fan/parser.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/253e87e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/253e87e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/253e87e8

Branch: refs/heads/master
Commit: 253e87e8ab8717ffef40a6d0d376b1add155ef90
Parents: bc5d6c0
Author: Wenchen Fan 
Authored: Fri Nov 6 06:38:49 2015 -0800
Committer: Reynold Xin 
Committed: Fri Nov 6 06:38:49 2015 -0800

--
 .../sql/catalyst/AbstractSparkSQLParser.scala   | 23 +---
 .../apache/spark/sql/catalyst/SqlParser.scala   | 20 -
 .../spark/sql/catalyst/SqlParserSuite.scala | 21 ++
 3 files changed, 35 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/253e87e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
index 04ac4f2..bdc52c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -78,10 +78,6 @@ private[sql] abstract class AbstractSparkSQLParser
 }
 
 class SqlLexical extends StdLexical {
-  case class FloatLit(chars: String) extends Token {
-override def toString: String = chars
-  }
-
   case class DecimalLit(chars: String) extends Token {
 override def toString: String = chars
   }
@@ -106,17 +102,16 @@ class SqlLexical extends StdLexical {
   }
 
   override lazy val token: Parser[Token] =
-( rep1(digit) ~ ('.' ~> digit.*).? ~ (exp ~> sign.? ~ rep1(digit)) ^^ {
-case i ~ None ~ (sig ~ rest) =>
-  DecimalLit(i.mkString + "e" + sig.mkString + rest.mkString)
-case i ~ Some(d) ~ (sig ~ rest) =>
-  DecimalLit(i.mkString + "." + d.mkString + "e" + sig.mkString + 
rest.mkString)
-  }
+( rep1(digit) ~ scientificNotation ^^ { case i ~ s => DecimalLit(i.mkString + s) }
+| '.' ~> (rep1(digit) ~ scientificNotation) ^^
+  { case i ~ s => DecimalLit("0." + i.mkString + s) }
+| rep1(digit) ~ ('.' ~> digit.*) ~ scientificNotation ^^
+  { case i1 ~ i2 ~ s => DecimalLit(i1.mkString + "." + i2.mkString + s) }
 | digit.* ~ identChar ~ (identChar | digit).* ^^
   { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
 | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
 case i ~ None => NumericLit(i.mkString)
-case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+case i ~ Some(d) => DecimalLit(i.mkString + "." + d.mkString)
   }
 | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
   { case chars => StringLit(chars mkString "") }
@@ -133,8 +128,10 @@ class SqlLexical extends StdLexical {
 
   override def identChar: Parser[Elem] = letter | elem('_')
 
-  private lazy val sign: Parser[Elem] = elem("s", c => c == '+' || c == '-')
-  private lazy val exp: Parser[Elem] = elem("e", c => c == 'E' || c == 'e')
+  private lazy val scientificNotation: Parser[String] =
+(elem('e') | elem('E')) ~> (elem('+') | elem('-')).? ~ rep1(digit) ^^ {
+  case s ~ rest => "e" + s.mkString + rest.mkString
+}
 
   override def whitespace: Parser[Any] =
 ( whitespaceChar

http://git-wip-us.apache.org/repos/asf/spark/blob/253e87e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 440e9e2..cd717c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -334,27 +334,15 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
 
   protected lazy val numericLiteral: Parser[Literal] =
 ( integral  ^^ { case i => Literal(toNarrowestIntegerType(i)) }
-| sign.? ~ unsignedFloat ^^ {
-  case s ~ f => Literal(toDecimalOrDouble(s.getOrElse("") + f))
-}
-| sign.? ~ unsignedDecimal ^^ {
-  case s ~ d =>