spark git commit: [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python
Repository: spark Updated Branches: refs/heads/branch-1.6 e2546c227 -> aede729a9 [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python Could jkbradley and davies review it? - Create a wrapper class `LDAModelWrapper` for `LDAModel`, because we can't handle the return value of `describeTopics` in Scala from PySpark directly: `Array[(Array[Int], Array[Double])]` is too complicated to convert. - Add `loadLDAModel` in `PythonMLLibAPI`, since `LDAModel` in Scala is an abstract class and we need to call `load` on `DistributedLDAModel`. [[SPARK-8467] Add LDAModel.describeTopics() in Python - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-8467) Author: Yu ISHIKAWA Closes #8643 from yu-iskw/SPARK-8467-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aede729a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aede729a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aede729a Branch: refs/heads/branch-1.6 Commit: aede729a9463e4450fc5e403cbc35bcd13ba40a3 Parents: e2546c2 Author: Yu ISHIKAWA Authored: Fri Nov 6 22:56:29 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 22:58:06 2015 -0800 -- .../mllib/api/python/LDAModelWrapper.scala | 46  .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 +- python/pyspark/mllib/clustering.py | 33 +++--- 3 files changed, 75 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aede729a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala new file mode 100644 index 000..63282ee --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.mllib.api.python + +import scala.collection.JavaConverters + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.clustering.LDAModel +import org.apache.spark.mllib.linalg.Matrix + +/** + * Wrapper around LDAModel to provide helper methods in Python + */ +private[python] class LDAModelWrapper(model: LDAModel) { + + def topicsMatrix(): Matrix = model.topicsMatrix + + def vocabSize(): Int = model.vocabSize + + def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize) + + def describeTopics(maxTermsPerTopic: Int): Array[Byte] = { +val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) => + val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava + val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava + Array[Any](jTerms, jTermWeights) +} +SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava) + } + + def save(sc: SparkContext, path: String): Unit = model.save(sc, path) +} http://git-wip-us.apache.org/repos/asf/spark/blob/aede729a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 40c4180..54b03a9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -517,7 +517,7 @@ private[python] class PythonMLLibAPI extends Serializable { topicConcentration: Double, seed: java.lang.Long, checkpointInterval: Int, - optimizer: String): LDAModel = { + optimizer: String): LDAModelWrapper = { val algo = new LDA() .setK(k) .setMaxIterations(maxIterations) @@ -535,7 +535,16 @@ private[python] class PythonML
spark git commit: [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python
Repository: spark Updated Branches: refs/heads/master 7f741905b -> 2ff0e79a8 [SPARK-8467] [MLLIB] [PYSPARK] Add LDAModel.describeTopics() in Python Could jkbradley and davies review it? - Create a wrapper class `LDAModelWrapper` for `LDAModel`, because we can't handle the return value of `describeTopics` in Scala from PySpark directly: `Array[(Array[Int], Array[Double])]` is too complicated to convert. - Add `loadLDAModel` in `PythonMLLibAPI`, since `LDAModel` in Scala is an abstract class and we need to call `load` on `DistributedLDAModel`. [[SPARK-8467] Add LDAModel.describeTopics() in Python - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-8467) Author: Yu ISHIKAWA Closes #8643 from yu-iskw/SPARK-8467-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ff0e79a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ff0e79a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ff0e79a Branch: refs/heads/master Commit: 2ff0e79a8647cca5c9c57f613a07e739ac4f677e Parents: 7f74190 Author: Yu ISHIKAWA Authored: Fri Nov 6 22:56:29 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 22:56:29 2015 -0800 -- .../mllib/api/python/LDAModelWrapper.scala | 46  .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 +- python/pyspark/mllib/clustering.py | 33 +++--- 3 files changed, 75 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ff0e79a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala new file mode 100644 index 000..63282ee --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.mllib.api.python + +import scala.collection.JavaConverters + +import org.apache.spark.SparkContext +import org.apache.spark.mllib.clustering.LDAModel +import org.apache.spark.mllib.linalg.Matrix + +/** + * Wrapper around LDAModel to provide helper methods in Python + */ +private[python] class LDAModelWrapper(model: LDAModel) { + + def topicsMatrix(): Matrix = model.topicsMatrix + + def vocabSize(): Int = model.vocabSize + + def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize) + + def describeTopics(maxTermsPerTopic: Int): Array[Byte] = { +val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) => + val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava + val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava + Array[Any](jTerms, jTermWeights) +} +SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava) + } + + def save(sc: SparkContext, path: String): Unit = model.save(sc, path) +} http://git-wip-us.apache.org/repos/asf/spark/blob/2ff0e79a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 40c4180..54b03a9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -517,7 +517,7 @@ private[python] class PythonMLLibAPI extends Serializable { topicConcentration: Double, seed: java.lang.Long, checkpointInterval: Int, - optimizer: String): LDAModel = { + optimizer: String): LDAModelWrapper = { val algo = new LDA() .setK(k) .setMaxIterations(maxIterations) @@ -535,7 +535,16 @@ private[python] class PythonMLLibAPI e
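With the patch in place, a minimal PySpark sketch of the new API (the tiny corpus and the `k`, `seed`, and `maxTermsPerTopic` values are made up for illustration; per the wrapper above, each topic is pickled back as a pair of term-index and term-weight lists):

```python
from pyspark import SparkContext
from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors

sc = SparkContext(appName="lda-describe-topics")

# Tiny made-up corpus: an RDD of [document id, term-count vector] pairs.
corpus = sc.parallelize([
    [1, Vectors.dense([1.0, 2.0, 6.0, 0.0, 2.0])],
    [2, Vectors.dense([1.0, 3.0, 0.0, 1.0, 3.0])],
    [3, Vectors.dense([0.0, 1.0, 2.0, 2.0, 1.0])],
])

model = LDA.train(corpus, k=2, seed=1)

# Each topic comes back as parallel lists of term indices and term
# weights, sorted by descending weight.
for term_indices, term_weights in model.describeTopics(maxTermsPerTopic=3):
    print(term_indices, term_weights)

sc.stop()
```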
spark git commit: [SPARK-11112] DAG visualization: display RDD callsite
Repository: spark Updated Branches: refs/heads/master 30b706b7b -> 7f741905b [SPARK-11112] DAG visualization: display RDD callsite Screenshot: https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png mateiz sarutak Author: Andrew Or Closes #9398 from andrewor14/rdd-callsite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f741905 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f741905 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f741905 Branch: refs/heads/master Commit: 7f741905b06ed6d3dfbff6db41a3355dab71aa3c Parents: 30b706b Author: Andrew Or Authored: Sat Nov 7 05:35:53 2015 +0100 Committer: Andrew Or Committed: Sat Nov 7 05:35:53 2015 +0100 -- .../apache/spark/ui/static/spark-dag-viz.css| 4 +++ .../org/apache/spark/storage/RDDInfo.scala | 16 +++-- .../spark/ui/scope/RDDOperationGraph.scala | 10 +++--- .../org/apache/spark/util/JsonProtocol.scala| 17 - .../scala/org/apache/spark/util/Utils.scala | 1 + .../org/apache/spark/ui/UISeleniumSuite.scala | 14  .../apache/spark/util/JsonProtocolSuite.scala | 37  7 files changed, 79 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css index 3b4ae2e..9cc5c79 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css @@ -122,3 +122,7 @@ stroke: #52C366; stroke-width: 2px; } + +.tooltip-inner { + white-space: pre-wrap; +} http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 9606262..3fa209b 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDDOperationScope, RDD} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CallSite, Utils} @DeveloperApi class RDDInfo( @@ -28,9 +28,20 @@ class RDDInfo( val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], +val callSite: CallSite, val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { + def this( + id: Int, + name: String, + numPartitions: Int, + storageLevel: StorageLevel, + parentIds: Seq[Int], + scope: Option[RDDOperationScope] = None) { +this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope) + } + var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L @@ -56,6 +67,7 @@ private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) val parentIds = rdd.dependencies.map(_.rdd.id) -new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope) +new RDDInfo(rdd.id, rddName, rdd.partitions.length, + rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7f741905/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 81f168a..2427456 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer} import org.apache.spark.Logging import org.apache.spark.scheduler.StageInfo import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.CallSite /** * A representation of a generic cluster graph used for storing information on RDD operations. @@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph( rootCluster: RDDOperationCluster) /** A node in an RDDOperationGraph. This represents an RDD. */ -private[ui] case class RDDOperationNode(id: Int, name: Strin
[2/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory. ## User-facing changes This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit. ## Internal changes This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies. There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances). I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take a look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution. ## TODOs - [x] Fix a handful of test failures in the MemoryManagerSuites. - [x] Fix remaining TODO comments in code. - [ ] Document new configuration. - [x] Fix commented-out tests / asserts: - [x] UnifiedMemoryManagerSuite. - [x] Write tests that exercise the new off-heap memory management policies. Author: Josh Rosen Closes #9344 from JoshRosen/offheap-memory-accounting. 
(cherry picked from commit 30b706b7b36482921ec04145a0121ca147984fa8) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2546c22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2546c22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2546c22 Branch: refs/heads/branch-1.6 Commit: e2546c227b7eca092c6a7cf5f307d152f4dbe800 Parents: 68c1d9f Author: Josh Rosen Authored: Fri Nov 6 18:17:34 2015 -0800 Committer: Josh Rosen Committed: Fri Nov 6 18:17:55 2015 -0800 -- .../org/apache/spark/memory/MemoryConsumer.java | 7 +- .../org/apache/spark/memory/MemoryMode.java | 26 ++ .../apache/spark/memory/TaskMemoryManager.java | 72 -- .../main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/memory/ExecutionMemoryPool.scala | 153 .../org/apache/spark/memory/MemoryManager.scala | 246 ++- .../org/apache/spark/memory/MemoryPool.scala| 71 ++ .../spark/memory/StaticMemoryManager.scala | 75 +- .../apache/spark/memory/StorageMemoryPool.scala | 138 +++ .../spark/memory/UnifiedMemoryManager.scala | 138 ++- .../scala/org/apache/spark/memory/package.scala | 75 ++ .../spark/util/collection/Spillable.scala | 8 +- .../spark/memory/TaskMemoryManagerSuite.java| 8 +- .../apache/spark/memory/TestMemoryConsumer.java | 10 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../map/AbstractBytesToBytesMapSuite.java | 4 +- .../spark/memory/MemoryManagerSuite.scala | 104 +--- .../spark/memory/StaticMemoryManagerSuite.scala | 39 +-- .../apache/spark/memory/TestMemoryManager.scala | 20 +- .../memory/UnifiedMemoryManagerSuite.scala | 93 +++ .../spark/storage/BlockManagerSuite.scala | 2 +- 21 files changed, 828 insertions(+), 465 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java -- diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 8fbdb72..36138cc 100644 --- a/core/src/main/java/org/apache/spar
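For context, a hedged sketch of how the new setting would be applied from PySpark. Both keys are assumptions here: `spark.memory.offHeapSize` is explicitly marked "name subject to change" above, and `spark.unsafe.offHeap` is assumed to be the flag that, at the time of this patch, switched Tungsten data-page allocation off-heap.

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("offheap-sketch")
        # Assumption: lets Tungsten allocate its data pages off-heap.
        .set("spark.unsafe.offHeap", "true")
        # Absolute cap on off-heap memory for Spark / Spark SQL (512 MB here).
        .set("spark.memory.offHeapSize", str(512 * 1024 * 1024)))

sc = SparkContext(conf=conf)
```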
[1/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
Repository: spark Updated Branches: refs/heads/branch-1.6 68c1d9fa6 -> e2546c227 http://git-wip-us.apache.org/repos/asf/spark/blob/e2546c22/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 885c450..54cb28c 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -24,7 +24,6 @@ import org.mockito.Mockito.when import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} - class StaticMemoryManagerSuite extends MemoryManagerSuite { private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { maxExecutionMem: Long, maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { val mm = new StaticMemoryManager( - conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1) + conf, + maxOnHeapExecutionMemory = maxExecutionMem, + maxStorageMemory = maxStorageMem, + numCores = 1) val ms = makeMemoryStore(mm) (mm, ms) } - override protected def createMemoryManager(maxMemory: Long): MemoryManager = { + override protected def createMemoryManager( + maxOnHeapExecutionMemory: Long, + maxOffHeapExecutionMemory: Long): StaticMemoryManager = { new StaticMemoryManager( - conf, - maxExecutionMemory = maxMemory, + conf.clone +.set("spark.memory.fraction", "1") +.set("spark.testing.memory", maxOnHeapExecutionMemory.toString) +.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString), + maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, maxStorageMemory = 0, numCores = 1) } test("basic execution memory") { val maxExecutionMem = 1000L +val taskAttemptId = 0L val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) assert(mm.executionMemoryUsed === 0L) -assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L) +assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) assert(mm.executionMemoryUsed === 10L) -assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) // Acquire up to the max -assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L) +assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L) assert(mm.executionMemoryUsed === maxExecutionMem) -assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L) +assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) assert(mm.executionMemoryUsed === maxExecutionMem) -mm.releaseExecutionMemory(800L) +mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 200L) // Acquire after release -assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L) +assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired -mm.releaseExecutionMemory(maxExecutionMem) +mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 0L) } @@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { test("execution and storage 
isolation") { val maxExecutionMem = 200L val maxStorageMem = 1000L +val taskAttemptId = 0L val dummyBlock = TestBlockId("ain't nobody love like you do") val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) // Only execution memory should increase -assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 100L) -assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase @@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released -mm.releaseExecutionMemory(133L) +
[1/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
Repository: spark Updated Branches: refs/heads/master 105732dcc -> 30b706b7b http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 885c450..54cb28c 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -24,7 +24,6 @@ import org.mockito.Mockito.when import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} - class StaticMemoryManagerSuite extends MemoryManagerSuite { private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { maxExecutionMem: Long, maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { val mm = new StaticMemoryManager( - conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1) + conf, + maxOnHeapExecutionMemory = maxExecutionMem, + maxStorageMemory = maxStorageMem, + numCores = 1) val ms = makeMemoryStore(mm) (mm, ms) } - override protected def createMemoryManager(maxMemory: Long): MemoryManager = { + override protected def createMemoryManager( + maxOnHeapExecutionMemory: Long, + maxOffHeapExecutionMemory: Long): StaticMemoryManager = { new StaticMemoryManager( - conf, - maxExecutionMemory = maxMemory, + conf.clone +.set("spark.memory.fraction", "1") +.set("spark.testing.memory", maxOnHeapExecutionMemory.toString) +.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString), + maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, maxStorageMemory = 0, numCores = 1) } test("basic execution memory") { val maxExecutionMem = 1000L +val taskAttemptId = 0L val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) assert(mm.executionMemoryUsed === 0L) -assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L) +assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) assert(mm.executionMemoryUsed === 10L) -assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) // Acquire up to the max -assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L) +assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L) assert(mm.executionMemoryUsed === maxExecutionMem) -assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L) +assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) assert(mm.executionMemoryUsed === maxExecutionMem) -mm.releaseExecutionMemory(800L) +mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 200L) // Acquire after release -assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L) +assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired -mm.releaseExecutionMemory(maxExecutionMem) +mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 0L) } @@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { test("execution and storage 
isolation") { val maxExecutionMem = 200L val maxStorageMem = 1000L +val taskAttemptId = 0L val dummyBlock = TestBlockId("ain't nobody love like you do") val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) // Only execution memory should increase -assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 100L) -assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L) +assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase @@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released -mm.releaseExecutionMemory(133L) +mm.r
[2/2] spark git commit: [SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory. ## User-facing changes This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit. ## Internal changes This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies. There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances). I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take a look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution. ## TODOs - [x] Fix a handful of test failures in the MemoryManagerSuites. - [x] Fix remaining TODO comments in code. - [ ] Document new configuration. - [x] Fix commented-out tests / asserts: - [x] UnifiedMemoryManagerSuite. - [x] Write tests that exercise the new off-heap memory management policies. Author: Josh Rosen Closes #9344 from JoshRosen/offheap-memory-accounting. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30b706b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30b706b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30b706b7 Branch: refs/heads/master Commit: 30b706b7b36482921ec04145a0121ca147984fa8 Parents: 105732d Author: Josh Rosen Authored: Fri Nov 6 18:17:34 2015 -0800 Committer: Josh Rosen Committed: Fri Nov 6 18:17:34 2015 -0800 -- .../org/apache/spark/memory/MemoryConsumer.java | 7 +- .../org/apache/spark/memory/MemoryMode.java | 26 ++ .../apache/spark/memory/TaskMemoryManager.java | 72 -- .../main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/memory/ExecutionMemoryPool.scala | 153 .../org/apache/spark/memory/MemoryManager.scala | 246 ++- .../org/apache/spark/memory/MemoryPool.scala| 71 ++ .../spark/memory/StaticMemoryManager.scala | 75 +- .../apache/spark/memory/StorageMemoryPool.scala | 138 +++ .../spark/memory/UnifiedMemoryManager.scala | 138 ++- .../scala/org/apache/spark/memory/package.scala | 75 ++ .../spark/util/collection/Spillable.scala | 8 +- .../spark/memory/TaskMemoryManagerSuite.java| 8 +- .../apache/spark/memory/TestMemoryConsumer.java | 10 +- .../shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- .../map/AbstractBytesToBytesMapSuite.java | 4 +- .../spark/memory/MemoryManagerSuite.scala | 104 +--- .../spark/memory/StaticMemoryManagerSuite.scala | 39 +-- .../apache/spark/memory/TestMemoryManager.scala | 20 +- .../memory/UnifiedMemoryManagerSuite.scala | 93 +++ .../spark/storage/BlockManagerSuite.scala | 2 +- 21 files changed, 828 insertions(+), 465 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30b706b7/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java -- diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 8fbdb72..36138cc 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@
spark git commit: [HOTFIX] Fix python tests after #9527
Repository: spark Updated Branches: refs/heads/branch-1.6 9bf77d5c3 -> 68c1d9fa6 [HOTFIX] Fix python tests after #9527 #9527 missed updating the python tests. Author: Michael Armbrust Closes #9533 from marmbrus/hotfixTextValue. (cherry picked from commit 105732dcc6b651b9779f4a5773a759c5b4fbd21d) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68c1d9fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68c1d9fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68c1d9fa Branch: refs/heads/branch-1.6 Commit: 68c1d9fa6e90636a6b6d2a7b4d2a7e35c936e852 Parents: 9bf77d5 Author: Michael Armbrust Authored: Fri Nov 6 17:22:30 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 17:22:39 2015 -0800 -- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68c1d9fa/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 97bd90c..927f407 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -203,7 +203,7 @@ class DataFrameReader(object): >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt') >>> df.collect() -[Row(text=u'hello'), Row(text=u'this')] +[Row(value=u'hello'), Row(value=u'this')] """ return self._df(self._jreader.text(path))
spark git commit: [HOTFIX] Fix python tests after #9527
Repository: spark Updated Branches: refs/heads/master 1c80d66e5 -> 105732dcc [HOTFIX] Fix python tests after #9527 #9527 missed updating the python tests. Author: Michael Armbrust Closes #9533 from marmbrus/hotfixTextValue. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/105732dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/105732dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/105732dc Branch: refs/heads/master Commit: 105732dcc6b651b9779f4a5773a759c5b4fbd21d Parents: 1c80d66 Author: Michael Armbrust Authored: Fri Nov 6 17:22:30 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 17:22:30 2015 -0800 -- python/pyspark/sql/readwriter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/105732dc/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 97bd90c..927f407 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -203,7 +203,7 @@ class DataFrameReader(object): >>> df = sqlContext.read.text('python/test_support/sql/text-test.txt') >>> df.collect() -[Row(text=u'hello'), Row(text=u'this')] +[Row(value=u'hello'), Row(value=u'this')] """ return self._df(self._jreader.text(path))
spark git commit: [SPARK-11546] Thrift server makes too many logs about result schema
Repository: spark Updated Branches: refs/heads/branch-1.6 162a7704c -> 9bf77d5c3 [SPARK-11546] Thrift server makes too many logs about result schema SparkExecuteStatementOperation logs the result schema for each getNextRowSet() call, which by default happens every 1000 rows, overwhelming the whole log file. Author: navis.ryu Closes #9514 from navis/SPARK-11546. (cherry picked from commit 1c80d66e52c0bcc4e5adda78b3d8e5bf55e4f128) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bf77d5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bf77d5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bf77d5c Branch: refs/heads/branch-1.6 Commit: 9bf77d5c37c7f26450a68341ce52c7c9fed3c21f Parents: 162a770 Author: navis.ryu Authored: Fri Nov 6 17:13:46 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 17:13:57 2015 -0800 -- .../SparkExecuteStatementOperation.scala| 24 +++- 1 file changed, 13 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bf77d5c/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 719b03e..82fef92 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -53,6 +53,18 @@ private[hive] class SparkExecuteStatementOperation( private var dataTypes: Array[DataType] = _ private var statementId: String = _ + private lazy val resultSchema: TableSchema = { +if (result == null || result.queryExecution.analyzed.output.size == 0) { + new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) +} else { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") + val schema = result.queryExecution.analyzed.output.map { attr => +new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") + } + new TableSchema(schema.asJava) +} + } + def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. hiveContext.sparkContext.clearJobGroup() @@ -120,17 +132,7 @@ private[hive] class SparkExecuteStatementOperation( } } - def getResultSetSchema: TableSchema = { -if (result == null || result.queryExecution.analyzed.output.size == 0) { - new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) -} else { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - val schema = result.queryExecution.analyzed.output.map { attr => -new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - new TableSchema(schema.asJava) -} - } + def getResultSetSchema: TableSchema = resultSchema override def run(): Unit = { setState(OperationState.PENDING)
spark git commit: [SPARK-11546] Thrift server makes too many logs about result schema
Repository: spark Updated Branches: refs/heads/master 6d0ead322 -> 1c80d66e5 [SPARK-11546] Thrift server makes too many logs about result schema SparkExecuteStatementOperation logs the result schema for each getNextRowSet() call, which by default happens every 1000 rows, overwhelming the whole log file. Author: navis.ryu Closes #9514 from navis/SPARK-11546. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c80d66e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c80d66e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c80d66e Branch: refs/heads/master Commit: 1c80d66e52c0bcc4e5adda78b3d8e5bf55e4f128 Parents: 6d0ead3 Author: navis.ryu Authored: Fri Nov 6 17:13:46 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 17:13:46 2015 -0800 -- .../SparkExecuteStatementOperation.scala| 24 +++- 1 file changed, 13 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c80d66e/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 719b03e..82fef92 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -53,6 +53,18 @@ private[hive] class SparkExecuteStatementOperation( private var dataTypes: Array[DataType] = _ private var statementId: String = _ + private lazy val resultSchema: TableSchema = { +if (result == null || result.queryExecution.analyzed.output.size == 0) { + new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) +} else { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") + val schema = result.queryExecution.analyzed.output.map { attr => +new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") + } + new TableSchema(schema.asJava) +} + } + def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. hiveContext.sparkContext.clearJobGroup() @@ -120,17 +132,7 @@ private[hive] class SparkExecuteStatementOperation( } } - def getResultSetSchema: TableSchema = { -if (result == null || result.queryExecution.analyzed.output.size == 0) { - new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) -} else { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - val schema = result.queryExecution.analyzed.output.map { attr => -new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - new TableSchema(schema.asJava) -} - } + def getResultSetSchema: TableSchema = resultSchema override def run(): Unit = { setState(OperationState.PENDING)
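The fix itself is a memoization pattern: the schema is computed (and logged) once via a Scala `lazy val` instead of on every fetch. A rough Python analogue of the same idea, with illustrative names only, not Spark code:

```python
class StatementOperation(object):
    def __init__(self, result):
        self._result = result
        self._schema = None  # cache; plays the role of Scala's `lazy val`

    def get_result_set_schema(self):
        # Compute and log on the first call only, not once per getNextRowSet().
        if self._schema is None:
            print("Result Schema: %s" % (self._result,))
            self._schema = tuple(self._result)
        return self._schema
```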
spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule
Repository: spark Updated Branches: refs/heads/branch-1.6 40a5db561 -> 162a7704c [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule This is the second PR for SPARK-9241; it adds support for multiple distinct columns to the new aggregation code path. This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are: - This can use the faster TungstenAggregate code path. - It is impossible to OOM due to an ```OpenHashSet``` allocating too much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and put a lot more memory pressure on the aggregation code path itself. The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed. cc yhuai - Could you also tell me where to add tests for this? Author: Herman van Hovell Closes #9406 from hvanhovell/SPARK-9241-rewriter. (cherry picked from commit 6d0ead322e72303c6444c6ac641378a4690cde96) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162a7704 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162a7704 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162a7704 Branch: refs/heads/branch-1.6 Commit: 162a7704c90452d77b8a51fc5f5e3ec9bf14aaac Parents: 40a5db5 Author: Herman van Hovell Authored: Fri Nov 6 16:04:20 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 16:04:32 2015 -0800 -- .../catalyst/expressions/aggregate/Count.scala | 2 + .../catalyst/expressions/aggregate/Utils.scala | 186 ++- .../expressions/aggregate/interfaces.scala | 6 + .../sql/catalyst/optimizer/Optimizer.scala | 6 +- .../catalyst/plans/logical/basicOperators.scala | 80  .../spark/sql/execution/SparkStrategies.scala | 2 +- 6 files changed, 238 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/162a7704/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala index 54df96c..ec0c8b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala @@ -49,4 +49,6 @@ case class Count(child: Expression) extends DeclarativeAggregate { ) override val evaluateExpression = Cast(count, LongType) + + override def defaultResult: Option[Literal] = Option(Literal(0L)) } http://git-wip-us.apache.org/repos/asf/spark/blob/162a7704/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala index 644c621..39010c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala @@ -20,8 +20,9 @@ package 
org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.types.{StructType, MapType, ArrayType} +import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType} /** * Utility functions used by the query planner to convert our plan to new aggregation code path. @@ -41,7 +42,7 @@ object Utils { private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match { case p: Aggregate if supportsGroupingKeySchema(p) => - val converted = p.transformExpressionsDown { + val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown { case expressions.Average(child) => aggregate.AggregateExpression2( aggregateFunction = aggregate.Avera
spark git commit: [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule
Repository: spark Updated Branches: refs/heads/master 1ab72b086 -> 6d0ead322 [SPARK-9241][SQL] Supporting multiple DISTINCT columns (2) - Rewriting Rule This is the second PR for SPARK-9241; it adds support for multiple distinct columns to the new aggregation code path. This PR solves the multiple DISTINCT column problem by rewriting these Aggregates into an Expand-Aggregate-Aggregate combination. See the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-9241) for some information on this. The advantages over the - competing - [first PR](https://github.com/apache/spark/pull/9280) are: - This can use the faster TungstenAggregate code path. - It is impossible to OOM due to an ```OpenHashSet``` allocating too much memory. However, this will multiply the number of input rows by the number of distinct clauses (plus one), and put a lot more memory pressure on the aggregation code path itself. The location of this Rule is a bit funny, and should probably change when the old aggregation path is changed. cc yhuai - Could you also tell me where to add tests for this? Author: Herman van Hovell Closes #9406 from hvanhovell/SPARK-9241-rewriter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d0ead32 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d0ead32 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d0ead32 Branch: refs/heads/master Commit: 6d0ead322e72303c6444c6ac641378a4690cde96 Parents: 1ab72b0 Author: Herman van Hovell Authored: Fri Nov 6 16:04:20 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 16:04:20 2015 -0800 -- .../catalyst/expressions/aggregate/Count.scala | 2 + .../catalyst/expressions/aggregate/Utils.scala | 186 ++- .../expressions/aggregate/interfaces.scala | 6 + .../sql/catalyst/optimizer/Optimizer.scala | 6 +- .../catalyst/plans/logical/basicOperators.scala | 80  .../spark/sql/execution/SparkStrategies.scala | 2 +- 6 files changed, 238 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6d0ead32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala index 54df96c..ec0c8b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala @@ -49,4 +49,6 @@ case class Count(child: Expression) extends DeclarativeAggregate { ) override val evaluateExpression = Cast(count, LongType) + + override def defaultResult: Option[Literal] = Option(Literal(0L)) } http://git-wip-us.apache.org/repos/asf/spark/blob/6d0ead32/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala index 644c621..39010c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst._ import 
org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} -import org.apache.spark.sql.types.{StructType, MapType, ArrayType} +import org.apache.spark.sql.catalyst.plans.logical.{Expand, Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{IntegerType, StructType, MapType, ArrayType} /** * Utility functions used by the query planner to convert our plan to new aggregation code path. @@ -41,7 +42,7 @@ object Utils { private def doConvert(plan: LogicalPlan): Option[Aggregate] = plan match { case p: Aggregate if supportsGroupingKeySchema(p) => - val converted = p.transformExpressionsDown { + val converted = MultipleDistinctRewriter.rewrite(p.transformExpressionsDown { case expressions.Average(child) => aggregate.AggregateExpression2( aggregateFunction = aggregate.Average(child), @@ -144,7 +145,8 @@ object Utils { aggregateFunction = aggregate.VarianceSamp(child),
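To make the rewrite concrete, here is a small PySpark query with two DISTINCT aggregates, the shape of query this rule turns into an Expand-Aggregate-Aggregate plan (the data and column names are made up for illustration):

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

sc = SparkContext(appName="multi-distinct-sketch")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(
    [(1, "a", 10), (1, "b", 10), (2, "a", 20)], ["k", "cat", "amount"])

# Two DISTINCT aggregates in one query: previously this forced the old
# aggregation path; after this rewrite it can run on TungstenAggregate.
df.groupBy("k").agg(
    F.countDistinct("cat"),
    F.countDistinct("amount")).show()

sc.stop()
```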
spark git commit: [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortW…
Repository: spark Updated Branches: refs/heads/branch-1.6 02748c953 -> 40a5db561 [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortWithinPartitions. Author: Nong Li Closes #9504 from nongli/spark-11410. (cherry picked from commit 1ab72b08601a1c8a674bdd3fab84d9804899b2c7) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40a5db56 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40a5db56 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40a5db56 Branch: refs/heads/branch-1.6 Commit: 40a5db56169b87eecf574cf8e15e89caf4836ee4 Parents: 02748c9 Author: Nong Li Authored: Fri Nov 6 15:48:20 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 15:48:45 2015 -0800 -- python/pyspark/sql/dataframe.py | 117 ++- 1 file changed, 101 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/40a5db56/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 765a451..b97c94d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -423,6 +423,67 @@ class DataFrame(object): return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) @since(1.3) +def repartition(self, numPartitions, *cols): +""" +Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The +resulting DataFrame is hash partitioned. + +``numPartitions`` can be an int to specify the target number of partitions or a Column. +If it is a Column, it will be used as the first partitioning column. If not specified, +the default number of partitions is used. + +.. versionchanged:: 1.6 + Added optional arguments to specify the partitioning columns. Also made numPartitions + optional if partitioning columns are specified. + +>>> df.repartition(10).rdd.getNumPartitions() +10 +>>> data = df.unionAll(df).repartition("age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 2|Alice| +| 5| Bob| +| 5| Bob| ++---+-+ +>>> data = data.repartition(7, "age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 5| Bob| +| 5| Bob| +| 2|Alice| +| 2|Alice| ++---+-+ +>>> data.rdd.getNumPartitions() +7 +>>> data = data.repartition("name", "age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 5| Bob| +| 5| Bob| +| 2|Alice| +| 2|Alice| ++---+-+ +""" +if isinstance(numPartitions, int): +if len(cols) == 0: +return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) +else: +return DataFrame( +self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx) +elif isinstance(numPartitions, (basestring, Column)): +cols = (numPartitions, ) + cols +return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx) +else: +raise TypeError("numPartitions should be an int or Column") + +@since(1.3) def distinct(self): """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. @@ -589,6 +650,26 @@ class DataFrame(object): jdf = self._jdf.join(other._jdf, on._jc, how) return DataFrame(jdf, self.sql_ctx) +@since(1.6) +def sortWithinPartitions(self, *cols, **kwargs): +"""Returns a new :class:`DataFrame` with each partition sorted by the specified column(s). + +:param cols: list of :class:`Column` or column names to sort by. +:param ascending: boolean or list of boolean (default True). +Sort ascending vs. descending. Specify list for multiple sort orders. +If a list is specified, length of the list must equal length of the `cols`. 
+ +>>> df.sortWithinPartitions("age", ascending=False).show() ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 5| Bob| ++---+-+ +""" +jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs)) +return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix @since(1.3) def sort(self, *cols, **kwargs): @@ -613,22 +694,7 @@ class DataFrame(object): >>> df.orderBy(["age", "name"], ascending=[
spark git commit: [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortW…
Repository: spark Updated Branches: refs/heads/master 7e9a9e603 -> 1ab72b086 [SPARK-11410] [PYSPARK] Add python bindings for repartition and sortWithinPartitions. Author: Nong Li Closes #9504 from nongli/spark-11410. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ab72b08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ab72b08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ab72b08 Branch: refs/heads/master Commit: 1ab72b08601a1c8a674bdd3fab84d9804899b2c7 Parents: 7e9a9e6 Author: Nong Li Authored: Fri Nov 6 15:48:20 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 15:48:20 2015 -0800 -- python/pyspark/sql/dataframe.py | 117 ++- 1 file changed, 101 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ab72b08/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 765a451..b97c94d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -423,6 +423,67 @@ class DataFrame(object): return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) @since(1.3) +def repartition(self, numPartitions, *cols): +""" +Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The +resulting DataFrame is hash partitioned. + +``numPartitions`` can be an int to specify the target number of partitions or a Column. +If it is a Column, it will be used as the first partitioning column. If not specified, +the default number of partitions is used. + +.. versionchanged:: 1.6 + Added optional arguments to specify the partitioning columns. Also made numPartitions + optional if partitioning columns are specified. + +>>> df.repartition(10).rdd.getNumPartitions() +10 +>>> data = df.unionAll(df).repartition("age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 2|Alice| +| 5| Bob| +| 5| Bob| ++---+-+ +>>> data = data.repartition(7, "age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 5| Bob| +| 5| Bob| +| 2|Alice| +| 2|Alice| ++---+-+ +>>> data.rdd.getNumPartitions() +7 +>>> data = data.repartition("name", "age") +>>> data.show() ++---+-+ +|age| name| ++---+-+ +| 5| Bob| +| 5| Bob| +| 2|Alice| +| 2|Alice| ++---+-+ +""" +if isinstance(numPartitions, int): +if len(cols) == 0: +return DataFrame(self._jdf.repartition(numPartitions), self.sql_ctx) +else: +return DataFrame( +self._jdf.repartition(numPartitions, self._jcols(*cols)), self.sql_ctx) +elif isinstance(numPartitions, (basestring, Column)): +cols = (numPartitions, ) + cols +return DataFrame(self._jdf.repartition(self._jcols(*cols)), self.sql_ctx) +else: +raise TypeError("numPartitions should be an int or Column") + +@since(1.3) def distinct(self): """Returns a new :class:`DataFrame` containing the distinct rows in this :class:`DataFrame`. @@ -589,6 +650,26 @@ class DataFrame(object): jdf = self._jdf.join(other._jdf, on._jc, how) return DataFrame(jdf, self.sql_ctx) +@since(1.6) +def sortWithinPartitions(self, *cols, **kwargs): +"""Returns a new :class:`DataFrame` with each partition sorted by the specified column(s). + +:param cols: list of :class:`Column` or column names to sort by. +:param ascending: boolean or list of boolean (default True). +Sort ascending vs. descending. Specify list for multiple sort orders. +If a list is specified, length of the list must equal length of the `cols`. 
+ +>>> df.sortWithinPartitions("age", ascending=False).show() ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 5| Bob| ++---+-+ +""" +jdf = self._jdf.sortWithinPartitions(self._sort_cols(cols, kwargs)) +return DataFrame(jdf, self.sql_ctx) + @ignore_unicode_prefix @since(1.3) def sort(self, *cols, **kwargs): @@ -613,22 +694,7 @@ class DataFrame(object): >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] """ -if n
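For context on what these new bindings do: sort produces a single total ordering across the whole DataFrame, while sortWithinPartitions only orders rows inside each partition and so avoids the extra shuffle. A minimal Scala sketch of the same pair of calls (the Scala DataFrame API these Python methods delegate to; `df` is assumed to hold the (age, name) rows from the doctests above):

~~~scala
// Sketch only: contrast a global sort with a per-partition sort.
val parted = df.repartition(2, df("age"))   // hash-partition by age into 2 partitions

// sort: one total ordering over all rows (requires a range shuffle)
parted.sort(parted("age").desc).show()

// sortWithinPartitions: each partition is sorted independently, with no
// further shuffle, so rows from different partitions may interleave.
parted.sortWithinPartitions(parted("age").desc).show()
~~~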
spark git commit: [SPARK-11269][SQL] Java API support & test cases for Dataset
Repository: spark Updated Branches: refs/heads/branch-1.6 b58f1ce5b -> 02748c953 [SPARK-11269][SQL] Java API support & test cases for Dataset This simply brings https://github.com/apache/spark/pull/9358 up-to-date. Author: Wenchen Fan Author: Reynold Xin Closes #9528 from rxin/dataset-java. (cherry picked from commit 7e9a9e603abce8689938bdd62d04b29299644aa4) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02748c95 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02748c95 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02748c95 Branch: refs/heads/branch-1.6 Commit: 02748c953cdc85fda054e366b095225f906b4548 Parents: b58f1ce Author: Wenchen Fan Authored: Fri Nov 6 15:37:07 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 15:37:16 2015 -0800 -- .../spark/sql/catalyst/encoders/Encoder.scala | 123 ++- .../sql/catalyst/expressions/objects.scala | 21 ++ .../scala/org/apache/spark/sql/Dataset.scala| 126 ++- .../org/apache/spark/sql/DatasetHolder.scala| 6 +- .../org/apache/spark/sql/GroupedDataset.scala | 17 + .../scala/org/apache/spark/sql/SQLContext.scala | 4 + .../org/apache/spark/sql/JavaDatasetSuite.java | 357 +++ .../spark/sql/DatasetPrimitiveSuite.scala | 2 +- 8 files changed, 644 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/02748c95/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala index 329a132..f05e1828 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.catalyst.encoders - - import scala.reflect.ClassTag -import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils +import org.apache.spark.sql.types.{DataType, ObjectType, StructField, StructType} +import org.apache.spark.sql.catalyst.expressions._ /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. @@ -37,3 +37,120 @@ trait Encoder[T] extends Serializable { /** A ClassTag that can be used to construct and Array to contain a collection of `T`. 
*/ def clsTag: ClassTag[T] } + +object Encoder { + import scala.reflect.runtime.universe._ + + def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true) + def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true) + def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true) + def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true) + def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true) + def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true) + def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) + def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) + + def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { +tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2)]] + } + + def tuple[T1, T2, T3]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + } + + def tuple[T1, T2, T3, T4]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3], + enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + } + + def tuple[T1, T2, T3, T4, T5]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3], + enc4: Encoder[T4], + enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { +tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] + } + + private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { +assert(encoders.length > 1) +// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. + assert(encoders.forall(_.constructExpression.find(_.isInstanceOf[Attribute]).isEmpty)) + +val schema = StructType(encoders.zipWithIndex.map { + case (e, i) => StructField(s"_${i + 1}", if
spark git commit: [SPARK-11269][SQL] Java API support & test cases for Dataset
Repository: spark Updated Branches: refs/heads/master f6680cdc5 -> 7e9a9e603 [SPARK-11269][SQL] Java API support & test cases for Dataset This simply brings https://github.com/apache/spark/pull/9358 up-to-date. Author: Wenchen Fan Author: Reynold Xin Closes #9528 from rxin/dataset-java. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e9a9e60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e9a9e60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e9a9e60 Branch: refs/heads/master Commit: 7e9a9e603abce8689938bdd62d04b29299644aa4 Parents: f6680cd Author: Wenchen Fan Authored: Fri Nov 6 15:37:07 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 15:37:07 2015 -0800 -- .../spark/sql/catalyst/encoders/Encoder.scala | 123 ++- .../sql/catalyst/expressions/objects.scala | 21 ++ .../scala/org/apache/spark/sql/Dataset.scala| 126 ++- .../org/apache/spark/sql/DatasetHolder.scala| 6 +- .../org/apache/spark/sql/GroupedDataset.scala | 17 + .../scala/org/apache/spark/sql/SQLContext.scala | 4 + .../org/apache/spark/sql/JavaDatasetSuite.java | 357 +++ .../spark/sql/DatasetPrimitiveSuite.scala | 2 +- 8 files changed, 644 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e9a9e60/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala index 329a132..f05e1828 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.catalyst.encoders - - import scala.reflect.ClassTag -import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils +import org.apache.spark.sql.types.{DataType, ObjectType, StructField, StructType} +import org.apache.spark.sql.catalyst.expressions._ /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. @@ -37,3 +37,120 @@ trait Encoder[T] extends Serializable { /** A ClassTag that can be used to construct and Array to contain a collection of `T`. 
*/ def clsTag: ClassTag[T] } + +object Encoder { + import scala.reflect.runtime.universe._ + + def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true) + def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true) + def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true) + def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true) + def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true) + def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true) + def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) + def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) + + def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { +tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2)]] + } + + def tuple[T1, T2, T3]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + } + + def tuple[T1, T2, T3, T4]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3], + enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + } + + def tuple[T1, T2, T3, T4, T5]( + enc1: Encoder[T1], + enc2: Encoder[T2], + enc3: Encoder[T3], + enc4: Encoder[T4], + enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { +tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) + .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] + } + + private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { +assert(encoders.length > 1) +// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. + assert(encoders.forall(_.constructExpression.find(_.isInstanceOf[Attribute]).isEmpty)) + +val schema = StructType(encoders.zipWithIndex.map { + case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema) +}) + +val cls = Utils.getContextOrSparkClassLoa
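The `Encoder` factories above exist mainly for the Java API, where Scala's implicit encoders are not available. A minimal sketch of composing them, using only the signatures visible in the diff (the commented `createDataset` call is illustrative, not taken from this patch):

~~~scala
import org.apache.spark.sql.catalyst.encoders.Encoder

// Sketch only: build an Encoder for (Integer, String) pairs out of the
// primitive encoders defined on `object Encoder` above.
val pairEncoder: Encoder[(java.lang.Integer, java.lang.String)] =
  Encoder.tuple(Encoder.INT, Encoder.STRING)

// A Java caller would hand such an encoder to a Dataset-creating API, roughly:
//   Dataset<Tuple2<Integer, String>> ds = context.createDataset(data, pairEncoder);
~~~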
spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work
Repository: spark Updated Branches: refs/heads/branch-1.5 dc058f2ff -> 8fb6696cd [SPARK-11555] spark on yarn spark-class --num-workers doesn't work I tested the various options with both spark-submit and spark-class of specifying number of executors in both client and cluster mode where it applied. --num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, default nothing supplied Author: Thomas Graves Closes #9523 from tgravescs/SPARK-11555. (cherry picked from commit f6680cdc5d2912dea9768ef5c3e2cc101b06daf8) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8fb6696c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8fb6696c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8fb6696c Branch: refs/heads/branch-1.5 Commit: 8fb6696cd6e066957e34e1123df5691d8e4682d2 Parents: dc058f2 Author: Thomas Graves Authored: Fri Nov 6 15:24:33 2015 -0800 Committer: Marcelo Vanzin Committed: Fri Nov 6 15:24:58 2015 -0800 -- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +-- 2 files changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8fb6696c/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1165061..a9f4374 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orNull // If dynamic allocation is enabled, start at the configured initial number of executors. // Default to minExecutors if no initialExecutors is set. -numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) +numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors) principal = Option(principal) .orElse(sparkConf.getOption("spark.yarn.principal")) .orNull http://git-wip-us.apache.org/repos/asf/spark/blob/8fb6696c/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 991c909..73023d7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -355,8 +355,11 @@ object YarnSparkHadoopUtil { /** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. + * If not using dynamic allocation it gets the number of executors reqeusted by the user. 
*/ - def getInitialTargetExecutorNumber(conf: SparkConf): Int = { + def getInitialTargetExecutorNumber( + conf: SparkConf, + numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) val initialNumExecutors = @@ -369,7 +372,7 @@ object YarnSparkHadoopUtil { initialNumExecutors } else { val targetNumExecutors = - sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS) + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) // System property can override environment variable. conf.getInt("spark.executor.instances", targetNumExecutors) }
spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work
Repository: spark Updated Branches: refs/heads/branch-1.6 e7e3bfda3 -> b58f1ce5b [SPARK-11555] spark on yarn spark-class --num-workers doesn't work I tested the various options with both spark-submit and spark-class of specifying number of executors in both client and cluster mode where it applied. --num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, default nothing supplied Author: Thomas Graves Closes #9523 from tgravescs/SPARK-11555. (cherry picked from commit f6680cdc5d2912dea9768ef5c3e2cc101b06daf8) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b58f1ce5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b58f1ce5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b58f1ce5 Branch: refs/heads/branch-1.6 Commit: b58f1ce5b768f77b86626767f9366c00d2232b04 Parents: e7e3bfd Author: Thomas Graves Authored: Fri Nov 6 15:24:33 2015 -0800 Committer: Marcelo Vanzin Committed: Fri Nov 6 15:24:44 2015 -0800 -- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +-- 2 files changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b58f1ce5/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1165061..a9f4374 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orNull // If dynamic allocation is enabled, start at the configured initial number of executors. // Default to minExecutors if no initialExecutors is set. -numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) +numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors) principal = Option(principal) .orElse(sparkConf.getOption("spark.yarn.principal")) .orNull http://git-wip-us.apache.org/repos/asf/spark/blob/b58f1ce5/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 561ad79..a290ebe 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -392,8 +392,11 @@ object YarnSparkHadoopUtil { /** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. + * If not using dynamic allocation it gets the number of executors reqeusted by the user. 
*/ - def getInitialTargetExecutorNumber(conf: SparkConf): Int = { + def getInitialTargetExecutorNumber( + conf: SparkConf, + numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) val initialNumExecutors = @@ -406,7 +409,7 @@ object YarnSparkHadoopUtil { initialNumExecutors } else { val targetNumExecutors = - sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS) + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) // System property can override environment variable. conf.getInt("spark.executor.instances", targetNumExecutors) }
spark git commit: [SPARK-11555] spark on yarn spark-class --num-workers doesn't work
Repository: spark Updated Branches: refs/heads/master c447c9d54 -> f6680cdc5 [SPARK-11555] spark on yarn spark-class --num-workers doesn't work I tested the various options with both spark-submit and spark-class of specifying number of executors in both client and cluster mode where it applied. --num-workers, --num-executors, spark.executor.instances, SPARK_EXECUTOR_INSTANCES, default nothing supplied Author: Thomas Graves Closes #9523 from tgravescs/SPARK-11555. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6680cdc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6680cdc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6680cdc Branch: refs/heads/master Commit: f6680cdc5d2912dea9768ef5c3e2cc101b06daf8 Parents: c447c9d Author: Thomas Graves Authored: Fri Nov 6 15:24:33 2015 -0800 Committer: Marcelo Vanzin Committed: Fri Nov 6 15:24:33 2015 -0800 -- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala| 7 +-- 2 files changed, 6 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6680cdc/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1165061..a9f4374 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -81,7 +81,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) .orNull // If dynamic allocation is enabled, start at the configured initial number of executors. // Default to minExecutors if no initialExecutors is set. -numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) +numExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf, numExecutors) principal = Option(principal) .orElse(sparkConf.getOption("spark.yarn.principal")) .orNull http://git-wip-us.apache.org/repos/asf/spark/blob/f6680cdc/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala -- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 561ad79..a290ebe 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -392,8 +392,11 @@ object YarnSparkHadoopUtil { /** * Getting the initial target number of executors depends on whether dynamic allocation is * enabled. + * If not using dynamic allocation it gets the number of executors reqeusted by the user. */ - def getInitialTargetExecutorNumber(conf: SparkConf): Int = { + def getInitialTargetExecutorNumber( + conf: SparkConf, + numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) val initialNumExecutors = @@ -406,7 +409,7 @@ object YarnSparkHadoopUtil { initialNumExecutors } else { val targetNumExecutors = - sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(DEFAULT_NUMBER_EXECUTORS) + sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) // System property can override environment variable. 
conf.getInt("spark.executor.instances", targetNumExecutors) }
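Taken together, the non-dynamic-allocation branch now resolves the executor count in this order: spark.executor.instances if set, else SPARK_EXECUTOR_INSTANCES, else the value passed in (which carries --num-workers/--num-executors and defaults to DEFAULT_NUMBER_EXECUTORS). A minimal sketch of that precedence (hypothetical helper name, dynamic allocation off):

~~~scala
import org.apache.spark.SparkConf

// Sketch only: mirrors the non-dynamic branch of getInitialTargetExecutorNumber.
// `cliNumExecutors` stands in for the parsed --num-workers/--num-executors value.
def resolveExecutorCount(conf: SparkConf, cliNumExecutors: Int): Int = {
  val fromEnv = sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt)
  // The env var (or, failing that, the CLI value) is only a fallback;
  // an explicit spark.executor.instances setting always wins.
  conf.getInt("spark.executor.instances", fromEnv.getOrElse(cliNumExecutors))
}
~~~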
spark git commit: [SPARK-11217][ML] save/load for non-meta estimators and transformers
Repository: spark Updated Branches: refs/heads/branch-1.6 52e921c7c -> e7e3bfda3 [SPARK-11217][ML] save/load for non-meta estimators and transformers This PR implements the default save/load for non-meta estimators and transformers using the JSON serialization of param values. The saved metadata includes: * class name * uid * timestamp * paramMap The save/load interface is similar to DataFrames. We use the current active context by default, which should be sufficient for most use cases. ~~~scala instance.save("path") instance.write.context(sqlContext).overwrite().save("path") Instance.load("path") ~~~ The param handling is different from the design doc. We didn't save default and user-set params separately, and when we load it back, all parameters are user-set. This does cause issues. But it also causes other issues if we modify the default params. TODOs: * [x] Java test * [ ] a follow-up PR to implement default save/load for all non-meta estimators and transformers cc jkbradley Author: Xiangrui Meng Closes #9454 from mengxr/SPARK-11217. (cherry picked from commit c447c9d54603890db7399fb80adc9fae40b71f64) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7e3bfda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7e3bfda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7e3bfda Branch: refs/heads/branch-1.6 Commit: e7e3bfda315eb7809c8bdffa61a30b70a680611c Parents: 52e921c Author: Xiangrui Meng Authored: Fri Nov 6 14:51:03 2015 -0800 Committer: Joseph K. Bradley Committed: Fri Nov 6 14:51:15 2015 -0800 -- .../org/apache/spark/ml/feature/Binarizer.scala | 11 +- .../org/apache/spark/ml/param/params.scala | 2 +- .../org/apache/spark/ml/util/ReadWrite.scala| 220 +++ .../ml/util/JavaDefaultReadWriteSuite.java | 74 +++ .../spark/ml/feature/BinarizerSuite.scala | 11 +- .../spark/ml/util/DefaultReadWriteTest.scala| 110 ++ .../apache/spark/ml/util/TempDirectory.scala| 45 7 files changed, 469 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala index edad754..e5c2557 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala @@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.BinaryAttribute import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} -import org.apache.spark.ml.util.{Identifiable, SchemaUtils} +import org.apache.spark.ml.util._ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructType} @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DoubleType, StructType} */ @Experimental final class Binarizer(override val uid: String) - extends Transformer with HasInputCol with HasOutputCol { + extends Transformer with Writable with HasInputCol with HasOutputCol { def this() = this(Identifiable.randomUID("binarizer")) @@ -86,4 +86,11 @@ final class Binarizer(override val uid: String) } override def copy(extra: ParamMap): Binarizer = defaultCopy(extra) + + override def write: Writer = new DefaultParamsWriter(this) +} + +object Binarizer extends Readable[Binarizer] { + + override def read:
Reader[Binarizer] = new DefaultParamsReader[Binarizer] } http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/param/params.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 8361406..c932570 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -592,7 +592,7 @@ trait Params extends Identifiable with Serializable { /** * Sets a parameter in the embedded param map. */ - protected final def set[T](param: Param[T], value: T): this.type = { + final def set[T](param: Param[T], value: T): this.type = { set(param -> value) } http://git-wip-us.apache.org/repos/asf/spark/blob/e7e3bfda/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWr
spark git commit: [SPARK-11217][ML] save/load for non-meta estimators and transformers
Repository: spark Updated Branches: refs/heads/master 3a652f691 -> c447c9d54 [SPARK-11217][ML] save/load for non-meta estimators and transformers This PR implements the default save/load for non-meta estimators and transformers using the JSON serialization of param values. The saved metadata includes: * class name * uid * timestamp * paramMap The save/load interface is similar to DataFrames. We use the current active context by default, which should be sufficient for most use cases. ~~~scala instance.save("path") instance.write.context(sqlContext).overwrite().save("path") Instance.load("path") ~~~ The param handling is different from the design doc. We didn't save default and user-set params separately, and when we load it back, all parameters are user-set. This does cause issues. But it also causes other issues if we modify the default params. TODOs: * [x] Java test * [ ] a follow-up PR to implement default save/load for all non-meta estimators and transformers cc jkbradley Author: Xiangrui Meng Closes #9454 from mengxr/SPARK-11217. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c447c9d5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c447c9d5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c447c9d5 Branch: refs/heads/master Commit: c447c9d54603890db7399fb80adc9fae40b71f64 Parents: 3a652f6 Author: Xiangrui Meng Authored: Fri Nov 6 14:51:03 2015 -0800 Committer: Joseph K. Bradley Committed: Fri Nov 6 14:51:03 2015 -0800 -- .../org/apache/spark/ml/feature/Binarizer.scala | 11 +- .../org/apache/spark/ml/param/params.scala | 2 +- .../org/apache/spark/ml/util/ReadWrite.scala| 220 +++ .../ml/util/JavaDefaultReadWriteSuite.java | 74 +++ .../spark/ml/feature/BinarizerSuite.scala | 11 +- .../spark/ml/util/DefaultReadWriteTest.scala| 110 ++ .../apache/spark/ml/util/TempDirectory.scala| 45 7 files changed, 469 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala index edad754..e5c2557 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala @@ -22,7 +22,7 @@ import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.BinaryAttribute import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} -import org.apache.spark.ml.util.{Identifiable, SchemaUtils} +import org.apache.spark.ml.util._ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DoubleType, StructType} @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DoubleType, StructType} */ @Experimental final class Binarizer(override val uid: String) - extends Transformer with HasInputCol with HasOutputCol { + extends Transformer with Writable with HasInputCol with HasOutputCol { def this() = this(Identifiable.randomUID("binarizer")) @@ -86,4 +86,11 @@ final class Binarizer(override val uid: String) } override def copy(extra: ParamMap): Binarizer = defaultCopy(extra) + + override def write: Writer = new DefaultParamsWriter(this) +} + +object Binarizer extends Readable[Binarizer] { + + override def read: Reader[Binarizer] = new DefaultParamsReader[Binarizer] }
http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/param/params.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 8361406..c932570 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -592,7 +592,7 @@ trait Params extends Identifiable with Serializable { /** * Sets a parameter in the embedded param map. */ - protected final def set[T](param: Param[T], value: T): this.type = { + final def set[T](param: Param[T], value: T): this.type = { set(param -> value) } http://git-wip-us.apache.org/repos/asf/spark/blob/c447c9d5/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala new file mode 100644 index 000..e
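Applied to the Binarizer changes above, the new surface can be exercised roughly as follows; a sketch only, with a placeholder path, and with the threshold setter/getter assumed from Binarizer's existing params:

~~~scala
import org.apache.spark.ml.feature.Binarizer

// Sketch only: round-trip a transformer through the new Writable/Readable mixins.
val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized")
  .setThreshold(0.5)

binarizer.write.overwrite().save("/tmp/binarizer")  // DefaultParamsWriter under the hood
val restored = Binarizer.load("/tmp/binarizer")     // DefaultParamsReader under the hood
assert(restored.getThreshold == binarizer.getThreshold)
~~~

Per the description above, everything comes back as a user-set param, so the round trip preserves values but not the default/user-set distinction.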
spark git commit: [SPARK-11561][SQL] Rename text data source's column name to value.
Repository: spark Updated Branches: refs/heads/branch-1.6 efa1e4a25 -> 52e921c7c [SPARK-11561][SQL] Rename text data source's column name to value. Author: Reynold Xin Closes #9527 from rxin/SPARK-11561. (cherry picked from commit 3a652f691b220fada0286f8d0a562c5657973d4d) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52e921c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52e921c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52e921c7 Branch: refs/heads/branch-1.6 Commit: 52e921c7cc6ba6dddb923221c42f52eb4cd98f0f Parents: efa1e4a Author: Reynold Xin Authored: Fri Nov 6 14:47:41 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 14:47:49 2015 -0800 -- .../spark/sql/execution/datasources/text/DefaultSource.scala | 6 ++ .../spark/sql/execution/datasources/text/TextSuite.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52e921c7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 52c4421..4b8b8e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -30,14 +30,12 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder} -import org.apache.spark.sql.columnar.MutableUnsafeRow import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration /** @@ -78,7 +76,7 @@ private[sql] class TextRelation( extends HadoopFsRelation(maybePartitionSpec) { /** Data schema is always a single column, named "text". */ - override def dataSchema: StructType = new StructType().add("text", StringType) + override def dataSchema: StructType = new StructType().add("value", StringType) /** This is an internal data source that outputs internal row format. */ override val needConversion: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/52e921c7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 0a2306c..914e516 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -65,7 +65,7 @@ class TextSuite extends QueryTest with SharedSQLContext { /** Verifies data and schema. 
*/ private def verifyFrame(df: DataFrame): Unit = { // schema -assert(df.schema == new StructType().add("text", StringType)) +assert(df.schema == new StructType().add("value", StringType)) // verify content val data = df.collect()
spark git commit: [SPARK-11561][SQL] Rename text data source's column name to value.
Repository: spark Updated Branches: refs/heads/master f328fedaf -> 3a652f691 [SPARK-11561][SQL] Rename text data source's column name to value. Author: Reynold Xin Closes #9527 from rxin/SPARK-11561. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a652f69 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a652f69 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a652f69 Branch: refs/heads/master Commit: 3a652f691b220fada0286f8d0a562c5657973d4d Parents: f328fed Author: Reynold Xin Authored: Fri Nov 6 14:47:41 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 14:47:41 2015 -0800 -- .../spark/sql/execution/datasources/text/DefaultSource.scala | 6 ++ .../spark/sql/execution/datasources/text/TextSuite.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a652f69/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 52c4421..4b8b8e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -30,14 +30,12 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder} -import org.apache.spark.sql.columnar.MutableUnsafeRow import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.SerializableConfiguration /** @@ -78,7 +76,7 @@ private[sql] class TextRelation( extends HadoopFsRelation(maybePartitionSpec) { /** Data schema is always a single column, named "text". */ - override def dataSchema: StructType = new StructType().add("text", StringType) + override def dataSchema: StructType = new StructType().add("value", StringType) /** This is an internal data source that outputs internal row format. */ override val needConversion: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/3a652f69/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 0a2306c..914e516 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -65,7 +65,7 @@ class TextSuite extends QueryTest with SharedSQLContext { /** Verifies data and schema. 
*/ private def verifyFrame(df: DataFrame): Unit = { // schema -assert(df.schema == new StructType().add("text", StringType)) +assert(df.schema == new StructType().add("value", StringType)) // verify content val data = df.collect()
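The user-visible effect is that the text source's single StringType column is now called value rather than text. A minimal sketch (assuming the DataFrameReader.text entry point that fronts this source in 1.6):

~~~scala
import org.apache.spark.sql.functions.length

// Sketch only: reading a text file now yields one column named "value".
val lines = sqlContext.read.text("/tmp/input.txt")
lines.printSchema()
// root
//  |-- value: string (nullable = true)

val lineLengths = lines.select(length(lines("value")).as("len"))
~~~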
spark git commit: [SPARK-11450] [SQL] Add Unsafe Row processing to Expand
Repository: spark Updated Branches: refs/heads/branch-1.6 7755b50b4 -> efa1e4a25 [SPARK-11450] [SQL] Add Unsafe Row processing to Expand This PR enables the Expand operator to process and produce Unsafe Rows. Author: Herman van Hovell Closes #9414 from hvanhovell/SPARK-11450. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efa1e4a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efa1e4a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efa1e4a2 Branch: refs/heads/branch-1.6 Commit: efa1e4a25cd6f4311faf9fc6c551ef1c71e2227b Parents: 7755b50 Author: Herman van Hovell Authored: Fri Nov 6 12:21:53 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 12:25:10 2015 -0800 -- .../sql/catalyst/expressions/Projection.scala | 6 ++- .../org/apache/spark/sql/execution/Expand.scala | 19 --- .../spark/sql/execution/basicOperators.scala| 8 +-- .../spark/sql/execution/ExpandSuite.scala | 54 4 files changed, 73 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index a6fe730..79dabe8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -128,7 +128,11 @@ object UnsafeProjection { * Returns an UnsafeProjection for given sequence of Expressions (bounded). */ def create(exprs: Seq[Expression]): UnsafeProjection = { -GenerateUnsafeProjection.generate(exprs) +val unsafeExprs = exprs.map(_ transform { + case CreateStruct(children) => CreateStructUnsafe(children) + case CreateNamedStruct(children) => CreateNamedStructUnsafe(children) +}) +GenerateUnsafeProjection.generate(unsafeExprs) } def create(expr: Expression): UnsafeProjection = create(Seq(expr)) http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index a458881..55e9576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -41,14 +41,21 @@ case class Expand( // as UNKNOWN partitioning override def outputPartitioning: Partitioning = UnknownPartitioning(0) + override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true + + private[this] val projection = { +if (outputsUnsafeRows) { + (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) +} else { + (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)() +} + } + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { child.execute().mapPartitions { iter => - // TODO Move out projection objects creation and transfer to - // workers via closure. However we can't assume the Projection - // is serializable because of the code gen, so we have to - // create the projections within each of the partition processing. 
- val groups = projections.map(ee => newProjection(ee, child.output)).toArray - + val groups = projections.map(projection).toArray new Iterator[InternalRow] { private[this] var result: InternalRow = _ private[this] var idx = -1 // -1 means the initial state http://git-wip-us.apache.org/repos/asf/spark/blob/efa1e4a2/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index d5a803f..799650a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -67,16 +67,10 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) override def output: Seq[Attribute] = projectList.map(_.toAttribute) - /** Rewrite the
spark git commit: [SPARK-11450] [SQL] Add Unsafe Row processing to Expand
Repository: spark Updated Branches: refs/heads/master 49f1a8203 -> f328fedaf [SPARK-11450] [SQL] Add Unsafe Row processing to Expand This PR enables the Expand operator to process and produce Unsafe Rows. Author: Herman van Hovell Closes #9414 from hvanhovell/SPARK-11450. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f328feda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f328feda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f328feda Branch: refs/heads/master Commit: f328fedafd7bd084470a5e402de0429b5b7f8cd7 Parents: 49f1a82 Author: Herman van Hovell Authored: Fri Nov 6 12:21:53 2015 -0800 Committer: Michael Armbrust Committed: Fri Nov 6 12:21:53 2015 -0800 -- .../sql/catalyst/expressions/Projection.scala | 6 ++- .../org/apache/spark/sql/execution/Expand.scala | 19 --- .../spark/sql/execution/basicOperators.scala| 8 +-- .../spark/sql/execution/ExpandSuite.scala | 54 4 files changed, 73 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index a6fe730..79dabe8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -128,7 +128,11 @@ object UnsafeProjection { * Returns an UnsafeProjection for given sequence of Expressions (bounded). */ def create(exprs: Seq[Expression]): UnsafeProjection = { -GenerateUnsafeProjection.generate(exprs) +val unsafeExprs = exprs.map(_ transform { + case CreateStruct(children) => CreateStructUnsafe(children) + case CreateNamedStruct(children) => CreateNamedStructUnsafe(children) +}) +GenerateUnsafeProjection.generate(unsafeExprs) } def create(expr: Expression): UnsafeProjection = create(Seq(expr)) http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index a458881..55e9576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -41,14 +41,21 @@ case class Expand( // as UNKNOWN partitioning override def outputPartitioning: Partitioning = UnknownPartitioning(0) + override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true + + private[this] val projection = { +if (outputsUnsafeRows) { + (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) +} else { + (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)() +} + } + protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { child.execute().mapPartitions { iter => - // TODO Move out projection objects creation and transfer to - // workers via closure. However we can't assume the Projection - // is serializable because of the code gen, so we have to - // create the projections within each of the partition processing. 
- val groups = projections.map(ee => newProjection(ee, child.output)).toArray - + val groups = projections.map(projection).toArray new Iterator[InternalRow] { private[this] var result: InternalRow = _ private[this] var idx = -1 // -1 means the initial state http://git-wip-us.apache.org/repos/asf/spark/blob/f328feda/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index d5a803f..799650a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -67,16 +67,10 @@ case class TungstenProject(projectList: Seq[NamedExpression], child: SparkPlan) override def output: Seq[Attribute] = projectList.map(_.toAttribute) - /** Rewrite the project
spark git commit: [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits
Repository: spark Updated Branches: refs/heads/master 62bb29077 -> 49f1a8203 [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits https://issues.apache.org/jira/browse/SPARK-10116 This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`. mengxr mkolod Author: Imran Rashid Closes #8314 from squito/SPARK-10116. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49f1a820 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49f1a820 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49f1a820 Branch: refs/heads/master Commit: 49f1a820372d1cba41f3f00d07eb5728f2ed6705 Parents: 62bb290 Author: Imran Rashid Authored: Fri Nov 6 20:06:24 2015 + Committer: Sean Owen Committed: Fri Nov 6 20:06:24 2015 + -- R/pkg/inst/tests/test_sparkSQL.R| 8 +-- .../spark/util/random/XORShiftRandom.scala | 6 ++- .../java/org/apache/spark/JavaAPISuite.java | 20 +--- .../spark/rdd/PairRDDFunctionsSuite.scala | 52 ++-- .../spark/util/random/XORShiftRandomSuite.scala | 15 ++ .../MultilayerPerceptronClassifierSuite.scala | 5 +- .../apache/spark/ml/feature/Word2VecSuite.scala | 16 -- .../mllib/clustering/StreamingKMeansSuite.scala | 13 +++-- python/pyspark/ml/feature.py| 20 python/pyspark/ml/recommendation.py | 6 +-- python/pyspark/mllib/recommendation.py | 4 +- python/pyspark/sql/dataframe.py | 6 +-- .../sql/catalyst/expressions/RandomSuite.scala | 8 +-- .../apache/spark/sql/JavaDataFrameSuite.java| 6 ++- .../apache/spark/sql/DataFrameStatSuite.scala | 4 +- 15 files changed, 128 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 816315b..92cff1f 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -875,9 +875,9 @@ test_that("column binary mathfunctions", { expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4) expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4) expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric") - expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01) + expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01) expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric") - expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01) + expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01) }) test_that("string operators", { @@ -1458,8 +1458,8 @@ test_that("sampleBy() on a DataFrame", { fractions <- list("0" = 0.1, "1" = 0.2) sample <- sampleBy(df, "key", fractions, 0) result <- collect(orderBy(count(groupBy(sample, "key")), "key")) - expect_identical(as.list(result[1, ]), list(key = "0", count = 2)) - expect_identical(as.list(result[2, ]), list(key = "1", count = 10)) + expect_identical(as.list(result[1, ]), list(key = "0", count = 3)) + expect_identical(as.list(result[2, ]), list(key = "1", count = 7)) }) test_that("SQL error message is returned from JVM", { http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 
85fb923..e8cdb6e 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -60,9 +60,11 @@ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { private[spark] object XORShiftRandom { /** Hash seeds to have 0/1 bits throughout. */ - private def hashSeed(seed: Long): Long = { + private[random] def hashSeed(seed: Long): Long = { val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array() -MurmurHash3.bytesHash(bytes) +val lowBits = MurmurHash3.bytesHash(bytes) +val highBits = MurmurHash3.bytesHash(bytes, lowBits) +(highBits.toLong << 32) | (lowBits.toLong & 0xffffffffL) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/49f1a820/core/src/test/java/org/apache/spark/JavaAPISuite.java -- diff --git
spark git commit: [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits
Repository: spark Updated Branches: refs/heads/branch-1.6 e86499954 -> 7755b50b4 [SPARK-10116][CORE] XORShiftRandom.hashSeed is random in high bits https://issues.apache.org/jira/browse/SPARK-10116 This is really trivial, just happened to notice it -- if `XORShiftRandom.hashSeed` is really supposed to have random bits throughout (as the comment implies), it needs to do something for the conversion to `long`. mengxr mkolod Author: Imran Rashid Closes #8314 from squito/SPARK-10116. (cherry picked from commit 49f1a820372d1cba41f3f00d07eb5728f2ed6705) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7755b50b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7755b50b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7755b50b Branch: refs/heads/branch-1.6 Commit: 7755b50b4352432c1ab9d600dd4f4b52727bfa40 Parents: e864999 Author: Imran Rashid Authored: Fri Nov 6 20:06:24 2015 + Committer: Sean Owen Committed: Fri Nov 6 20:06:34 2015 + -- R/pkg/inst/tests/test_sparkSQL.R| 8 +-- .../spark/util/random/XORShiftRandom.scala | 6 ++- .../java/org/apache/spark/JavaAPISuite.java | 20 +--- .../spark/rdd/PairRDDFunctionsSuite.scala | 52 ++-- .../spark/util/random/XORShiftRandomSuite.scala | 15 ++ .../MultilayerPerceptronClassifierSuite.scala | 5 +- .../apache/spark/ml/feature/Word2VecSuite.scala | 16 -- .../mllib/clustering/StreamingKMeansSuite.scala | 13 +++-- python/pyspark/ml/feature.py| 20 python/pyspark/ml/recommendation.py | 6 +-- python/pyspark/mllib/recommendation.py | 4 +- python/pyspark/sql/dataframe.py | 6 +-- .../sql/catalyst/expressions/RandomSuite.scala | 8 +-- .../apache/spark/sql/JavaDataFrameSuite.java| 6 ++- .../apache/spark/sql/DataFrameStatSuite.scala | 4 +- 15 files changed, 128 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 816315b..92cff1f 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -875,9 +875,9 @@ test_that("column binary mathfunctions", { expect_equal(collect(select(df, shiftRight(df$b, 1)))[4, 1], 4) expect_equal(collect(select(df, shiftRightUnsigned(df$b, 1)))[4, 1], 4) expect_equal(class(collect(select(df, rand()))[2, 1]), "numeric") - expect_equal(collect(select(df, rand(1)))[1, 1], 0.45, tolerance = 0.01) + expect_equal(collect(select(df, rand(1)))[1, 1], 0.134, tolerance = 0.01) expect_equal(class(collect(select(df, randn()))[2, 1]), "numeric") - expect_equal(collect(select(df, randn(1)))[1, 1], -0.0111, tolerance = 0.01) + expect_equal(collect(select(df, randn(1)))[1, 1], -1.03, tolerance = 0.01) }) test_that("string operators", { @@ -1458,8 +1458,8 @@ test_that("sampleBy() on a DataFrame", { fractions <- list("0" = 0.1, "1" = 0.2) sample <- sampleBy(df, "key", fractions, 0) result <- collect(orderBy(count(groupBy(sample, "key")), "key")) - expect_identical(as.list(result[1, ]), list(key = "0", count = 2)) - expect_identical(as.list(result[2, ]), list(key = "1", count = 10)) + expect_identical(as.list(result[1, ]), list(key = "0", count = 3)) + expect_identical(as.list(result[2, ]), list(key = "1", count = 7)) }) test_that("SQL error message is returned from JVM", { http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index 85fb923..e8cdb6e 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -60,9 +60,11 @@ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { private[spark] object XORShiftRandom { /** Hash seeds to have 0/1 bits throughout. */ - private def hashSeed(seed: Long): Long = { + private[random] def hashSeed(seed: Long): Long = { val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array() -MurmurHash3.bytesHash(bytes) +val lowBits = MurmurHash3.bytesHash(bytes) +val highBits = MurmurHash3.bytesHash(bytes, lowBits) +(highBits.toLong << 32) | (lowBits.toLong & 0xffffffffL) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/7755b50b/core/src/test/java/org/apache/s
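The fix itself is compact: hash the seed bytes twice with different Murmur seeds and pack the two 32-bit results into one long, so the high half of the hashed seed is no longer all zeros. A standalone sketch of the same construction, lifted from the patch above minus the Spark scaffolding:

~~~scala
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

// Sketch only: the patched hashSeed reproduced outside Spark.
def hashSeed(seed: Long): Long = {
  val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array()
  val lowBits = MurmurHash3.bytesHash(bytes)
  // Re-hash with lowBits as the seed so the high word varies independently.
  val highBits = MurmurHash3.bytesHash(bytes, lowBits)
  (highBits.toLong << 32) | (lowBits.toLong & 0xffffffffL)
}
~~~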
spark git commit: Typo fixes + code readability improvements
Repository: spark Updated Branches: refs/heads/master 8211aab07 -> 62bb29077 Typo fixes + code readability improvements Author: Jacek Laskowski Closes #9501 from jaceklaskowski/typos-with-style. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62bb2907 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62bb2907 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62bb2907 Branch: refs/heads/master Commit: 62bb290773c9f9fa53cbe6d4eedc6e153761a763 Parents: 8211aab Author: Jacek Laskowski Authored: Fri Nov 6 20:05:18 2015 + Committer: Sean Owen Committed: Fri Nov 6 20:05:18 2015 + -- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 14 ++ .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +--- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 10 +- .../scala/org/apache/spark/scheduler/TaskSet.scala| 2 +- 4 files changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62bb2907/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index d841f05..0453614 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -88,8 +88,8 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit) * * @param sc The SparkContext to associate the RDD with. * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed - * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. - * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. + * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job. + * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD * creates. * @param inputFormatClass Storage format of the data to be read. 
@@ -123,7 +123,7 @@ class HadoopRDD[K, V]( sc, sc.broadcast(new SerializableConfiguration(conf)) .asInstanceOf[Broadcast[SerializableConfiguration]], - None /* initLocalJobConfFuncOpt */, + initLocalJobConfFuncOpt = None, inputFormatClass, keyClass, valueClass, @@ -184,8 +184,9 @@ class HadoopRDD[K, V]( protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] -if (newInputFormat.isInstanceOf[Configurable]) { - newInputFormat.asInstanceOf[Configurable].setConf(conf) +newInputFormat match { + case c: Configurable => c.setConf(conf) + case _ => } newInputFormat } @@ -195,9 +196,6 @@ class HadoopRDD[K, V]( // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) -if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(jobConf) -} val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { http://git-wip-us.apache.org/repos/asf/spark/blob/62bb2907/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a1f0fd0..4a9518f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -541,8 +541,7 @@ class DAGScheduler( } /** - * Submit an action job to the scheduler and get a JobWaiter object back. The JobWaiter object - * can be used to block until the the job finishes executing or can be used to cancel the job. + * Submit an action job to the scheduler. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD @@ -551,6 +550,11 @@ class DAGScheduler( * @param callSite where in the user program this job was called * @param resultHandler callback to pass each result to * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name +
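The `Configurable` change in `getInputFormat` above is the usual "pattern match instead of cast" cleanup; a minimal standalone illustration of the idiom (the trait here is a stand-in, not Hadoop's `Configurable`):

trait Configurable { def setConf(conf: String): Unit }

// Before: a type test followed by a cast.
def configureOld(obj: AnyRef, conf: String): Unit =
  if (obj.isInstanceOf[Configurable]) {
    obj.asInstanceOf[Configurable].setConf(conf)
  }

// After: one pattern match binds the typed value, no cast needed.
def configureNew(obj: AnyRef, conf: String): Unit = obj match {
  case c: Configurable => c.setConf(conf)
  case _ => // not configurable: nothing to do
}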
[3/4] spark git commit: [SPARK-11541][SQL] Break JdbcDialects.scala into multiple files and mark various dialects as private.
[SPARK-11541][SQL] Break JdbcDialects.scala into multiple files and mark various dialects as private. Author: Reynold Xin Closes #9511 from rxin/SPARK-11541. (cherry picked from commit bc5d6c03893a9bd340d6b94d3550e25648412241) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/641cdfd8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/641cdfd8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/641cdfd8 Branch: refs/heads/branch-1.6 Commit: 641cdfd869cf6096274f55adcccdf01caa9752b5 Parents: a015f81 Author: Reynold Xin Authored: Thu Nov 5 22:03:26 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 11:57:43 2015 -0800 -- project/MimaExcludes.scala | 19 +- .../org/apache/spark/sql/GroupedData.scala | 2 +- .../spark/sql/jdbc/AggregatedDialect.scala | 44 + .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 32 .../apache/spark/sql/jdbc/DerbyDialect.scala| 44 + .../apache/spark/sql/jdbc/JdbcDialects.scala| 190 +-- .../spark/sql/jdbc/MsSqlServerDialect.scala | 41 .../apache/spark/sql/jdbc/MySQLDialect.scala| 48 + .../apache/spark/sql/jdbc/OracleDialect.scala | 45 + .../apache/spark/sql/jdbc/PostgresDialect.scala | 54 ++ 10 files changed, 332 insertions(+), 187 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/641cdfd8/project/MimaExcludes.scala -- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 40f5c9f..dacef91 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -116,7 +116,24 @@ object MimaExcludes { "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$") ) ++ Seq( // SPARK-11485 - ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df") + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"), +// SPARK-11541 mark various JDBC dialects as private + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productElement"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productArity"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.canEqual"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productIterator"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productPrefix"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.toString"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.hashCode"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.PostgresDialect$"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productElement"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productArity"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.canEqual"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productIterator"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productPrefix"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.toString"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.hashCode"), + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$") ) case v if 
v.startsWith("1.5") => Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/641cdfd8/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index 7cf66b6..f9eab5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.types.NumericType class GroupedData protected[sql]( df: DataFrame, groupingExprs: Seq[Expression], -private val groupType: GroupedData.GroupType) { +groupType: GroupedData.GroupType) { private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = { val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColum
[1/4] spark git commit: [SPARK-11538][BUILD] Force guava 14 in sbt build.
Repository: spark Updated Branches: refs/heads/branch-1.6 9d6238859 -> e86499954 [SPARK-11538][BUILD] Force guava 14 in sbt build. sbt's version resolution code always picks the most recent version, and we don't want that for guava. Author: Marcelo Vanzin Closes #9508 from vanzin/SPARK-11538. (cherry picked from commit 5e31db70bb783656ba042863fcd3c223e17a8f81) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/051b2ca3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/051b2ca3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/051b2ca3 Branch: refs/heads/branch-1.6 Commit: 051b2ca3a0aa18c5d805cbd183bca504865297c4 Parents: 9d62388 Author: Marcelo Vanzin Authored: Thu Nov 5 18:05:58 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 11:57:29 2015 -0800 -- project/SparkBuild.scala | 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/051b2ca3/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 75c3693..b75ed13 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -207,7 +207,8 @@ object SparkBuild extends PomBuild { // Note ordering of these settings matter. /* Enable shared settings on all projects */ (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools)) -.foreach(enable(sharedSettings ++ ExcludedDependencies.settings ++ Revolver.settings)) +.foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ + ExcludedDependencies.settings ++ Revolver.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) @@ -292,6 +293,14 @@ object Flume { } /** + * Overrides to work around sbt's dependency resolution being different from Maven's. + */ +object DependencyOverrides { + lazy val settings = Seq( +dependencyOverrides += "com.google.guava" % "guava" % "14.0.1") +} + +/** This excludes library dependencies in sbt, which are specified in maven but are not needed by sbt build. */
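The same knob works in any sbt build, not just Spark's; a build.sbt sketch of the general pattern:

// build.sbt (sketch): pin a transitive dependency to an exact version.
// Without the override, sbt's latest-wins mediation would pick whatever
// newer guava some other dependency drags in.
dependencyOverrides += "com.google.guava" % "guava" % "14.0.1"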
spark git commit: [SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from checkpoint
Repository: spark Updated Branches: refs/heads/branch-1.6 089ba81d1 -> 9d6238859 [SPARK-11457][STREAMING][YARN] Fix incorrect AM proxy filter conf recovery from checkpoint Currently the YARN AM proxy filter configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to unwanted behaviors: 1. A wrong RM address if the RM has been redeployed after a failure. 2. A wrong proxyBase, since the app id changes on restart and the old app id is baked into proxyBase. So instead of being recovered from the checkpoint file, these configurations should be reloaded each time the application starts. This problem only exists in YARN cluster mode; in YARN client mode these configurations are updated via the RPC message `AddWebUIFilter`. Please help review, tdas harishreedharan vanzin, thanks a lot. Author: jerryshao Closes #9412 from jerryshao/SPARK-11457. (cherry picked from commit 468ad0ae874d5cf55712ee976faf77f19c937ccb) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d623885 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d623885 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d623885 Branch: refs/heads/branch-1.6 Commit: 9d6238859e98651f32b9dd733a83c0e6f45978a7 Parents: 089ba81 Author: jerryshao Authored: Thu Nov 5 18:03:12 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 11:57:16 2015 -0800 -- .../scala/org/apache/spark/streaming/Checkpoint.scala | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d623885/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index b7de6dd..0cd55d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -55,7 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) "spark.driver.port", "spark.master", "spark.yarn.keytab", - "spark.yarn.principal") + "spark.yarn.principal", + "spark.ui.filters") val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs) .remove("spark.driver.host") @@ -66,6 +67,16 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) newSparkConf.set(prop, value) } } + +// Add Yarn proxy filter specific configurations to the recovered SparkConf +val filter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" +val filterPrefix = s"spark.$filter.param." +newReloadConf.getAll.foreach { case (k, v) => + if (k.startsWith(filterPrefix) && k.length > filterPrefix.length) { +newSparkConf.set(k, v) + } +} + newSparkConf }
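Stripped of the Spark specifics, the recovery rule is: rebuild the conf from the checkpointed pairs, then overwrite environment-sensitive keys from the freshly loaded conf. A standalone sketch of that shape (the helper name and prefix list are illustrative, not Spark API):

import org.apache.spark.SparkConf

def recoverConf(
    checkpointed: Seq[(String, String)],
    fresh: SparkConf,
    refreshedPrefixes: Seq[String]): SparkConf = {
  // Start from the checkpointed snapshot...
  val conf = new SparkConf(loadDefaults = false).setAll(checkpointed)
  // ...but keys describing the *current* cluster (new RM address, new app id)
  // must come from the live environment, not the snapshot.
  fresh.getAll.foreach { case (k, v) =>
    if (refreshedPrefixes.exists(k.startsWith)) conf.set(k, v)
  }
  conf
}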
[2/4] spark git commit: [SPARK-11540][SQL] API audit for QueryExecutionListener.
[SPARK-11540][SQL] API audit for QueryExecutionListener. Author: Reynold Xin Closes #9509 from rxin/SPARK-11540. (cherry picked from commit 3cc2c053b5d68c747a30bd58cf388b87b1922f13) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a015f814 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a015f814 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a015f814 Branch: refs/heads/branch-1.6 Commit: a015f8141f4123d0389594e890d25b1a0f0adc71 Parents: 051b2ca Author: Reynold Xin Authored: Thu Nov 5 18:12:54 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 11:57:36 2015 -0800 -- .../spark/sql/execution/QueryExecution.scala| 30 +++--- .../spark/sql/util/QueryExecutionListener.scala | 101 ++- 2 files changed, 72 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a015f814/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index fc91745..c2142d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import com.google.common.annotations.VisibleForTesting + import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -25,31 +27,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** * The primary workflow for executing relational queries using Spark. Designed to allow easy * access to the intermediate phases of query execution for developers. + * + * While this is not a public class, we should avoid changing the function names for the sake of + * changing them, because a lot of developers use the feature for debugging. */ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { - val analyzer = sqlContext.analyzer - val optimizer = sqlContext.optimizer - val planner = sqlContext.planner - val cacheManager = sqlContext.cacheManager - val prepareForExecution = sqlContext.prepareForExecution - def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed) + @VisibleForTesting + def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed) + + lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical) - lazy val analyzed: LogicalPlan = analyzer.execute(logical) lazy val withCachedData: LogicalPlan = { assertAnalyzed() -cacheManager.useCachedData(analyzed) +sqlContext.cacheManager.useCachedData(analyzed) } - lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) + + lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData) // TODO: Don't just pick the first one... lazy val sparkPlan: SparkPlan = { SparkPlan.currentContext.set(sqlContext) -planner.plan(optimizedPlan).next() +sqlContext.planner.plan(optimizedPlan).next() } + // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. - lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan) + lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan) /** Internal version of the RDD. 
Avoids copies and has no schema */ lazy val toRdd: RDD[InternalRow] = executedPlan.execute() @@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } - def simpleString: String = + def simpleString: String = { s"""== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim - + } override def toString: String = { def output = http://git-wip-us.apache.org/repos/asf/spark/blob/a015f814/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 909a8ab..ac432e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -19,36 +19,38 @@ package org.apache.spark.sql.util import java.util.concurrent.locks.ReentrantReadWriteLock
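For orientation, the audited listener interface reduces to two callbacks; a sketch of an implementation against the 1.6 signatures, registered through `SQLContext.listenerManager`:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class TimingListener extends QueryExecutionListener {
  // Called after a successful action, with the wall-clock duration.
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(f"$funcName took ${durationNs / 1e6}%.1f ms")

  // Called when the action throws.
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

// Usage (assuming an existing sqlContext):
//   sqlContext.listenerManager.register(new TimingListener)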
[4/4] spark git commit: [SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit
[SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit A cleanup for https://github.com/apache/spark/pull/9085. The `DecimalLit` is very similar to `FloatLit`, we can just keep one of them. Also added low level unit test at `SqlParserSuite` Author: Wenchen Fan Closes #9482 from cloud-fan/parser. (cherry picked from commit 253e87e8ab8717ffef40a6d0d376b1add155ef90) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8649995 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8649995 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8649995 Branch: refs/heads/branch-1.6 Commit: e8649995418fb9c3a172181f0d0fa6d49ba2f674 Parents: 641cdfd Author: Wenchen Fan Authored: Fri Nov 6 06:38:49 2015 -0800 Committer: Reynold Xin Committed: Fri Nov 6 11:57:50 2015 -0800 -- .../sql/catalyst/AbstractSparkSQLParser.scala | 23 +--- .../apache/spark/sql/catalyst/SqlParser.scala | 20 - .../spark/sql/catalyst/SqlParserSuite.scala | 21 ++ 3 files changed, 35 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8649995/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 04ac4f2..bdc52c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ -78,10 +78,6 @@ private[sql] abstract class AbstractSparkSQLParser } class SqlLexical extends StdLexical { - case class FloatLit(chars: String) extends Token { -override def toString: String = chars - } - case class DecimalLit(chars: String) extends Token { override def toString: String = chars } @@ -106,17 +102,16 @@ class SqlLexical extends StdLexical { } override lazy val token: Parser[Token] = -( rep1(digit) ~ ('.' ~> digit.*).? ~ (exp ~> sign.? ~ rep1(digit)) ^^ { -case i ~ None ~ (sig ~ rest) => - DecimalLit(i.mkString + "e" + sig.mkString + rest.mkString) -case i ~ Some(d) ~ (sig ~ rest) => - DecimalLit(i.mkString + "." + d.mkString + "e" + sig.mkString + rest.mkString) - } +( rep1(digit) ~ scientificNotation ^^ { case i ~ s => DecimalLit(i.mkString + s) } +| '.' ~> (rep1(digit) ~ scientificNotation) ^^ + { case i ~ s => DecimalLit("0." + i.mkString + s) } +| rep1(digit) ~ ('.' ~> digit.*) ~ scientificNotation ^^ + { case i1 ~ i2 ~ s => DecimalLit(i1.mkString + "." + i2.mkString + s) } | digit.* ~ identChar ~ (identChar | digit).* ^^ { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) } | rep1(digit) ~ ('.' ~> digit.*).? ^^ { case i ~ None => NumericLit(i.mkString) -case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString) +case i ~ Some(d) => DecimalLit(i.mkString + "." + d.mkString) } | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^ { case chars => StringLit(chars mkString "") } @@ -133,8 +128,10 @@ class SqlLexical extends StdLexical { override def identChar: Parser[Elem] = letter | elem('_') - private lazy val sign: Parser[Elem] = elem("s", c => c == '+' || c == '-') - private lazy val exp: Parser[Elem] = elem("e", c => c == 'E' || c == 'e') + private lazy val scientificNotation: Parser[String] = +(elem('e') | elem('E')) ~> (elem('+') | elem('-')).? 
~ rep1(digit) ^^ { + case s ~ rest => "e" + s.mkString + rest.mkString +} override def whitespace: Parser[Any] = ( whitespaceChar http://git-wip-us.apache.org/repos/asf/spark/blob/e8649995/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 440e9e2..cd717c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -334,27 +334,15 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser { protected lazy val numericLiteral: Parser[Literal] = ( integral ^^ { case i => Literal(toNarrowestIntegerType(i)) } -| sign.? ~ unsignedFloat ^^ { - case s ~ f => Literal(toDecimalOrDouble(s.getOrElse("") + f)) -} -| sign.? ~ unsignedDecimal ^^ {
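The factored-out `scientificNotation` rule is ordinary Scala parser-combinator code; a toy lexer (not the Spark one, and simplified to the digits-first forms) showing the same shape, with the exponent suffix defined once and reused:

import scala.util.parsing.combinator.RegexParsers

object ToyNumLexer extends RegexParsers {
  private val digits: Parser[String] = "[0-9]+".r

  // One rule for the exponent suffix, shared by every literal form --
  // the same factoring the commit applies to the Spark lexer.
  private val sci: Parser[String] =
    ("e" | "E") ~> opt("+" | "-") ~ digits ^^ {
      case sign ~ ds => "e" + sign.getOrElse("") + ds
    }

  val decimal: Parser[String] =
    digits ~ opt("." ~> digits) ~ opt(sci) ^^ {
      case i ~ frac ~ exp =>
        i + frac.map("." + _).getOrElse("") + exp.getOrElse("")
    }

  def main(args: Array[String]): Unit = {
    Seq("42", "3.14", "1e10", "6.02E+23").foreach { s =>
      println(parseAll(decimal, s)) // each parses to a normalized literal
    }
  }
}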
spark git commit: [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up)
Repository: spark Updated Branches: refs/heads/branch-1.6 fba48e733 -> 089ba81d1 [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up) https://issues.apache.org/jira/browse/SPARK-9858 This PR is the follow-up work of https://github.com/apache/spark/pull/9276. It addresses JoshRosen's comments. Author: Yin Huai Closes #9453 from yhuai/numReducer-followUp. (cherry picked from commit 8211aab0793cf64202b99be4f31bb8a9ae77050d) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/089ba81d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/089ba81d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/089ba81d Branch: refs/heads/branch-1.6 Commit: 089ba81d10630dfb046a71e32c1d1804511f050f Parents: fba48e7 Author: Yin Huai Authored: Fri Nov 6 11:13:51 2015 -0800 Committer: Yin Huai Committed: Fri Nov 6 11:13:58 2015 -0800 -- .../catalyst/plans/physical/partitioning.scala | 8 - .../apache/spark/sql/execution/Exchange.scala | 40 +++-- .../sql/execution/ExchangeCoordinator.scala | 31 ++-- .../org/apache/spark/sql/CachedTableSuite.scala | 150 ++- 4 files changed, 167 insertions(+), 62 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/089ba81d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 9312c81..86b9417 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -165,11 +165,6 @@ sealed trait Partitioning { * produced by `A` could have also been produced by `B`. */ def guarantees(other: Partitioning): Boolean = this == other - - def withNumPartitions(newNumPartitions: Int): Partitioning = { -throw new IllegalStateException( - s"It is not allowed to call withNumPartitions method of a ${this.getClass.getSimpleName}") - } } object Partitioning { @@ -254,9 +249,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) case _ => false } - override def withNumPartitions(newNumPartitions: Int): HashPartitioning = { -HashPartitioning(expressions, newNumPartitions) - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/089ba81d/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 0f72ec6..a4ce328 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -242,7 +242,7 @@ case class Exchange( // update the number of post-shuffle partitions. 
specifiedPartitionStartIndices.foreach { indices => assert(newPartitioning.isInstanceOf[HashPartitioning]) - newPartitioning = newPartitioning.withNumPartitions(indices.length) + newPartitioning = UnknownPartitioning(indices.length) } new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices) } @@ -262,7 +262,7 @@ case class Exchange( object Exchange { def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = { -Exchange(newPartitioning, child, None: Option[ExchangeCoordinator]) +Exchange(newPartitioning, child, coordinator = None: Option[ExchangeCoordinator]) } } @@ -315,7 +315,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ child.outputPartitioning match { case hash: HashPartitioning => true case collection: PartitioningCollection => - collection.partitionings.exists(_.isInstanceOf[HashPartitioning]) + collection.partitionings.forall(_.isInstanceOf[HashPartitioning]) case _ => false } } @@ -416,28 +416,48 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ // First check if the existing partitions of the children all match. This means they are // partitioned by the same partitioning into the same number of partitions. In that case, // don't try to make them match `defaultPartitions`, just use the existing partitioning. - // TODO: this s
spark git commit: [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up)
Repository: spark Updated Branches: refs/heads/master c048929c6 -> 8211aab07 [SPARK-9858][SQL] Add an ExchangeCoordinator to estimate the number of post-shuffle partitions for aggregates and joins (follow-up) https://issues.apache.org/jira/browse/SPARK-9858 This PR is the follow-up work of https://github.com/apache/spark/pull/9276. It addresses JoshRosen's comments. Author: Yin Huai Closes #9453 from yhuai/numReducer-followUp. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8211aab0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8211aab0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8211aab0 Branch: refs/heads/master Commit: 8211aab0793cf64202b99be4f31bb8a9ae77050d Parents: c048929 Author: Yin Huai Authored: Fri Nov 6 11:13:51 2015 -0800 Committer: Yin Huai Committed: Fri Nov 6 11:13:51 2015 -0800 -- .../catalyst/plans/physical/partitioning.scala | 8 - .../apache/spark/sql/execution/Exchange.scala | 40 +++-- .../sql/execution/ExchangeCoordinator.scala | 31 ++-- .../org/apache/spark/sql/CachedTableSuite.scala | 150 ++- 4 files changed, 167 insertions(+), 62 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8211aab0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 9312c81..86b9417 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -165,11 +165,6 @@ sealed trait Partitioning { * produced by `A` could have also been produced by `B`. */ def guarantees(other: Partitioning): Boolean = this == other - - def withNumPartitions(newNumPartitions: Int): Partitioning = { -throw new IllegalStateException( - s"It is not allowed to call withNumPartitions method of a ${this.getClass.getSimpleName}") - } } object Partitioning { @@ -254,9 +249,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) case _ => false } - override def withNumPartitions(newNumPartitions: Int): HashPartitioning = { -HashPartitioning(expressions, newNumPartitions) - } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/8211aab0/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 0f72ec6..a4ce328 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -242,7 +242,7 @@ case class Exchange( // update the number of post-shuffle partitions. 
specifiedPartitionStartIndices.foreach { indices => assert(newPartitioning.isInstanceOf[HashPartitioning]) - newPartitioning = newPartitioning.withNumPartitions(indices.length) + newPartitioning = UnknownPartitioning(indices.length) } new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices) } @@ -262,7 +262,7 @@ case class Exchange( object Exchange { def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = { -Exchange(newPartitioning, child, None: Option[ExchangeCoordinator]) +Exchange(newPartitioning, child, coordinator = None: Option[ExchangeCoordinator]) } } @@ -315,7 +315,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ child.outputPartitioning match { case hash: HashPartitioning => true case collection: PartitioningCollection => - collection.partitionings.exists(_.isInstanceOf[HashPartitioning]) + collection.partitionings.forall(_.isInstanceOf[HashPartitioning]) case _ => false } } @@ -416,28 +416,48 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ // First check if the existing partitions of the children all match. This means they are // partitioned by the same partitioning into the same number of partitions. In that case, // don't try to make them match `defaultPartitions`, just use the existing partitioning. - // TODO: this should be a cost based decision. For example, a big relation should probably - // maintain its exi
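Among the review fixes in both copies of this commit, note the `exists` -> `forall` change in `EnsureRequirements`: the coordinator should only engage when every partitioning in the collection is hash-based. A REPL-style illustration of why the old predicate was too permissive:

val partitionings = Seq("hash", "range", "hash")
partitionings.exists(_ == "hash")  // true  -- old check: one hash child was enough
partitionings.forall(_ == "hash")  // false -- new check: every child must be hash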
spark git commit: [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399
Repository: spark Updated Branches: refs/heads/master 574141a29 -> c048929c6 [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399 This PR adds test cases that test various column pruning and filter push-down cases. Author: Cheng Lian Closes #9468 from liancheng/spark-10978.follow-up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c048929c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c048929c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c048929c Branch: refs/heads/master Commit: c048929c6a9f7ce57f384037cd6c0bf5751c447a Parents: 574141a Author: Cheng Lian Authored: Fri Nov 6 11:11:36 2015 -0800 Committer: Yin Huai Committed: Fri Nov 6 11:11:36 2015 -0800 -- .../spark/sql/sources/FilteredScanSuite.scala | 21 +- .../SimpleTextHadoopFsRelationSuite.scala | 335 +-- .../spark/sql/sources/SimpleTextRelation.scala | 11 + 3 files changed, 321 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c048929c/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7541e72..2cad964 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql.sources -import org.apache.spark.sql.execution.datasources.LogicalRelation - import scala.language.existentials import org.apache.spark.rdd.RDD -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ - +import org.apache.spark.unsafe.types.UTF8String class FilteredScanSource extends RelationProvider { override def createRelation( @@ -130,7 +129,7 @@ object ColumnsRequired { var set: Set[String] = Set.empty } -class FilteredScanSuite extends DataSourceTest with SharedSQLContext { +class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper { protected override lazy val sql = caseInsensitiveContext.sql _ override def beforeAll(): Unit = { @@ -144,9 +143,6 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext { | to '10' |) """.stripMargin) - -// UDF for testing filter push-down -caseInsensitiveContext.udf.register("udf_gt3", (_: Int) > 3) } sqlTest( @@ -276,14 +272,15 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext { testPushDown("SELECT c FROM oneToTenFiltered WHERE c = 'aA'", 1, Set("c")) testPushDown("SELECT c FROM oneToTenFiltered WHERE c IN ('aA', 'foo')", 1, Set("c")) - // Columns only referenced by UDF filter must be required, as UDF filters can't be pushed down. - testPushDown("SELECT c FROM oneToTenFiltered WHERE udf_gt3(A)", 10, Set("a", "c")) + // Filters referencing multiple columns are not convertible, all referenced columns must be + // required. + testPushDown("SELECT c FROM oneToTenFiltered WHERE A + b > 9", 10, Set("a", "b", "c")) - // A query with an unconvertible filter, an unhandled filter, and a handled filter. + // A query with an inconvertible filter, an unhandled filter, and a handled filter. 
testPushDown( """SELECT a | FROM oneToTenFiltered - | WHERE udf_gt3(b) + | WHERE a + b > 9 | AND b < 16 | AND c IN ('bB', 'cC', 'dD', 'foo') """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b")) http://git-wip-us.apache.org/repos/asf/spark/blob/c048929c/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index d945408..9251a69 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -17,15 +17,21 @@ package org.apache.spark.sql.sources +import java.io.File + import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.catalyst.expressions.Expression import org
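The dividing line these tests pin down is whether a predicate has a representation in `org.apache.spark.sql.sources.Filter` at all; a REPL-style sketch of the two sides (note that `In` takes `Array[Any]`):

import org.apache.spark.sql.sources.{Filter, In, LessThan}

// Single-column predicates map onto source Filters and can be pushed down:
//   b < 16            -> LessThan("b", 16)
//   c IN ('bB', 'cC') -> In("c", Array[Any]("bB", "cC"))
val pushable: Seq[Filter] = Seq(LessThan("b", 16), In("c", Array[Any]("bB", "cC")))

// `a + b > 9` spans two columns, so no Filter case exists for it; it stays in
// Spark as a post-scan predicate, which is why the test expects all of
// a, b and c to be required columns.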
spark git commit: [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399
Repository: spark Updated Branches: refs/heads/branch-1.6 d69bc9e47 -> fba48e733 [SPARK-10978][SQL][FOLLOW-UP] More comprehensive tests for PR #9399 This PR adds test cases that test various column pruning and filter push-down cases. Author: Cheng Lian Closes #9468 from liancheng/spark-10978.follow-up. (cherry picked from commit c048929c6a9f7ce57f384037cd6c0bf5751c447a) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fba48e73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fba48e73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fba48e73 Branch: refs/heads/branch-1.6 Commit: fba48e7332b58169bda215c2a261ae9195c67744 Parents: d69bc9e Author: Cheng Lian Authored: Fri Nov 6 11:11:36 2015 -0800 Committer: Yin Huai Committed: Fri Nov 6 11:11:45 2015 -0800 -- .../spark/sql/sources/FilteredScanSuite.scala | 21 +- .../SimpleTextHadoopFsRelationSuite.scala | 335 +-- .../spark/sql/sources/SimpleTextRelation.scala | 11 + 3 files changed, 321 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fba48e73/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7541e72..2cad964 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql.sources -import org.apache.spark.sql.execution.datasources.LogicalRelation - import scala.language.existentials import org.apache.spark.rdd.RDD -import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ - +import org.apache.spark.unsafe.types.UTF8String class FilteredScanSource extends RelationProvider { override def createRelation( @@ -130,7 +129,7 @@ object ColumnsRequired { var set: Set[String] = Set.empty } -class FilteredScanSuite extends DataSourceTest with SharedSQLContext { +class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper { protected override lazy val sql = caseInsensitiveContext.sql _ override def beforeAll(): Unit = { @@ -144,9 +143,6 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext { | to '10' |) """.stripMargin) - -// UDF for testing filter push-down -caseInsensitiveContext.udf.register("udf_gt3", (_: Int) > 3) } sqlTest( @@ -276,14 +272,15 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext { testPushDown("SELECT c FROM oneToTenFiltered WHERE c = 'aA'", 1, Set("c")) testPushDown("SELECT c FROM oneToTenFiltered WHERE c IN ('aA', 'foo')", 1, Set("c")) - // Columns only referenced by UDF filter must be required, as UDF filters can't be pushed down. - testPushDown("SELECT c FROM oneToTenFiltered WHERE udf_gt3(A)", 10, Set("a", "c")) + // Filters referencing multiple columns are not convertible, all referenced columns must be + // required. + testPushDown("SELECT c FROM oneToTenFiltered WHERE A + b > 9", 10, Set("a", "b", "c")) - // A query with an unconvertible filter, an unhandled filter, and a handled filter. 
+ // A query with an inconvertible filter, an unhandled filter, and a handled filter. testPushDown( """SELECT a | FROM oneToTenFiltered - | WHERE udf_gt3(b) + | WHERE a + b > 9 | AND b < 16 | AND c IN ('bB', 'cC', 'dD', 'foo') """.stripMargin.split("\n").map(_.trim).mkString(" "), 3, Set("a", "b")) http://git-wip-us.apache.org/repos/asf/spark/blob/fba48e73/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index d945408..9251a69 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -17,15 +17,21 @@ package org.apache.spark.sql.sources +import java.io.File + impo
spark git commit: [SPARK-9162] [SQL] Implement code generation for ScalaUDF
Repository: spark Updated Branches: refs/heads/branch-1.6 0a430f04e -> d69bc9e47 [SPARK-9162] [SQL] Implement code generation for ScalaUDF JIRA: https://issues.apache.org/jira/browse/SPARK-9162 Currently ScalaUDF extends CodegenFallback and doesn't provide a code generation implementation. This patch implements code generation for ScalaUDF. Author: Liang-Chi Hsieh Closes #9270 from viirya/scalaudf-codegen. (cherry picked from commit 574141a29835ce78d68c97bb54336cf4fd3c39d3) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d69bc9e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d69bc9e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d69bc9e4 Branch: refs/heads/branch-1.6 Commit: d69bc9e4754d9638ca525a0880118bcf5ab2ad05 Parents: 0a430f0 Author: Liang-Chi Hsieh Authored: Fri Nov 6 10:52:04 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 10:52:14 2015 -0800 -- .../sql/catalyst/expressions/ScalaUDF.scala | 85 +++- .../scala/org/apache/spark/sql/UDFSuite.scala | 41 ++ 2 files changed, 124 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d69bc9e4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 11c7950..3388cc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.types.DataType /** @@ -31,7 +31,7 @@ case class ScalaUDF( dataType: DataType, children: Seq[Expression], inputTypes: Seq[DataType] = Nil) - extends Expression with ImplicitCastInputTypes with CodegenFallback { + extends Expression with ImplicitCastInputTypes { override def nullable: Boolean = true @@ -60,6 +60,10 @@ case class ScalaUDF( */ + // Accessors used in genCode + def userDefinedFunc(): AnyRef = function + def getChildren(): Seq[Expression] = children + private[this] val f = children.size match { case 0 => val func = function.asInstanceOf[() => Any] @@ -960,6 +964,83 @@ case class ScalaUDF( } // scalastyle:on + + // Generate codes used to convert the arguments to Scala type for user-defined funtions + private[this] def genCodeForConverter(ctx: CodeGenContext, index: Int): String = { +val converterClassName = classOf[Any => Any].getName +val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$" +val expressionClassName = classOf[Expression].getName +val scalaUDFClassName = classOf[ScalaUDF].getName + +val converterTerm = ctx.freshName("converter") +val expressionIdx = ctx.references.size - 1 +ctx.addMutableState(converterClassName, converterTerm, + s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" + + s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" + + s"expressions[$expressionIdx]).getChildren().apply($index))).dataType());") +converterTerm + } + + override def genCode( + ctx: CodeGenContext, + ev:
GeneratedExpressionCode): String = { + +ctx.references += this + +val scalaUDFClassName = classOf[ScalaUDF].getName +val converterClassName = classOf[Any => Any].getName +val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$" +val expressionClassName = classOf[Expression].getName + +// Generate codes used to convert the returned value of user-defined functions to Catalyst type +val catalystConverterTerm = ctx.freshName("catalystConverter") +val catalystConverterTermIdx = ctx.references.size - 1 +ctx.addMutableState(converterClassName, catalystConverterTerm, + s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" + +s".createToCatalystConverter((($scalaUDFClassName)expressions" + + s"[$catalystConverterTermIdx]).dataType());") + +val resultTerm = ctx.freshName("result") + +// This must be called before children expressions' codegen +// because ctx.references is used in genCodeForConverter +val converterTerms = (0 until chil
spark git commit: [SPARK-9162] [SQL] Implement code generation for ScalaUDF
Repository: spark Updated Branches: refs/heads/master cf69ce136 -> 574141a29 [SPARK-9162] [SQL] Implement code generation for ScalaUDF JIRA: https://issues.apache.org/jira/browse/SPARK-9162 Currently ScalaUDF extends CodegenFallback and doesn't provide a code generation implementation. This patch implements code generation for ScalaUDF. Author: Liang-Chi Hsieh Closes #9270 from viirya/scalaudf-codegen. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/574141a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/574141a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/574141a2 Branch: refs/heads/master Commit: 574141a29835ce78d68c97bb54336cf4fd3c39d3 Parents: cf69ce1 Author: Liang-Chi Hsieh Authored: Fri Nov 6 10:52:04 2015 -0800 Committer: Davies Liu Committed: Fri Nov 6 10:52:04 2015 -0800 -- .../sql/catalyst/expressions/ScalaUDF.scala | 85 +++- .../scala/org/apache/spark/sql/UDFSuite.scala | 41 ++ 2 files changed, 124 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/574141a2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index 11c7950..3388cc2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.types.DataType /** @@ -31,7 +31,7 @@ case class ScalaUDF( dataType: DataType, children: Seq[Expression], inputTypes: Seq[DataType] = Nil) - extends Expression with ImplicitCastInputTypes with CodegenFallback { + extends Expression with ImplicitCastInputTypes { override def nullable: Boolean = true @@ -60,6 +60,10 @@ case class ScalaUDF( */ + // Accessors used in genCode + def userDefinedFunc(): AnyRef = function + def getChildren(): Seq[Expression] = children + private[this] val f = children.size match { case 0 => val func = function.asInstanceOf[() => Any] @@ -960,6 +964,83 @@ case class ScalaUDF( } // scalastyle:on + + // Generate codes used to convert the arguments to Scala type for user-defined funtions + private[this] def genCodeForConverter(ctx: CodeGenContext, index: Int): String = { +val converterClassName = classOf[Any => Any].getName +val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$" +val expressionClassName = classOf[Expression].getName +val scalaUDFClassName = classOf[ScalaUDF].getName + +val converterTerm = ctx.freshName("converter") +val expressionIdx = ctx.references.size - 1 +ctx.addMutableState(converterClassName, converterTerm, + s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" + + s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" + + s"expressions[$expressionIdx]).getChildren().apply($index))).dataType());") +converterTerm + } + + override def genCode( + ctx: CodeGenContext, + ev: GeneratedExpressionCode): String = { + +ctx.references += this + +val scalaUDFClassName = classOf[ScalaUDF].getName +val
converterClassName = classOf[Any => Any].getName +val typeConvertersClassName = CatalystTypeConverters.getClass.getName + ".MODULE$" +val expressionClassName = classOf[Expression].getName + +// Generate codes used to convert the returned value of user-defined functions to Catalyst type +val catalystConverterTerm = ctx.freshName("catalystConverter") +val catalystConverterTermIdx = ctx.references.size - 1 +ctx.addMutableState(converterClassName, catalystConverterTerm, + s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" + +s".createToCatalystConverter((($scalaUDFClassName)expressions" + + s"[$catalystConverterTermIdx]).dataType());") + +val resultTerm = ctx.freshName("result") + +// This must be called before children expressions' codegen +// because ctx.references is used in genCodeForConverter +val converterTerms = (0 until children.size).map(genCodeForConverter(ctx, _)) + +// Initialize user-defined function +val funcClas
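From the user's side nothing changes except the execution path; a hypothetical 1.6 shell session exercising a UDF that now goes through `genCode` instead of the `CodegenFallback` it used to inherit (assumes an existing `sqlContext`):

// Register a Scala UDF and use it from SQL; evaluation of plusOne is now
// compiled into the generated expression code rather than interpreted.
sqlContext.udf.register("plusOne", (x: Long) => x + 1)
sqlContext.range(5).registerTempTable("t")
sqlContext.sql("SELECT plusOne(id) FROM t").show()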
spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used
Repository: spark Updated Branches: refs/heads/branch-1.5 b8b1fbfc8 -> dc058f2ff [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used Just ignored `InputDStream`s that have null `rememberDuration` in `DStreamGraph.getMaxInputStreamRememberDuration`. Author: Shixiong Zhu Closes #9476 from zsxwing/SPARK-11511. (cherry picked from commit cf69ce136590fea51843bc54f44f0f45c7d0ac36) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc058f2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc058f2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc058f2f Branch: refs/heads/branch-1.5 Commit: dc058f2fff6ee378aeaccd756a272a5e44be1c6c Parents: b8b1fbf Author: Shixiong Zhu Authored: Fri Nov 6 14:51:53 2015 + Committer: Sean Owen Committed: Fri Nov 6 14:52:21 2015 + -- .../org/apache/spark/streaming/DStreamGraph.scala | 3 ++- .../spark/streaming/StreamingContextSuite.scala | 16 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 40789c6..8138caa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -169,7 +169,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { * safe remember duration which can be used to perform cleanup operations. */ def getMaxInputStreamRememberDuration(): Duration = { -inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds } +// If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them +inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds) } @throws(classOf[IOException]) http://git-wip-us.apache.org/repos/asf/spark/blob/dc058f2f/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c7a8771..860fac2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo "Please don't use queueStream when checkpointing is enabled.")) } + test("Creating an InputDStream but not using it should not crash") { +ssc = new StreamingContext(master, appName, batchDuration) +val input1 = addInputStream(ssc) +val input2 = addInputStream(ssc) +val output = new TestOutputStream(input2) +output.register() +val batchCount = new BatchCounter(ssc) +ssc.start() +// Just wait for completing 2 batches to make sure it triggers +// `DStream.getMaxInputStreamRememberDuration` +batchCount.waitUntilBatchesCompleted(2, 1) +// Throw the exception if crash +ssc.awaitTerminationOrTimeout(1) +ssc.stop() + } + def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1)
spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used
Repository: spark Updated Branches: refs/heads/branch-1.6 1cfad7d55 -> 0a430f04e [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used Just ignored `InputDStream`s that have null `rememberDuration` in `DStreamGraph.getMaxInputStreamRememberDuration`. Author: Shixiong Zhu Closes #9476 from zsxwing/SPARK-11511. (cherry picked from commit cf69ce136590fea51843bc54f44f0f45c7d0ac36) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a430f04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a430f04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a430f04 Branch: refs/heads/branch-1.6 Commit: 0a430f04eef3445fb0095adc806d91759eea5d32 Parents: 1cfad7d Author: Shixiong Zhu Authored: Fri Nov 6 14:51:53 2015 + Committer: Sean Owen Committed: Fri Nov 6 14:52:08 2015 + -- .../org/apache/spark/streaming/DStreamGraph.scala | 3 ++- .../spark/streaming/StreamingContextSuite.scala | 16 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a430f04/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 1b0b789..7829f5e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { * safe remember duration which can be used to perform cleanup operations. */ def getMaxInputStreamRememberDuration(): Duration = { -inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds } +// If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them +inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds) } @throws(classOf[IOException]) http://git-wip-us.apache.org/repos/asf/spark/blob/0a430f04/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c7a8771..860fac2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo "Please don't use queueStream when checkpointing is enabled.")) } + test("Creating an InputDStream but not using it should not crash") { +ssc = new StreamingContext(master, appName, batchDuration) +val input1 = addInputStream(ssc) +val input2 = addInputStream(ssc) +val output = new TestOutputStream(input2) +output.register() +val batchCount = new BatchCounter(ssc) +ssc.start() +// Just wait for completing 2 batches to make sure it triggers +// `DStream.getMaxInputStreamRememberDuration` +batchCount.waitUntilBatchesCompleted(2, 1) +// Throw the exception if crash +ssc.awaitTerminationOrTimeout(1) +ssc.stop() + } + def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1)
spark git commit: [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used
Repository: spark Updated Branches: refs/heads/master 253e87e8a -> cf69ce136 [SPARK-11511][STREAMING] Fix NPE when an InputDStream is not used Just ignored `InputDStream`s that have null `rememberDuration` in `DStreamGraph.getMaxInputStreamRememberDuration`. Author: Shixiong Zhu Closes #9476 from zsxwing/SPARK-11511. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf69ce13 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf69ce13 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf69ce13 Branch: refs/heads/master Commit: cf69ce136590fea51843bc54f44f0f45c7d0ac36 Parents: 253e87e Author: Shixiong Zhu Authored: Fri Nov 6 14:51:53 2015 + Committer: Sean Owen Committed: Fri Nov 6 14:51:53 2015 + -- .../org/apache/spark/streaming/DStreamGraph.scala | 3 ++- .../spark/streaming/StreamingContextSuite.scala | 16 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cf69ce13/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 1b0b789..7829f5e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -167,7 +167,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { * safe remember duration which can be used to perform cleanup operations. */ def getMaxInputStreamRememberDuration(): Duration = { -inputStreams.map { _.rememberDuration }.maxBy { _.milliseconds } +// If an InputDStream is not used, its `rememberDuration` will be null and we can ignore them +inputStreams.map(_.rememberDuration).filter(_ != null).maxBy(_.milliseconds) } @throws(classOf[IOException]) http://git-wip-us.apache.org/repos/asf/spark/blob/cf69ce13/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index c7a8771..860fac2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -780,6 +780,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo "Please don't use queueStream when checkpointing is enabled.")) } + test("Creating an InputDStream but not using it should not crash") { +ssc = new StreamingContext(master, appName, batchDuration) +val input1 = addInputStream(ssc) +val input2 = addInputStream(ssc) +val output = new TestOutputStream(input2) +output.register() +val batchCount = new BatchCounter(ssc) +ssc.start() +// Just wait for completing 2 batches to make sure it triggers +// `DStream.getMaxInputStreamRememberDuration` +batchCount.waitUntilBatchesCompleted(2, 1) +// Throw the exception if crash +ssc.awaitTerminationOrTimeout(1) +ssc.stop() + } + def addInputStream(s: StreamingContext): DStream[Int] = { val input = (1 to 100).map(i => 1 to i) val inputStream = new TestInputStream(s, input, 1)
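The hardening applied in all three branches is the same one-liner: drop the nulls before `maxBy`. In isolation, REPL-style, with `java.lang.Long` standing in for the nullable `rememberDuration`:

val durations: Seq[java.lang.Long] = Seq(1000L, null, 3000L)

// Before the fix, the equivalent of maxBy(_.milliseconds) dereferenced the
// null entry and threw a NullPointerException.
// After: entries from never-used streams are filtered out first.
val max = durations.filter(_ != null).maxBy(_.longValue)  // 3000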
spark git commit: [SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit
Repository: spark
Updated Branches:
  refs/heads/master bc5d6c038 -> 253e87e8a


[SPARK-11453][SQL][FOLLOW-UP] remove DecimalLit

A cleanup for https://github.com/apache/spark/pull/9085. `DecimalLit` is very
similar to `FloatLit`, so we can keep just one of them. Also added a low-level
unit test in `SqlParserSuite`.

Author: Wenchen Fan

Closes #9482 from cloud-fan/parser.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/253e87e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/253e87e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/253e87e8

Branch: refs/heads/master
Commit: 253e87e8ab8717ffef40a6d0d376b1add155ef90
Parents: bc5d6c0
Author: Wenchen Fan
Authored: Fri Nov 6 06:38:49 2015 -0800
Committer: Reynold Xin
Committed: Fri Nov 6 06:38:49 2015 -0800

----------------------------------------------------------------------
 .../sql/catalyst/AbstractSparkSQLParser.scala | 23 ++++++++++-------------
 .../apache/spark/sql/catalyst/SqlParser.scala | 20 ++++----------------
 .../spark/sql/catalyst/SqlParserSuite.scala   | 21 +++++++++++++++++++++
 3 files changed, 35 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/253e87e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
index 04ac4f2..bdc52c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -78,10 +78,6 @@ private[sql] abstract class AbstractSparkSQLParser
 }
 
 class SqlLexical extends StdLexical {
-  case class FloatLit(chars: String) extends Token {
-    override def toString: String = chars
-  }
-
   case class DecimalLit(chars: String) extends Token {
     override def toString: String = chars
   }
@@ -106,17 +102,16 @@ class SqlLexical extends StdLexical {
   }
 
   override lazy val token: Parser[Token] =
-    ( rep1(digit) ~ ('.' ~> digit.*).? ~ (exp ~> sign.? ~ rep1(digit)) ^^ {
-        case i ~ None ~ (sig ~ rest) =>
-          DecimalLit(i.mkString + "e" + sig.mkString + rest.mkString)
-        case i ~ Some(d) ~ (sig ~ rest) =>
-          DecimalLit(i.mkString + "." + d.mkString + "e" + sig.mkString + rest.mkString)
-      }
+    ( rep1(digit) ~ scientificNotation ^^ { case i ~ s => DecimalLit(i.mkString + s) }
+    | '.' ~> (rep1(digit) ~ scientificNotation) ^^
+      { case i ~ s => DecimalLit("0." + i.mkString + s) }
+    | rep1(digit) ~ ('.' ~> digit.*) ~ scientificNotation ^^
+      { case i1 ~ i2 ~ s => DecimalLit(i1.mkString + "." + i2.mkString + s) }
     | digit.* ~ identChar ~ (identChar | digit).* ^^
       { case first ~ middle ~ rest => processIdent((first ++ (middle :: rest)).mkString) }
     | rep1(digit) ~ ('.' ~> digit.*).? ^^ {
         case i ~ None => NumericLit(i.mkString)
-        case i ~ Some(d) => FloatLit(i.mkString + "." + d.mkString)
+        case i ~ Some(d) => DecimalLit(i.mkString + "." + d.mkString)
       }
     | '\'' ~> chrExcept('\'', '\n', EofCh).* <~ '\'' ^^
       { case chars => StringLit(chars mkString "") }
@@ -133,8 +128,10 @@ class SqlLexical extends StdLexical {
 
   override def identChar: Parser[Elem] = letter | elem('_')
 
-  private lazy val sign: Parser[Elem] = elem("s", c => c == '+' || c == '-')
-  private lazy val exp: Parser[Elem] = elem("e", c => c == 'E' || c == 'e')
+  private lazy val scientificNotation: Parser[String] =
+    (elem('e') | elem('E')) ~> (elem('+') | elem('-')).? ~ rep1(digit) ^^ {
+      case s ~ rest => "e" + s.mkString + rest.mkString
+    }
 
   override def whitespace: Parser[Any] =
     ( whitespaceChar

http://git-wip-us.apache.org/repos/asf/spark/blob/253e87e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 440e9e2..cd717c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -334,27 +334,15 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected lazy val numericLiteral: Parser[Literal] =
     ( integral ^^ { case i => Literal(toNarrowestIntegerType(i)) }
-    | sign.? ~ unsignedFloat ^^ {
-      case s ~ f => Literal(toDecimalOrDouble(s.getOrElse("") + f))
-    }
-    | sign.? ~ unsignedDecimal ^^ {
-      case s ~ d =>
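The `scientificNotation` production that replaces the old `sign` and `exp` helpers is small enough to exercise on its own. Below is a self-contained sketch of the same grammar, written against `RegexParsers` from the scala-parser-combinators library rather than inside `SqlLexical` (the object name and the `digit` helper are stand-ins of ours; within `SqlLexical`, the `StdLexical` base class already supplies `digit`):

import scala.util.parsing.combinator.RegexParsers

object SciNotationSketch extends RegexParsers {
  // A tokenizer should not skip whitespace inside a number.
  override def skipWhitespace: Boolean = false

  val digit: Parser[Char] = elem("digit", _.isDigit)

  // Same shape as the committed rule: 'e' or 'E', an optional sign, then
  // one or more digits, normalized to a lowercase "e" prefix.
  val scientificNotation: Parser[String] =
    (elem('e') | elem('E')) ~> (elem('+') | elem('-')).? ~ rep1(digit) ^^ {
      case s ~ rest => "e" + s.mkString + rest.mkString
    }

  def main(args: Array[String]): Unit = {
    println(parseAll(scientificNotation, "E+10"))  // [1.5] parsed: e+10
    println(parseAll(scientificNotation, "e003"))  // [1.5] parsed: e003
    println(parseAll(scientificNotation, "x10"))   // a parse failure: no 'e'/'E'
  }
}

The `s.mkString` on the optional sign works because Scala implicitly converts `Option` to `Iterable`, yielding "" for `None` and the single sign character for `Some`; the committed rule relies on the same idiom.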