spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization
Repository: spark
Updated Branches: refs/heads/master 8694c3ad7 -> f0f563a3c

[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache first cost RDD
* change following cost RDD cache level to MEMORY_AND_DISK
* remove Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng

Closes #8526 from mengxr/SPARK-10354.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0f563a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0f563a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0f563a3

Branch: refs/heads/master
Commit: f0f563a3c43fc9683e6920890cce44611c0c5f4b
Parents: 8694c3a
Author: Xiangrui Meng
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng
Committed: Sun Aug 30 23:20:03 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f0f563a3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 46920ff..7168aac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -369,7 +369,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -394,21 +394,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+      }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
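For context, the heart of this change is a persistence pattern rather than new math. Below is a minimal, self-contained sketch of that pattern; the `RDD[Double]` of points and the stand-in cost function are hypothetical placeholders (the real code works on `VectorWithNorm` and calls `KMeans.pointCost`):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object CostPersistenceSketch {
  def run(data: RDD[Double], runs: Int, steps: Int): Unit = {
    // First cost RDD: a trivial map over the input, deliberately left uncached
    // because recomputing it is cheaper than holding it in memory.
    var costs: RDD[Array[Double]] =
      data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

    for (_ <- 0 until steps) {
      val preCosts = costs
      // Derived cost RDDs are persisted at MEMORY_AND_DISK so large partitions
      // spill to disk instead of being dropped and recomputed.
      costs = data.zip(preCosts).map { case (point, cost) =>
        // Stand-in for math.min(KMeans.pointCost(centers(r), point), cost(r)).
        Array.tabulate(runs)(r => math.min(point, cost(r)))
      }.persist(StorageLevel.MEMORY_AND_DISK)

      // Plain double arrays instead of a Vector wrapper: one less object per row.
      val sumCosts = costs.aggregate(new Array[Double](runs))(
        seqOp = (s, v) => { var r = 0; while (r < runs) { s(r) += v(r); r += 1 }; s },
        combOp = (s0, s1) => { var r = 0; while (r < runs) { s0(r) += s1(r); r += 1 }; s0 }
      )
      preCosts.unpersist(blocking = false) // a no-op for the uncached first RDD
      println(s"step cost sums: ${sumCosts.mkString(", ")}")
    }
  }
}
```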
spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization
Repository: spark
Updated Branches: refs/heads/branch-1.4 d6fd80570 -> 15beccb71

[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache first cost RDD
* change following cost RDD cache level to MEMORY_AND_DISK
* remove Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng

Closes #8526 from mengxr/SPARK-10354.

(cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15beccb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15beccb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15beccb7

Branch: refs/heads/branch-1.4
Commit: 15beccb716a3d4bb533ecf3e81e26e757fb0f844
Parents: d6fd805
Author: Xiangrui Meng
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng
Committed: Sun Aug 30 23:20:30 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/15beccb7/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 0f8d6a3..555b98d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -318,7 +318,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -343,21 +343,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+      }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization
Repository: spark
Updated Branches: refs/heads/branch-1.3 e8b0564e7 -> a58c1afe8

[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache first cost RDD
* change following cost RDD cache level to MEMORY_AND_DISK
* remove Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng

Closes #8526 from mengxr/SPARK-10354.

(cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a58c1afe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a58c1afe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a58c1afe

Branch: refs/heads/branch-1.3
Commit: a58c1afe8fcf53537c18a53591343ff33461
Parents: e8b0564
Author: Xiangrui Meng
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng
Committed: Sun Aug 30 23:21:09 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a58c1afe/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 11633e8..21bf3cd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -281,7 +281,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -306,21 +306,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+      }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8730] Fixes - Deser objects containing a primitive class attribute
Repository: spark
Updated Branches: refs/heads/master f0f563a3c -> 72f6dbf7b

[SPARK-8730] Fixes - Deser objects containing a primitive class attribute

Author: EugenCepoi

Closes #7122 from EugenCepoi/master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72f6dbf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72f6dbf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72f6dbf7

Branch: refs/heads/master
Commit: 72f6dbf7b0c8b271f5f9c762374422c69c8ab43d
Parents: f0f563a
Author: EugenCepoi
Authored: Mon Aug 31 13:24:35 2015 -0500
Committer: Imran Rashid
Committed: Mon Aug 31 13:24:35 2015 -0500

--
 .../spark/serializer/JavaSerializer.scala  | 27 
 .../spark/serializer/JavaSerializerSuite.scala | 18 +
 2 files changed, 40 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4a5274b..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -62,17 +62,34 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa
   extends DeserializationStream {

   private val objIn = new ObjectInputStream(in) {
-    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
-      // scalastyle:off classforname
-      Class.forName(desc.getName, false, loader)
-      // scalastyle:on classforname
-    }
+    override def resolveClass(desc: ObjectStreamClass): Class[_] =
+      try {
+        // scalastyle:off classforname
+        Class.forName(desc.getName, false, loader)
+        // scalastyle:on classforname
+      } catch {
+        case e: ClassNotFoundException =>
+          JavaDeserializationStream.primitiveMappings.get(desc.getName).getOrElse(throw e)
+      }
   }

   def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
   def close() { objIn.close() }
 }

+private object JavaDeserializationStream {
+  val primitiveMappings = Map[String, Class[_]](
+    "boolean" -> classOf[Boolean],
+    "byte" -> classOf[Byte],
+    "char" -> classOf[Char],
+    "short" -> classOf[Short],
+    "int" -> classOf[Int],
+    "long" -> classOf[Long],
+    "float" -> classOf[Float],
+    "double" -> classOf[Double],
+    "void" -> classOf[Void]
+  )
+}
+
 private[spark] class JavaSerializerInstance(
     counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
index 329a2b6..20f4567 100644
--- a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
@@ -25,4 +25,22 @@ class JavaSerializerSuite extends SparkFunSuite {
     val instance = serializer.newInstance()
     instance.deserialize[JavaSerializer](instance.serialize(serializer))
   }
+
+  test("Deserialize object containing a primitive Class as attribute") {
+    val serializer = new JavaSerializer(new SparkConf())
+    val instance = serializer.newInstance()
+    instance.deserialize[JavaSerializer](instance.serialize(new ContainsPrimitiveClass()))
+  }
+}
+
+private class ContainsPrimitiveClass extends Serializable {
+  val intClass = classOf[Int]
+  val longClass = classOf[Long]
+  val shortClass = classOf[Short]
+  val charClass = classOf[Char]
+  val doubleClass = classOf[Double]
+  val floatClass = classOf[Float]
+  val booleanClass = classOf[Boolean]
+  val byteClass = classOf[Byte]
+  val voidClass = classOf[Void]
+}

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
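The JVM quirk behind this fix, shown standalone: `Class#getName` on a primitive class returns names like "int" that `Class.forName` cannot resolve, which is exactly what the serialized stream hands to `resolveClass`. A small demo, not part of the patch:

```scala
object PrimitiveClassNameDemo {
  def main(args: Array[String]): Unit = {
    // In Scala, classOf[Int] is the primitive class, i.e. java.lang.Integer.TYPE.
    println(classOf[Int].getName) // prints "int"

    try {
      // Primitives have no binary name, so this always fails.
      Class.forName("int")
    } catch {
      case e: ClassNotFoundException => println(s"forName failed: ${e.getMessage}")
    }

    // The patched resolveClass instead recovers the Class from a name -> Class map.
    val primitives = Map[String, Class[_]]("int" -> classOf[Int], "long" -> classOf[Long])
    println(primitives("int") == java.lang.Integer.TYPE) // true
  }
}
```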
spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver, since we may reuse it later
Repository: spark
Updated Branches: refs/heads/master 72f6dbf7b -> 4a5fe0916

[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver, since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing

Closes #8538 from zsxwing/SPARK-10369.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a5fe091
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a5fe091
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a5fe091

Branch: refs/heads/master
Commit: 4a5fe091658b1d06f427e404a11a84fc84f953c5
Parents: 72f6dbf
Author: zsxwing
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das
Committed: Mon Aug 31 12:19:11 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala | 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         ReceiverTrackingInfo(
           streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
     }
-    receiverTrackingInfos -= streamId
+    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       context.reply(true)
     // Local messages
     case AllReceiverIds =>
-      context.reply(receiverTrackingInfos.keys.toSeq)
+      context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
     case StopAllReceivers =>
       assert(isTrackerStopping || isTrackerStopped)
       stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("should restart receiver after stopping it") {
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+      @volatile var startTimes = 0
+      ssc.addStreamingListener(new StreamingListener {
+        override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+          startTimes += 1
+        }
+      })
+      val input = ssc.receiverStream(new StoppableReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      StoppableReceiver.shouldStop = true
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // The receiver is stopped once, so if it's restarted, it should be started twice.
+        assert(startTimes === 2)
+      }
+    }
+  }
 }

 /** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {

   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        while (!StoppableReceiver.shouldStop) {
+          Thread.sleep(10)
+        }
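The fix reads naturally as a rule on the tracker's state map: a stopped receiver is flipped to INACTIVE rather than evicted, so a later restart can still find its entry, and liveness queries filter on state instead of key presence. A simplified model with hypothetical types, not the real ReceiverTracker:

```scala
import scala.collection.mutable

object ReceiverStateModel {
  object State extends Enumeration { val ACTIVE, INACTIVE = Value }

  private val trackingInfos = mutable.HashMap.empty[Int, State.Value]

  def register(streamId: Int): Unit = trackingInfos(streamId) = State.ACTIVE

  // Before the fix this was effectively `trackingInfos -= streamId`, so a
  // restart that looked the entry up again hit "key not found".
  def deregister(streamId: Int): Unit = trackingInfos(streamId) = State.INACTIVE

  // "All receivers" now means all non-INACTIVE entries, not all keys.
  def allActiveReceiverIds: Seq[Int] =
    trackingInfos.filter(_._2 != State.INACTIVE).keys.toSeq
}
```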
spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver, since we may reuse it later
Repository: spark
Updated Branches: refs/heads/branch-1.5 bf5b2f26b -> 33ce274cd

[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver, since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing

Closes #8538 from zsxwing/SPARK-10369.

(cherry picked from commit 4a5fe091658b1d06f427e404a11a84fc84f953c5)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ce274c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ce274c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ce274c

Branch: refs/heads/branch-1.5
Commit: 33ce274cdf7538b5816f1a400b2fad394ec2a056
Parents: bf5b2f2
Author: zsxwing
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das
Committed: Mon Aug 31 12:19:48 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala | 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         ReceiverTrackingInfo(
           streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
     }
-    receiverTrackingInfos -= streamId
+    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       context.reply(true)
     // Local messages
     case AllReceiverIds =>
-      context.reply(receiverTrackingInfos.keys.toSeq)
+      context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
     case StopAllReceivers =>
       assert(isTrackerStopping || isTrackerStopped)
       stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("should restart receiver after stopping it") {
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+      @volatile var startTimes = 0
+      ssc.addStreamingListener(new StreamingListener {
+        override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+          startTimes += 1
+        }
+      })
+      val input = ssc.receiverStream(new StoppableReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      StoppableReceiver.shouldStop = true
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // The receiver is stopped once, so if it's restarted, it should be started twice.
+        assert(startTimes === 2)
+      }
+    }
+  }
 }

 /** An input DStream with for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {

   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread =
spark git commit: [SPARK-10170] [SQL] Add DB2 JDBC dialect support.
Repository: spark
Updated Branches: refs/heads/master 4a5fe0916 -> a2d5c7209

[SPARK-10170] [SQL] Add DB2 JDBC dialect support.

DataFrame writes to a DB2 database fail because the default JDBC data source implementation generates a table schema with data types DB2 does not support: TEXT for String and BIT(1) for Boolean. This patch registers a DB2 JDBC dialect that maps String and Boolean to valid DB2 data types (CLOB and CHAR(1)).

Author: sureshthalamati

Closes #8393 from sureshthalamati/db2_dialect_spark-10170.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d5c720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d5c720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d5c720

Branch: refs/heads/master
Commit: a2d5c72091b1c602694dbca823a7b26f86b02864
Parents: 4a5fe09
Author: sureshthalamati
Authored: Mon Aug 31 12:39:58 2015 -0700
Committer: Reynold Xin
Committed: Mon Aug 31 12:39:58 2015 -0700

--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 18 ++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++
 2 files changed, 25 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 8849fc2..c6d05c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -125,6 +125,7 @@ object JdbcDialects {

   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
+  registerDialect(DB2Dialect)

   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
@@ -222,3 +223,20 @@ case object MySQLDialect extends JdbcDialect {
     s"`$colName`"
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
+ * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1).
+ */
+@DeveloperApi
+case object DB2Dialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
+    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edac08..d8c9a08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -407,6 +407,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   test("Default jdbc dialect registration") {
     assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
     assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
+    assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
     assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }

@@ -443,4 +444,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
     assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
     assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
   }
+
+  test("DB2Dialect type mapping") {
+    val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
+    assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
+    assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
+  }
 }

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
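The same extension point is available to applications: `JdbcDialects.registerDialect` is public, so a dialect for another database can be plugged in without modifying Spark. A sketch for a hypothetical database; the "exampledb" URL prefix and the type choices are illustrative, not from this commit:

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BooleanType, DataType, StringType}

// Hypothetical dialect following the same pattern as the DB2Dialect added here.
case object ExampleDialect extends JdbcDialect {

  // Claim only URLs produced by the (made-up) "exampledb" JDBC driver.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:exampledb")

  // Override only the problematic types; returning None falls back to
  // Spark's default type mapping for everything else.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(4000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
    case _ => None
  }
}

object ExampleDialectSetup {
  // One-time registration, e.g. during application startup.
  def init(): Unit = JdbcDialects.registerDialect(ExampleDialect)
}
```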
spark git commit: [SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode
Repository: spark
Updated Branches: refs/heads/master a2d5c7209 -> 23e39cc7b

[SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode

This could help reduce hash collisions, e.g., in `RDD[Vector].repartition`. jkbradley

Author: Xiangrui Meng

Closes #8182 from mengxr/SPARK-9954.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23e39cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23e39cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23e39cc7

Branch: refs/heads/master
Commit: 23e39cc7b1bb7f1087c4706234c9b5165a571357
Parents: a2d5c72
Author: Xiangrui Meng
Authored: Mon Aug 31 15:49:25 2015 -0700
Committer: Xiangrui Meng
Committed: Mon Aug 31 15:49:25 2015 -0700

--
 .../org/apache/spark/mllib/linalg/Vectors.scala | 38 +++-
 1 file changed, 21 insertions(+), 17 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/23e39cc7/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 06ebb15..3642e92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -71,20 +71,22 @@ sealed trait Vector extends Serializable {
   }

   /**
-   * Returns a hash code value for the vector. The hash code is based on its size and its nonzeros
-   * in the first 16 entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
+   * Returns a hash code value for the vector. The hash code is based on its size and its first 128
+   * nonzero entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
    */
   override def hashCode(): Int = {
     // This is a reference implementation. It calls return in foreachActive, which is slow.
     // Subclasses should override it with optimized implementation.
     var result: Int = 31 + size
+    var nnz = 0
     this.foreachActive { (index, value) =>
-      if (index < 16) {
+      if (nnz < Vectors.MAX_HASH_NNZ) {
         // ignore explicit 0 for comparison between sparse and dense
         if (value != 0) {
           result = 31 * result + index
           val bits = java.lang.Double.doubleToLongBits(value)
           result = 31 * result + (bits ^ (bits >>> 32)).toInt
+          nnz += 1
         }
       } else {
         return result
@@ -536,6 +538,9 @@ object Vectors {
     }
     allEqual
   }
+
+  /** Max number of nonzero entries used in computing hash code. */
+  private[linalg] val MAX_HASH_NNZ = 128
 }

 /**
@@ -578,13 +583,15 @@ class DenseVector @Since("1.0.0") (
   override def hashCode(): Int = {
     var result: Int = 31 + size
     var i = 0
-    val end = math.min(values.length, 16)
-    while (i < end) {
+    val end = values.length
+    var nnz = 0
+    while (i < end && nnz < Vectors.MAX_HASH_NNZ) {
       val v = values(i)
       if (v != 0.0) {
         result = 31 * result + i
         val bits = java.lang.Double.doubleToLongBits(values(i))
         result = 31 * result + (bits ^ (bits >>> 32)).toInt
+        nnz += 1
       }
       i += 1
     }
@@ -707,19 +714,16 @@ class SparseVector @Since("1.0.0") (
   override def hashCode(): Int = {
     var result: Int = 31 + size
     val end = values.length
-    var continue = true
     var k = 0
-    while ((k < end) & continue) {
-      val i = indices(k)
-      if (i < 16) {
-        val v = values(k)
-        if (v != 0.0) {
-          result = 31 * result + i
-          val bits = java.lang.Double.doubleToLongBits(v)
-          result = 31 * result + (bits ^ (bits >>> 32)).toInt
-        }
-      } else {
-        continue = false
+    var nnz = 0
+    while (k < end && nnz < Vectors.MAX_HASH_NNZ) {
+      val v = values(k)
+      if (v != 0.0) {
+        val i = indices(k)
+        result = 31 * result + i
+        val bits = java.lang.Double.doubleToLongBits(v)
+        result = 31 * result + (bits ^ (bits >>> 32)).toInt
+        nnz += 1
       }
       k += 1
     }

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
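Distilled from the diff, the scheme combines the vector size with the first `MAX_HASH_NNZ` (128) nonzero (index, value) pairs in index order, `java.util.Arrays.hashCode`-style; skipping explicit zeros keeps sparse and dense vectors with equal contents hashing identically. A self-contained reference sketch:

```scala
object VectorHashSketch {
  // Cap matching Vectors.MAX_HASH_NNZ in the patch.
  val MaxHashNnz = 128

  // `activeEntries` yields (index, value) pairs in increasing index order.
  def vectorHash(size: Int, activeEntries: Iterator[(Int, Double)]): Int = {
    var result = 31 + size
    var nnz = 0
    while (activeEntries.hasNext && nnz < MaxHashNnz) {
      val (index, value) = activeEntries.next()
      if (value != 0.0) { // ignore explicit zeros so sparse and dense agree
        result = 31 * result + index
        val bits = java.lang.Double.doubleToLongBits(value)
        result = 31 * result + (bits ^ (bits >>> 32)).toInt
        nnz += 1
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Dense [0.0, 2.5] and sparse {1 -> 2.5} of size 2 hash identically:
    val dense = vectorHash(2, Iterator((0, 0.0), (1, 2.5)))
    val sparse = vectorHash(2, Iterator((1, 2.5)))
    println(dense == sparse) // true
  }
}
```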
[1/2] spark git commit: Preparing Spark release v1.5.0-rc3
Repository: spark
Updated Branches: refs/heads/branch-1.5 1c752b8b5 -> 2b270a166

Preparing Spark release v1.5.0-rc3

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/908e37bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/908e37bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/908e37bc

Branch: refs/heads/branch-1.5
Commit: 908e37bcc10132bb2aa7f80ae694a9df6e40f31a
Parents: 1c752b8
Author: Patrick Wendell
Authored: Mon Aug 31 15:57:42 2015 -0700
Committer: Patrick Wendell
Committed: Mon Aug 31 15:57:42 2015 -0700

--
 assembly/pom.xml | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml | 2 +-
 examples/pom.xml | 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml | 2 +-
 external/mqtt-assembly/pom.xml | 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml | 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml | 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml | 2 +-
 launcher/pom.xml | 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml | 2 +-
 pom.xml | 2 +-
 repl/pom.xml | 2 +-
 sql/catalyst/pom.xml | 2 +-
 sql/core/pom.xml | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 unsafe/pom.xml | 2 +-
 yarn/pom.xml | 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7b41ebb..3ef7d6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 16bf17c..684e07b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index beb547f..6b082ad 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 3926b79..9ef1eda 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 5eda12d..aa7021d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 33f2cd7..7d72f78 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.1-SNAPSHOT
+  1.5.0
   ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 670c783..38683e3
Git Push Summary
Repository: spark
Updated Tags: refs/tags/v1.5.0-rc3 [created] 908e37bcc

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[2/2] spark git commit: Preparing development version 1.5.1-SNAPSHOT
Preparing development version 1.5.1-SNAPSHOT

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b270a16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b270a16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b270a16

Branch: refs/heads/branch-1.5
Commit: 2b270a166d6bd5b42399400924c576c9996bfc10
Parents: 908e37b
Author: Patrick Wendell
Authored: Mon Aug 31 15:57:49 2015 -0700
Committer: Patrick Wendell
Committed: Mon Aug 31 15:57:49 2015 -0700

--
 assembly/pom.xml | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml | 2 +-
 examples/pom.xml | 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml | 2 +-
 external/mqtt-assembly/pom.xml | 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml | 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml | 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml | 2 +-
 launcher/pom.xml | 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml | 2 +-
 pom.xml | 2 +-
 repl/pom.xml | 2 +-
 sql/catalyst/pom.xml | 2 +-
 sql/core/pom.xml | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 unsafe/pom.xml | 2 +-
 yarn/pom.xml | 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3ef7d6f..7b41ebb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 684e07b..16bf17c 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 6b082ad..beb547f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9ef1eda..3926b79 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index aa7021d..5eda12d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 7d72f78..33f2cd7 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   org.apache.spark
   spark-parent_2.10
-  1.5.0
+  1.5.1-SNAPSHOT
   ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 38683e3..670c783 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7
spark git commit: [SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer
Repository: spark
Updated Branches: refs/heads/master fe16fd0b8 -> 52ea399e6

[SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer

Add Python API for SQLTransformer

Author: Yanbo Liang

Closes #8527 from yanboliang/spark-10355.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ea399e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ea399e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ea399e

Branch: refs/heads/master
Commit: 52ea399e6ee37b7c44aae7709863e006fca88906
Parents: fe16fd0
Author: Yanbo Liang
Authored: Mon Aug 31 16:11:27 2015 -0700
Committer: Xiangrui Meng
Committed: Mon Aug 31 16:11:27 2015 -0700

--
 python/pyspark/ml/feature.py | 57 ---
 1 file changed, 54 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/52ea399e/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 59300a6..0626281 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,9 +28,9 @@ from pyspark.mllib.linalg import _convert_to_vector

 __all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel',
            'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 'RegexTokenizer',
-           'StandardScaler', 'StandardScalerModel', 'StringIndexer', 'StringIndexerModel',
-           'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 'Word2VecModel',
-           'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']
+           'SQLTransformer', 'StandardScaler', 'StandardScalerModel', 'StringIndexer',
+           'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec',
+           'Word2VecModel', 'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']

 @inherit_doc
@@ -744,6 +744,57 @@ class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol):

 @inherit_doc
+class SQLTransformer(JavaTransformer):
+    """
+    Implements the transforms which are defined by SQL statement.
+    Currently we only support SQL syntax like 'SELECT ... FROM __THIS__'
+    where '__THIS__' represents the underlying table of the input dataset.
+
+    >>> df = sqlContext.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"])
+    >>> sqlTrans = SQLTransformer(
+    ...     statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
+    >>> sqlTrans.transform(df).head()
+    Row(id=0, v1=1.0, v2=3.0, v3=4.0, v4=3.0)
+    """
+
+    # a placeholder to make it appear in the generated doc
+    statement = Param(Params._dummy(), "statement", "SQL statement")
+
+    @keyword_only
+    def __init__(self, statement=None):
+        """
+        __init__(self, statement=None)
+        """
+        super(SQLTransformer, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.SQLTransformer", self.uid)
+        self.statement = Param(self, "statement", "SQL statement")
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, statement=None):
+        """
+        setParams(self, statement=None)
+        Sets params for this SQLTransformer.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    def setStatement(self, value):
+        """
+        Sets the value of :py:attr:`statement`.
+        """
+        self._paramMap[self.statement] = value
+        return self
+
+    def getStatement(self):
+        """
+        Gets the value of statement or its default value.
+        """
+        return self.getOrDefault(self.statement)
+
+
+@inherit_doc
 class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol):
     """
     Standardizes features by removing the mean and scaling to unit variance using column summary

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.
Repository: spark
Updated Branches: refs/heads/master 52ea399e6 -> d65656c45

[SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.

They don't bring much value since we now have better unit test coverage for hash joins. This will also help reduce the test time.

Author: Reynold Xin

Closes #8542 from rxin/SPARK-10378.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d65656c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d65656c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d65656c4

Branch: refs/heads/master
Commit: d65656c455d19b83c6412571873586b458aa355e
Parents: 52ea399
Author: Reynold Xin
Authored: Mon Aug 31 18:09:24 2015 -0700
Committer: Reynold Xin
Committed: Mon Aug 31 18:09:24 2015 -0700

--
 .../execution/HashJoinCompatibilitySuite.scala | 169 ---
 1 file changed, 169 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d65656c4/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
--
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
deleted file mode 100644
index 1a5ba20..000
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.File
-
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.test.TestHive
-
-/**
- * Runs the test cases that are included in the hive distribution with hash joins.
- */
-class HashJoinCompatibilitySuite extends HiveCompatibilitySuite {
-  override def beforeAll() {
-    super.beforeAll()
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
-  }
-
-  override def afterAll() {
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
-    super.afterAll()
-  }
-
-  override def whiteList = Seq(
-    "auto_join0",
-    "auto_join1",
-    "auto_join10",
-    "auto_join11",
-    "auto_join12",
-    "auto_join13",
-    "auto_join14",
-    "auto_join14_hadoop20",
-    "auto_join15",
-    "auto_join17",
-    "auto_join18",
-    "auto_join19",
-    "auto_join2",
-    "auto_join20",
-    "auto_join21",
-    "auto_join22",
-    "auto_join23",
-    "auto_join24",
-    "auto_join25",
-    "auto_join26",
-    "auto_join27",
-    "auto_join28",
-    "auto_join3",
-    "auto_join30",
-    "auto_join31",
-    "auto_join32",
-    "auto_join4",
-    "auto_join5",
-    "auto_join6",
-    "auto_join7",
-    "auto_join8",
-    "auto_join9",
-    "auto_join_filters",
-    "auto_join_nulls",
-    "auto_join_reordering_values",
-    "auto_smb_mapjoin_14",
-    "auto_sortmerge_join_1",
-    "auto_sortmerge_join_10",
-    "auto_sortmerge_join_11",
-    "auto_sortmerge_join_12",
-    "auto_sortmerge_join_13",
-    "auto_sortmerge_join_14",
-    "auto_sortmerge_join_15",
-    "auto_sortmerge_join_16",
-    "auto_sortmerge_join_2",
-    "auto_sortmerge_join_3",
-    "auto_sortmerge_join_4",
-    "auto_sortmerge_join_5",
-    "auto_sortmerge_join_6",
-    "auto_sortmerge_join_7",
-    "auto_sortmerge_join_8",
-    "auto_sortmerge_join_9",
-    "correlationoptimizer1",
-    "correlationoptimizer10",
-    "correlationoptimizer11",
-    "correlationoptimizer13",
-    "correlationoptimizer14",
-    "correlationoptimizer15",
-    "correlationoptimizer2",
-    "correlationoptimizer3",
-    "correlationoptimizer4",
-    "correlationoptimizer6",
-    "correlationoptimizer7",
-    "correlationoptimizer8",
-    "correlationoptimizer9",
-    "join0",
-    "join1",
-    "join10",
-    "join11",
-    "join12",
-    "join13",
-    "join14",
-    "join14_hadoop20",
-    "join15",
-    "join16",
-    "join17",
-