spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 8694c3ad7 -> f0f563a3c


[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache the first cost RDD
* change the cache level of subsequent cost RDDs to MEMORY_AND_DISK
* remove the Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng 

Closes #8526 from mengxr/SPARK-10354.
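
As a companion illustration, here is a minimal, self-contained sketch of the
strategy (the data, runs, and app name below are hypothetical, not part of the
patch): the first cost RDD is trivial to recompute, so it is left uncached;
later cost RDDs are persisted at MEMORY_AND_DISK so evicted partitions spill
to disk instead of being recomputed; and per-run costs are plain Array[Double]
values summed with while loops rather than Vector objects combined via axpy.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  val sc = new SparkContext(new SparkConf().setAppName("kmeans-costs").setMaster("local[2]"))
  val runs = 4                          // hypothetical number of parallel runs
  val data = sc.parallelize(1 to 1000)  // stand-in for the point RDD

  // First cost RDD: cheap to recompute, so no cache().
  var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

  // A later iteration derives new costs from the previous ones; persist with
  // MEMORY_AND_DISK so partitions spill rather than get dropped and recomputed.
  costs = data.zip(costs).map { case (_, cost) => cost.clone() }
    .persist(StorageLevel.MEMORY_AND_DISK)

  // Element-wise sum of per-run costs using plain arrays (no Vector wrapper).
  val sumCosts = costs.aggregate(new Array[Double](runs))(
    (s, v) => { var r = 0; while (r < runs) { s(r) += v(r); r += 1 }; s },
    (s0, s1) => { var r = 0; while (r < runs) { s0(r) += s1(r); r += 1 }; s0 }
  )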


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0f563a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0f563a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0f563a3

Branch: refs/heads/master
Commit: f0f563a3c43fc9683e6920890cce44611c0c5f4b
Parents: 8694c3a
Author: Xiangrui Meng 
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng 
Committed: Sun Aug 30 23:20:03 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala  | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f0f563a3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 46920ff..7168aac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -369,7 +369,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
 
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -394,21 +394,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+        }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )





spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 d6fd80570 -> 15beccb71


[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache the first cost RDD
* change the cache level of subsequent cost RDDs to MEMORY_AND_DISK
* remove the Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng 

Closes #8526 from mengxr/SPARK-10354.

(cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15beccb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15beccb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15beccb7

Branch: refs/heads/branch-1.4
Commit: 15beccb716a3d4bb533ecf3e81e26e757fb0f844
Parents: d6fd805
Author: Xiangrui Meng 
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng 
Committed: Sun Aug 30 23:20:30 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala  | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/15beccb7/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 0f8d6a3..555b98d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -318,7 +318,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
 
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -343,21 +343,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+        }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )





spark git commit: [SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e8b0564e7 -> a58c1afe8


[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache the first cost RDD
* change the cache level of subsequent cost RDDs to MEMORY_AND_DISK
* remove the Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng 

Closes #8526 from mengxr/SPARK-10354.

(cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a58c1afe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a58c1afe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a58c1afe

Branch: refs/heads/branch-1.3
Commit: a58c1afe8fcf53537c18a53591343ff33461
Parents: e8b0564
Author: Xiangrui Meng 
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng 
Committed: Sun Aug 30 23:21:09 2015 -0700

--
 .../apache/spark/mllib/clustering/KMeans.scala  | 21 +---
 1 file changed, 14 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a58c1afe/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 11633e8..21bf3cd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -281,7 +281,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
 
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -306,21 +306,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+        }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )





spark git commit: [SPARK-8730] Fix deserialization of objects containing a primitive class attribute

2015-08-31 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master f0f563a3c -> 72f6dbf7b


[SPARK-8730] Fix deserialization of objects containing a primitive class attribute

Author: EugenCepoi 

Closes #7122 from EugenCepoi/master.
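
The root cause, sketched minimally below as a standalone illustration (not the
patch itself): Class.forName cannot resolve the JVM's primitive type names such
as "int", which is what ObjectStreamClass reports for a field holding
classOf[Int], so resolution needs an explicit primitive-name-to-class fallback.

  object PrimitiveResolveDemo {
    // Class.forName("int") throws ClassNotFoundException, hence the map.
    val primitiveMappings = Map[String, Class[_]](
      "boolean" -> classOf[Boolean], "byte" -> classOf[Byte], "char" -> classOf[Char],
      "short" -> classOf[Short], "int" -> classOf[Int], "long" -> classOf[Long],
      "float" -> classOf[Float], "double" -> classOf[Double], "void" -> classOf[Void])

    def resolve(name: String, loader: ClassLoader): Class[_] =
      try {
        Class.forName(name, false, loader)
      } catch {
        case e: ClassNotFoundException => primitiveMappings.getOrElse(name, throw e)
      }
  }

  // PrimitiveResolveDemo.resolve("int", loader) returns the primitive int
  // class instead of throwing, which is what deserialization of such fields needs.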


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72f6dbf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72f6dbf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72f6dbf7

Branch: refs/heads/master
Commit: 72f6dbf7b0c8b271f5f9c762374422c69c8ab43d
Parents: f0f563a
Author: EugenCepoi 
Authored: Mon Aug 31 13:24:35 2015 -0500
Committer: Imran Rashid 
Committed: Mon Aug 31 13:24:35 2015 -0500

--
 .../spark/serializer/JavaSerializer.scala   | 27 
 .../spark/serializer/JavaSerializerSuite.scala  | 18 +
 2 files changed, 40 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 4a5274b..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -62,17 +62,34 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
   extends DeserializationStream {
 
   private val objIn = new ObjectInputStream(in) {
-    override def resolveClass(desc: ObjectStreamClass): Class[_] = {
-      // scalastyle:off classforname
-      Class.forName(desc.getName, false, loader)
-      // scalastyle:on classforname
-    }
+    override def resolveClass(desc: ObjectStreamClass): Class[_] =
+      try {
+        // scalastyle:off classforname
+        Class.forName(desc.getName, false, loader)
+        // scalastyle:on classforname
+      } catch {
+        case e: ClassNotFoundException =>
+          JavaDeserializationStream.primitiveMappings.get(desc.getName).getOrElse(throw e)
+      }
   }
 
   def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T]
   def close() { objIn.close() }
 }
 
+private object JavaDeserializationStream {
+  val primitiveMappings = Map[String, Class[_]](
+    "boolean" -> classOf[Boolean],
+    "byte" -> classOf[Byte],
+    "char" -> classOf[Char],
+    "short" -> classOf[Short],
+    "int" -> classOf[Int],
+    "long" -> classOf[Long],
+    "float" -> classOf[Float],
+    "double" -> classOf[Double],
+    "void" -> classOf[Void]
+  )
+}
 
 private[spark] class JavaSerializerInstance(
     counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader)

http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
index 329a2b6..20f4567 100644
--- a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala
@@ -25,4 +25,22 @@ class JavaSerializerSuite extends SparkFunSuite {
     val instance = serializer.newInstance()
     instance.deserialize[JavaSerializer](instance.serialize(serializer))
   }
+
+  test("Deserialize object containing a primitive Class as attribute") {
+    val serializer = new JavaSerializer(new SparkConf())
+    val instance = serializer.newInstance()
+    instance.deserialize[JavaSerializer](instance.serialize(new ContainsPrimitiveClass()))
+  }
+}
+
+private class ContainsPrimitiveClass extends Serializable {
+  val intClass = classOf[Int]
+  val longClass = classOf[Long]
+  val shortClass = classOf[Short]
+  val charClass = classOf[Char]
+  val doubleClass = classOf[Double]
+  val floatClass = classOf[Float]
+  val booleanClass = classOf[Boolean]
+  val byteClass = classOf[Byte]
+  val voidClass = classOf[Void]
 }





spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

2015-08-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 72f6dbf7b -> 4a5fe0916


[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it 
will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing 

Closes #8538 from zsxwing/SPARK-10369.
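
In outline, the fix keeps the map entry and marks it INACTIVE instead of
deleting it. A minimal sketch with simplified, hypothetical types (the real
ReceiverTrackingInfo carries many more fields):

  import scala.collection.mutable

  object ReceiverState extends Enumeration { val ACTIVE, INACTIVE = Value }
  case class TrackingInfo(streamId: Int, state: ReceiverState.Value)

  val receiverTrackingInfos = mutable.HashMap(1 -> TrackingInfo(1, ReceiverState.ACTIVE))

  def deregister(streamId: Int): Unit = {
    // Before the fix this was receiverTrackingInfos -= streamId, which made a
    // later restart of the same receiver fail with NoSuchElementException.
    receiverTrackingInfos(streamId) = TrackingInfo(streamId, ReceiverState.INACTIVE)
  }

  // Listing active receiver ids now has to skip the retained INACTIVE entries.
  def allActiveReceiverIds: Seq[Int] =
    receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq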


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a5fe091
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a5fe091
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a5fe091

Branch: refs/heads/master
Commit: 4a5fe091658b1d06f427e404a11a84fc84f953c5
Parents: 72f6dbf
Author: zsxwing 
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 31 12:19:11 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala| 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         ReceiverTrackingInfo(
           streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
     }
-    receiverTrackingInfos -= streamId
+    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         context.reply(true)
       // Local messages
       case AllReceiverIds =>
-        context.reply(receiverTrackingInfos.keys.toSeq)
+        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
       case StopAllReceivers =>
         assert(isTrackerStopping || isTrackerStopped)
         stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("should restart receiver after stopping it") {
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+      @volatile var startTimes = 0
+      ssc.addStreamingListener(new StreamingListener {
+        override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+          startTimes += 1
+        }
+      })
+      val input = ssc.receiverStream(new StoppableReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      StoppableReceiver.shouldStop = true
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // The receiver is stopped once, so if it's restarted, it should be started twice.
+        assert(startTimes === 2)
+      }
+    }
+  }
 }
 
 /** An input DStream for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
 
   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        while (!StoppableReceiver.shouldStop) {
+          Thread.sleep(10)
+        }
+

spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

2015-08-31 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bf5b2f26b -> 33ce274cd


[SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregistering a receiver since we may reuse it later

`deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it 
will throw `java.util.NoSuchElementException: key not found` when restarting it.

Author: zsxwing 

Closes #8538 from zsxwing/SPARK-10369.

(cherry picked from commit 4a5fe091658b1d06f427e404a11a84fc84f953c5)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ce274c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ce274c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ce274c

Branch: refs/heads/branch-1.5
Commit: 33ce274cdf7538b5816f1a400b2fad394ec2a056
Parents: bf5b2f2
Author: zsxwing 
Authored: Mon Aug 31 12:19:11 2015 -0700
Committer: Tathagata Das 
Committed: Mon Aug 31 12:19:48 2015 -0700

--
 .../streaming/scheduler/ReceiverTracker.scala   |  4 +-
 .../scheduler/ReceiverTrackerSuite.scala| 51 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d532a6..f86fd44 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         ReceiverTrackingInfo(
           streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
     }
-    receiverTrackingInfos -= streamId
+    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
     listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
@@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         context.reply(true)
       // Local messages
       case AllReceiverIds =>
-        context.reply(receiverTrackingInfos.keys.toSeq)
+        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
       case StopAllReceivers =>
         assert(isTrackerStopping || isTrackerStopped)
         stopReceivers()

http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
--
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index dd292ba..45138b7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       }
     }
   }
+
+  test("should restart receiver after stopping it") {
+    withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
+      @volatile var startTimes = 0
+      ssc.addStreamingListener(new StreamingListener {
+        override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
+          startTimes += 1
+        }
+      })
+      val input = ssc.receiverStream(new StoppableReceiver)
+      val output = new TestOutputStream(input)
+      output.register()
+      ssc.start()
+      StoppableReceiver.shouldStop = true
+      eventually(timeout(10 seconds), interval(10 millis)) {
+        // The receiver is stopped once, so if it's restarted, it should be started twice.
+        assert(startTimes === 2)
+      }
+    }
+  }
 }
 
 /** An input DStream for testing rate controlling */
@@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver {
 
   def getActive(): Option[RateTestReceiver] = Option(activeReceiver)
 }
+
+/**
+ * A custom receiver that could be stopped via StoppableReceiver.shouldStop
+ */
+class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = 

spark git commit: [SPARK-10170] [SQL] Add DB2 JDBC dialect support.

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4a5fe0916 -> a2d5c7209


[SPARK-10170] [SQL] Add DB2 JDBC dialect support.

DataFrame writes to a DB2 database fail because, by default, the JDBC data
source implementation generates a table schema with data types DB2 does not
support: TEXT for String and BIT(1) for Boolean.

This patch registers a DB2 JDBC dialect that maps String and Boolean to valid
DB2 data types.

Author: sureshthalamati 

Closes #8393 from sureshthalamati/db2_dialect_spark-10170.
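
The same extension point works for any database. A minimal sketch of
registering a custom dialect (the jdbc:mydb URL prefix and the VARCHAR mapping
below are hypothetical, for illustration only):

  import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
  import org.apache.spark.sql.types.{DataType, StringType}

  // Hypothetical dialect for a database whose JDBC URLs start with "jdbc:mydb".
  case object MyDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case StringType => Some(JdbcType("VARCHAR(4000)", java.sql.Types.VARCHAR))
      case _ => None  // fall back to the defaults for everything else
    }
  }

  JdbcDialects.registerDialect(MyDialect)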


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d5c720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d5c720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d5c720

Branch: refs/heads/master
Commit: a2d5c72091b1c602694dbca823a7b26f86b02864
Parents: 4a5fe09
Author: sureshthalamati 
Authored: Mon Aug 31 12:39:58 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 12:39:58 2015 -0700

--
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala  | 18 ++
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala |  7 +++
 2 files changed, 25 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 8849fc2..c6d05c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -125,6 +125,7 @@ object JdbcDialects {
 
   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
+  registerDialect(DB2Dialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
@@ -222,3 +223,20 @@ case object MySQLDialect extends JdbcDialect {
     s"`$colName`"
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
+ * By default string and boolean get mapped to the DB2-invalid types TEXT and BIT(1).
+ */
+@DeveloperApi
+case object DB2Dialect extends JdbcDialect {
+
+  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
+
+  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
+    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
+    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
+    case _ => None
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edac08..d8c9a08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -407,6 +407,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
   test("Default jdbc dialect registration") {
     assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect)
     assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
+    assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
     assert(JdbcDialects.get("test.invalid") == NoopDialect)
   }
 
@@ -443,4 +444,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
     assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
     assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
   }
+
+  test("DB2Dialect type mapping") {
+    val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
+    assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
+    assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
+  }
 }





spark git commit: [SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master a2d5c7209 -> 23e39cc7b


[SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode

This could help reduce hash collisions, e.g., in `RDD[Vector].repartition`. 
jkbradley

Author: Xiangrui Meng 

Closes #8182 from mengxr/SPARK-9954.
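
A minimal standalone sketch of the hashing recipe (with MAX_HASH_NNZ inlined
as a parameter) over a dense array of values:

  // Hash the vector's size plus its first maxNnz nonzero (index, value) pairs,
  // similar to java.util.Arrays.hashCode; explicit zeros are skipped so sparse
  // and dense representations of the same vector hash identically.
  def vectorHash(size: Int, values: Array[Double], maxNnz: Int = 128): Int = {
    var result = 31 + size
    var i = 0
    var nnz = 0
    while (i < values.length && nnz < maxNnz) {
      val v = values(i)
      if (v != 0.0) {
        result = 31 * result + i
        val bits = java.lang.Double.doubleToLongBits(v)
        result = 31 * result + (bits ^ (bits >>> 32)).toInt
        nnz += 1
      }
      i += 1
    }
    result
  }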


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23e39cc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23e39cc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23e39cc7

Branch: refs/heads/master
Commit: 23e39cc7b1bb7f1087c4706234c9b5165a571357
Parents: a2d5c72
Author: Xiangrui Meng 
Authored: Mon Aug 31 15:49:25 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 15:49:25 2015 -0700

--
 .../org/apache/spark/mllib/linalg/Vectors.scala | 38 +++-
 1 file changed, 21 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23e39cc7/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 06ebb15..3642e92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -71,20 +71,22 @@ sealed trait Vector extends Serializable {
   }
 
   /**
-   * Returns a hash code value for the vector. The hash code is based on its size and its nonzeros
-   * in the first 16 entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
+   * Returns a hash code value for the vector. The hash code is based on its size and its first 128
+   * nonzero entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]].
    */
   override def hashCode(): Int = {
     // This is a reference implementation. It calls return in foreachActive, which is slow.
     // Subclasses should override it with optimized implementation.
     var result: Int = 31 + size
+    var nnz = 0
     this.foreachActive { (index, value) =>
-      if (index < 16) {
+      if (nnz < Vectors.MAX_HASH_NNZ) {
         // ignore explicit 0 for comparison between sparse and dense
         if (value != 0) {
           result = 31 * result + index
           val bits = java.lang.Double.doubleToLongBits(value)
           result = 31 * result + (bits ^ (bits >>> 32)).toInt
+          nnz += 1
         }
       } else {
         return result
@@ -536,6 +538,9 @@ object Vectors {
     }
     allEqual
   }
+
+  /** Max number of nonzero entries used in computing hash code. */
+  private[linalg] val MAX_HASH_NNZ = 128
 }
 
 /**
@@ -578,13 +583,15 @@ class DenseVector @Since("1.0.0") (
   override def hashCode(): Int = {
     var result: Int = 31 + size
     var i = 0
-    val end = math.min(values.length, 16)
-    while (i < end) {
+    val end = values.length
+    var nnz = 0
+    while (i < end && nnz < Vectors.MAX_HASH_NNZ) {
       val v = values(i)
       if (v != 0.0) {
         result = 31 * result + i
         val bits = java.lang.Double.doubleToLongBits(values(i))
         result = 31 * result + (bits ^ (bits >>> 32)).toInt
+        nnz += 1
       }
       i += 1
     }
@@ -707,19 +714,16 @@ class SparseVector @Since("1.0.0") (
   override def hashCode(): Int = {
     var result: Int = 31 + size
     val end = values.length
-    var continue = true
     var k = 0
-    while ((k < end) & continue) {
-      val i = indices(k)
-      if (i < 16) {
-        val v = values(k)
-        if (v != 0.0) {
-          result = 31 * result + i
-          val bits = java.lang.Double.doubleToLongBits(v)
-          result = 31 * result + (bits ^ (bits >>> 32)).toInt
-        }
-      } else {
-        continue = false
+    var nnz = 0
+    while (k < end && nnz < Vectors.MAX_HASH_NNZ) {
+      val v = values(k)
+      if (v != 0.0) {
+        val i = indices(k)
+        result = 31 * result + i
+        val bits = java.lang.Double.doubleToLongBits(v)
+        result = 31 * result + (bits ^ (bits >>> 32)).toInt
+        nnz += 1
       }
       k += 1
     }





[1/2] spark git commit: Preparing Spark release v1.5.0-rc3

2015-08-31 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 1c752b8b5 -> 2b270a166


Preparing Spark release v1.5.0-rc3


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/908e37bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/908e37bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/908e37bc

Branch: refs/heads/branch-1.5
Commit: 908e37bcc10132bb2aa7f80ae694a9df6e40f31a
Parents: 1c752b8
Author: Patrick Wendell 
Authored: Mon Aug 31 15:57:42 2015 -0700
Committer: Patrick Wendell 
Committed: Mon Aug 31 15:57:42 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7b41ebb..3ef7d6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 16bf17c..684e07b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index beb547f..6b082ad 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 3926b79..9ef1eda 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 5eda12d..aa7021d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 33f2cd7..7d72f78 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 670c783..38683e3 

Git Push Summary

2015-08-31 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.0-rc3 [created] 908e37bcc




[2/2] spark git commit: Preparing development version 1.5.1-SNAPSHOT

2015-08-31 Thread pwendell
Preparing development version 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b270a16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b270a16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b270a16

Branch: refs/heads/branch-1.5
Commit: 2b270a166d6bd5b42399400924c576c9996bfc10
Parents: 908e37b
Author: Patrick Wendell 
Authored: Mon Aug 31 15:57:49 2015 -0700
Committer: Patrick Wendell 
Committed: Mon Aug 31 15:57:49 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 33 files changed, 33 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3ef7d6f..7b41ebb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 684e07b..16bf17c 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 6b082ad..beb547f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9ef1eda..3926b79 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index aa7021d..5eda12d 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 7d72f78..33f2cd7 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 38683e3..670c783 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 

spark git commit: [SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer

2015-08-31 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master fe16fd0b8 -> 52ea399e6


[SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer

Add Python API for SQLTransformer

Author: Yanbo Liang 

Closes #8527 from yanboliang/spark-10355.
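
For comparison, a minimal sketch of the equivalent Scala usage (this patch adds
only the Python wrapper; sqlContext is assumed to be in scope, as in the shell):

  import org.apache.spark.ml.feature.SQLTransformer

  // Small sample DataFrame mirroring the doctest in the Python diff below.
  val df = sqlContext.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0)))
    .toDF("id", "v1", "v2")
  val sqlTrans = new SQLTransformer()
    .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
  sqlTrans.transform(df).show()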


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ea399e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ea399e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ea399e

Branch: refs/heads/master
Commit: 52ea399e6ee37b7c44aae7709863e006fca88906
Parents: fe16fd0
Author: Yanbo Liang 
Authored: Mon Aug 31 16:11:27 2015 -0700
Committer: Xiangrui Meng 
Committed: Mon Aug 31 16:11:27 2015 -0700

--
 python/pyspark/ml/feature.py | 57 ---
 1 file changed, 54 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/52ea399e/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 59300a6..0626281 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -28,9 +28,9 @@ from pyspark.mllib.linalg import _convert_to_vector
 
 __all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel',
            'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 'RegexTokenizer',
-           'StandardScaler', 'StandardScalerModel', 'StringIndexer', 'StringIndexerModel',
-           'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 'Word2VecModel',
-           'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']
+           'SQLTransformer', 'StandardScaler', 'StandardScalerModel', 'StringIndexer',
+           'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec',
+           'Word2VecModel', 'PCA', 'PCAModel', 'RFormula', 'RFormulaModel']
 
 
 @inherit_doc
@@ -744,6 +744,57 @@ class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol):
 
 
 @inherit_doc
+class SQLTransformer(JavaTransformer):
+    """
+    Implements the transforms which are defined by SQL statement.
+    Currently we only support SQL syntax like 'SELECT ... FROM __THIS__'
+    where '__THIS__' represents the underlying table of the input dataset.
+
+    >>> df = sqlContext.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"])
+    >>> sqlTrans = SQLTransformer(
+    ...     statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
+    >>> sqlTrans.transform(df).head()
+    Row(id=0, v1=1.0, v2=3.0, v3=4.0, v4=3.0)
+    """
+
+    # a placeholder to make it appear in the generated doc
+    statement = Param(Params._dummy(), "statement", "SQL statement")
+
+    @keyword_only
+    def __init__(self, statement=None):
+        """
+        __init__(self, statement=None)
+        """
+        super(SQLTransformer, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.SQLTransformer", self.uid)
+        self.statement = Param(self, "statement", "SQL statement")
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    def setParams(self, statement=None):
+        """
+        setParams(self, statement=None)
+        Sets params for this SQLTransformer.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    def setStatement(self, value):
+        """
+        Sets the value of :py:attr:`statement`.
+        """
+        self._paramMap[self.statement] = value
+        return self
+
+    def getStatement(self):
+        """
+        Gets the value of statement or its default value.
+        """
+        return self.getOrDefault(self.statement)
+
+
+@inherit_doc
 class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol):
     """
     Standardizes features by removing the mean and scaling to unit variance using column summary





spark git commit: [SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.

2015-08-31 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 52ea399e6 -> d65656c45


[SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.

They don't bring much value since we now have better unit test coverage for 
hash joins. This will also help reduce the test time.

Author: Reynold Xin 

Closes #8542 from rxin/SPARK-10378.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d65656c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d65656c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d65656c4

Branch: refs/heads/master
Commit: d65656c455d19b83c6412571873586b458aa355e
Parents: 52ea399
Author: Reynold Xin 
Authored: Mon Aug 31 18:09:24 2015 -0700
Committer: Reynold Xin 
Committed: Mon Aug 31 18:09:24 2015 -0700

--
 .../execution/HashJoinCompatibilitySuite.scala  | 169 ---
 1 file changed, 169 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d65656c4/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
--
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
deleted file mode 100644
index 1a5ba20..000
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import java.io.File
-
-import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.test.TestHive
-
-/**
- * Runs the test cases that are included in the hive distribution with hash joins.
- */
-class HashJoinCompatibilitySuite extends HiveCompatibilitySuite {
-  override def beforeAll() {
-    super.beforeAll()
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
-  }
-
-  override def afterAll() {
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
-    super.afterAll()
-  }
-
-  override def whiteList = Seq(
-"auto_join0",
-"auto_join1",
-"auto_join10",
-"auto_join11",
-"auto_join12",
-"auto_join13",
-"auto_join14",
-"auto_join14_hadoop20",
-"auto_join15",
-"auto_join17",
-"auto_join18",
-"auto_join19",
-"auto_join2",
-"auto_join20",
-"auto_join21",
-"auto_join22",
-"auto_join23",
-"auto_join24",
-"auto_join25",
-"auto_join26",
-"auto_join27",
-"auto_join28",
-"auto_join3",
-"auto_join30",
-"auto_join31",
-"auto_join32",
-"auto_join4",
-"auto_join5",
-"auto_join6",
-"auto_join7",
-"auto_join8",
-"auto_join9",
-"auto_join_filters",
-"auto_join_nulls",
-"auto_join_reordering_values",
-"auto_smb_mapjoin_14",
-"auto_sortmerge_join_1",
-"auto_sortmerge_join_10",
-"auto_sortmerge_join_11",
-"auto_sortmerge_join_12",
-"auto_sortmerge_join_13",
-"auto_sortmerge_join_14",
-"auto_sortmerge_join_15",
-"auto_sortmerge_join_16",
-"auto_sortmerge_join_2",
-"auto_sortmerge_join_3",
-"auto_sortmerge_join_4",
-"auto_sortmerge_join_5",
-"auto_sortmerge_join_6",
-"auto_sortmerge_join_7",
-"auto_sortmerge_join_8",
-"auto_sortmerge_join_9",
-"correlationoptimizer1",
-"correlationoptimizer10",
-"correlationoptimizer11",
-"correlationoptimizer13",
-"correlationoptimizer14",
-"correlationoptimizer15",
-"correlationoptimizer2",
-"correlationoptimizer3",
-"correlationoptimizer4",
-"correlationoptimizer6",
-"correlationoptimizer7",
-"correlationoptimizer8",
-"correlationoptimizer9",
-"join0",
-"join1",
-"join10",
-"join11",
-"join12",
-"join13",
-"join14",
-"join14_hadoop20",
-"join15",
-"join16",
-"join17",
-