svn commit: r1675040 - /spark/site/mllib/index.html
Author: meng Date: Tue Apr 21 06:25:38 2015 New Revision: 1675040 URL: http://svn.apache.org/r1675040 Log: update mllib/index.html Modified: spark/site/mllib/index.html Modified: spark/site/mllib/index.html URL: http://svn.apache.org/viewvc/spark/site/mllib/index.html?rev=1675040&r1=1675039&r2=1675040&view=diff == --- spark/site/mllib/index.html (original) +++ spark/site/mllib/index.html Tue Apr 21 06:25:38 2015 @@ -252,16 +252,20 @@ <div class="col-md-4 col-padded"> <h3>Algorithms</h3> <p> - MLlib 1.1 contains the following algorithms: + MLlib 1.3 contains the following algorithms: </p> <ul class="list-narrow"> <li>linear SVM and logistic regression</li> <li>classification and regression tree</li> - <li>k-means clustering</li> + <li>random forest and gradient-boosted trees</li> + <li>clustering via k-means, Gaussian mixtures, and power iteration clustering</li> + <li>topic modeling via latent Dirichlet allocation</li> <li>recommendation via alternating least squares</li> <li>singular value decomposition</li> <li>linear regression with L<sub>1</sub>- and L<sub>2</sub>-regularization</li> + <li>isotonic regression</li> <li>multinomial naive Bayes</li> + <li>frequent itemset mining via FP-growth</li> <li>basic statistics</li> <li>feature transformations</li> </ul>
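As a usage sketch for one of the newly listed algorithms, frequent itemset mining via FP-growth can be exercised in a few lines of Scala. This is a minimal sketch, assuming an existing SparkContext `sc`; the transactions and the minimum-support value below are illustrative, not from the commit:

```scala
import org.apache.spark.mllib.fpm.FPGrowth

// each transaction is an array of items
val transactions = sc.parallelize(Seq(
  Array("a", "b", "c"),
  Array("a", "b"),
  Array("b", "c")))

val model = new FPGrowth()
  .setMinSupport(0.5) // keep itemsets present in at least half the transactions
  .run(transactions)

model.freqItemsets.collect().foreach(println)
```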
spark git commit: [SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*
Repository: spark Updated Branches: refs/heads/master c736220da -> 8136810df [SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.* Deprecated `spark.akka.num.retries`, `spark.akka.retry.wait`, `spark.akka.askTimeout`, `spark.akka.lookupTimeout`, and added `spark.rpc.num.retries`, `spark.rpc.retry.wait`, `spark.rpc.askTimeout`, `spark.rpc.lookupTimeout`. Author: zsxwing zsxw...@gmail.com Closes #5595 from zsxwing/SPARK-6490 and squashes the following commits: e0d80a9 [zsxwing] Use getTimeAsMs and getTimeAsSeconds and other minor fixes 31dbe69 [zsxwing] Add spark.rpc.* and deprecate spark.akka.* Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8136810d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8136810d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8136810d Branch: refs/heads/master Commit: 8136810dfad12008ac300116df7bc8448740f1ae Parents: c736220 Author: zsxwing zsxw...@gmail.com Authored: Mon Apr 20 23:18:42 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Apr 20 23:18:42 2015 -0700 -- .../main/scala/org/apache/spark/SparkConf.scala | 10 +++- .../scala/org/apache/spark/deploy/Client.scala | 6 ++--- .../apache/spark/deploy/client/AppClient.scala | 4 +-- .../org/apache/spark/deploy/master/Master.scala | 4 +-- .../spark/deploy/master/ui/MasterWebUI.scala| 4 +-- .../deploy/rest/StandaloneRestServer.scala | 8 +++--- .../spark/deploy/worker/ui/WorkerWebUI.scala| 4 +-- .../scala/org/apache/spark/rpc/RpcEnv.scala | 10 .../cluster/YarnSchedulerBackend.scala | 4 +-- .../spark/storage/BlockManagerMaster.scala | 4 +-- .../scala/org/apache/spark/util/AkkaUtils.scala | 26 +++- .../scala/org/apache/spark/util/RpcUtils.scala | 23 + .../apache/spark/MapOutputTrackerSuite.scala| 4 +-- .../scala/org/apache/spark/SparkConfSuite.scala | 24 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 4 +-- 15 files changed, 86 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/SparkConf.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index e3a649d..c1996e0 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging { "spark.yarn.am.waitTime" -> Seq( AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", // Translate old value to a duration, with 10s wait time per try.
-translation = s => s"${s.toLong * 10}s")) +translation = s => s"${s.toLong * 10}s")), +"spark.rpc.numRetries" -> Seq( + AlternateConfig("spark.akka.num.retries", "1.4")), +"spark.rpc.retry.wait" -> Seq( + AlternateConfig("spark.akka.retry.wait", "1.4")), +"spark.rpc.askTimeout" -> Seq( + AlternateConfig("spark.akka.askTimeout", "1.4")), +"spark.rpc.lookupTimeout" -> Seq( + AlternateConfig("spark.akka.lookupTimeout", "1.4")) ) /** http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/Client.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 8d13b2a..c2c3e9a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} -import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils} +import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils} /** * Proxy that relays messages to the driver. @@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with ActorLogReceive with Logging { var masterActor: ActorSelection = _ - val timeout = AkkaUtils.askTimeout(conf) + val timeout = RpcUtils.askTimeout(conf) override def preStart(): Unit = { masterActor = context.actorSelection( @@ -155,7 +155,7 @@ object Client { if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { conf.set("spark.akka.logLifecycleEvents", "true") } -conf.set("spark.akka.askTimeout", "10") +conf.set("spark.rpc.askTimeout", "10") conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN",
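From a user's perspective, the rename means either key is accepted: the deprecated `spark.akka.*` name is translated to its `spark.rpc.*` replacement via the AlternateConfig entries above, with a deprecation warning. A hedged sketch (the key choices and timeout values here are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
// preferred, transport-agnostic key added by this patch
conf.set("spark.rpc.askTimeout", "30s")
// deprecated alias, still honored through the translation table
conf.set("spark.akka.lookupTimeout", "30")
```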
spark git commit: [SPARK-5990] [MLLIB] Model import/export for IsotonicRegression
Repository: spark Updated Branches: refs/heads/master ab9128fb7 -> 1f2f723b0 [SPARK-5990] [MLLIB] Model import/export for IsotonicRegression Model import/export for IsotonicRegression Author: Yanbo Liang yblia...@gmail.com Closes #5270 from yanboliang/spark-5990 and squashes the following commits: 872028d [Yanbo Liang] fix code style f80ec1b [Yanbo Liang] address comments 49600cc [Yanbo Liang] address comments 429ff7d [Yanbo Liang] store each interval as a record 2b2f5a1 [Yanbo Liang] Model import/export for IsotonicRegression Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f2f723b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f2f723b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f2f723b Branch: refs/heads/master Commit: 1f2f723b0daacbb9e70ec42c19a84470af1d7765 Parents: ab9128f Author: Yanbo Liang yblia...@gmail.com Authored: Tue Apr 21 00:14:16 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 00:14:16 2015 -0700 -- .../mllib/regression/IsotonicRegression.scala | 78 +++- .../regression/IsotonicRegressionSuite.scala| 21 ++ 2 files changed, 98 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f2f723b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index cb70852..1d76170 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -23,9 +23,16 @@ import java.util.Arrays.binarySearch import scala.collection.mutable.ArrayBuffer +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} +import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SQLContext} /** * :: Experimental :: @@ -42,7 +49,7 @@ import org.apache.spark.rdd.RDD class IsotonicRegressionModel ( val boundaries: Array[Double], val predictions: Array[Double], -val isotonic: Boolean) extends Serializable { +val isotonic: Boolean) extends Serializable with Saveable { private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse @@ -124,6 +131,75 @@ class IsotonicRegressionModel ( predictions(foundIndex) } } + + override def save(sc: SparkContext, path: String): Unit = { +IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic) + } + + override protected def formatVersion: String = "1.0" +} + +object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { + + import org.apache.spark.mllib.util.Loader._ + + private object SaveLoadV1_0 { + +def thisFormatVersion: String = "1.0" + +/** Hard-code class name string in case it changes in the future */ +def thisClassName: String = "org.apache.spark.mllib.regression.IsotonicRegressionModel" + +/** Model data for model import/export */ +case class Data(boundary: Double, prediction: Double) + +def save( +sc: SparkContext, +path: String, +boundaries: Array[Double], +predictions: Array[Double], +isotonic: Boolean): Unit = { + val sqlContext = new SQLContext(sc) + + val metadata = compact(render( +("class" ->
thisClassName) ~ ("version" -> thisFormatVersion) ~ + ("isotonic" -> isotonic))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + + sqlContext.createDataFrame( +boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) } + ).saveAsParquetFile(dataPath(path)) +} + +def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { + val sqlContext = new SQLContext(sc) + val dataRDD = sqlContext.parquetFile(dataPath(path)) + + checkSchema[Data](dataRDD.schema) + val dataArray = dataRDD.select("boundary", "prediction").collect() + val (boundaries, predictions) = dataArray.map { x => +(x.getDouble(0), x.getDouble(1)) + }.toList.sortBy(_._1).unzip + (boundaries.toArray, predictions.toArray) +} + } + + override def load(sc: SparkContext, path: String): IsotonicRegressionModel = { +implicit val formats = DefaultFormats +val (loadedClassName, version, metadata) = loadMetadata(sc, path) +val isotonic = (metadata \
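A minimal sketch of the round trip this commit enables, assuming an existing SparkContext `sc`; the training triples and the save path are illustrative:

```scala
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel}

// (label, feature, weight) triples; values are illustrative
val data = sc.parallelize(Seq((1.0, 1.0, 1.0), (2.0, 2.0, 1.0), (3.0, 3.0, 1.0)))

val model = new IsotonicRegression().setIsotonic(true).run(data)
model.save(sc, "/tmp/isotonicModel") // metadata as JSON, interval data as Parquet
val sameModel = IsotonicRegressionModel.load(sc, "/tmp/isotonicModel")
println(sameModel.predict(2.5))
```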
spark git commit: [SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression
Repository: spark Updated Branches: refs/heads/master 8136810df - ab9128fb7 [SPARK-6949] [SQL] [PySpark] Support Date/Timestamp in Column expression This PR enable auto_convert in JavaGateway, then we could register a converter for a given types, for example, date and datetime. There are two bugs related to auto_convert, see [1] and [2], we workaround it in this PR. [1] https://github.com/bartdag/py4j/issues/160 [2] https://github.com/bartdag/py4j/issues/161 cc rxin JoshRosen Author: Davies Liu dav...@databricks.com Closes #5570 from davies/py4j_date and squashes the following commits: eb4fa53 [Davies Liu] fix tests in python 3 d17d634 [Davies Liu] rollback changes in mllib 2e7566d [Davies Liu] convert tuple into ArrayList ceb3779 [Davies Liu] Update rdd.py 3c373f3 [Davies Liu] support date and datetime by auto_convert cb094ff [Davies Liu] enable auto convert Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab9128fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab9128fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab9128fb Branch: refs/heads/master Commit: ab9128fb7ec7ca479dc91e7cc7c775e8d071eafa Parents: 8136810 Author: Davies Liu dav...@databricks.com Authored: Tue Apr 21 00:08:18 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 00:08:18 2015 -0700 -- python/pyspark/context.py | 6 +- python/pyspark/java_gateway.py | 15 ++- python/pyspark/rdd.py | 3 +++ python/pyspark/sql/_types.py| 27 +++ python/pyspark/sql/context.py | 13 - python/pyspark/sql/dataframe.py | 18 -- python/pyspark/sql/tests.py | 11 +++ python/pyspark/streaming/context.py | 11 +++ python/pyspark/streaming/kafka.py | 7 ++- python/pyspark/streaming/tests.py | 6 +- 10 files changed, 70 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ab9128fb/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 6a743ac..b006120 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -23,8 +23,6 @@ import sys from threading import Lock from tempfile import NamedTemporaryFile -from py4j.java_collections import ListConverter - from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast @@ -643,7 +641,6 @@ class SparkContext(object): rdds = [x._reserialize() for x in rdds] first = rdds[0]._jrdd rest = [x._jrdd for x in rdds[1:]] -rest = ListConverter().convert(rest, self._gateway._gateway_client) return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer) def broadcast(self, value): @@ -846,13 +843,12 @@ class SparkContext(object): if partitions is None: partitions = range(rdd._jrdd.partitions().size()) -javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client) # Implementation note: This is implemented as a mapPartitions followed # by runJob() in order to avoid having to pass a Python lambda into # SparkContext#runJob. 
mappedRDD = rdd.mapPartitions(partitionFunc) -port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, +port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions, allowLocal) return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) http://git-wip-us.apache.org/repos/asf/spark/blob/ab9128fb/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 45bc38f..3cee4ea 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -17,17 +17,30 @@ import atexit import os +import sys import select import signal import shlex import socket import platform from subprocess import Popen, PIPE + +if sys.version >= '3': +xrange = range + from py4j.java_gateway import java_import, JavaGateway, GatewayClient +from py4j.java_collections import ListConverter from pyspark.serializers import read_int +# patching ListConverter, or it will convert bytearray into Java ArrayList +def can_convert_list(self, obj): +return isinstance(obj, (list, tuple, xrange)) + +ListConverter.can_convert = can_convert_list + + def launch_gateway(): if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) @@
spark git commit: [SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD
Repository: spark Updated Branches: refs/heads/master 5fea3e5c3 -> c035c0f2d [SPARK-5360] [SPARK-6606] Eliminate duplicate objects in serialized CoGroupedRDD CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle. The partition is serialized separately from the RDD, so when the RDD and partition arrive on the worker, the references in the partition and in the RDD no longer point to the same object. This is a relatively minor performance issue (the closure can be 2x larger than it needs to be because the rdds and partitions are serialized twice; see numbers below) but is more annoying as a developer issue (this is where I ran into it): if any state is stored in the RDD or ShuffleHandle on the worker side, subtle bugs can appear due to the fact that the references to the RDD / ShuffleHandle in the RDD and in the partition point to separate objects. I'm not sure if this is enough of a potential future problem to fix this old and central part of the code, so hoping to get input from others here. I did some simple experiments to see how much this affects closure size. For this example: $ val a = sc.parallelize(1 to 10).map((_, 1)) $ val b = sc.parallelize(1 to 2).map(x => (x, 2*x)) $ a.cogroup(b).collect() the closure was 1902 bytes with current Spark, and 1129 bytes after my change. The difference comes from eliminating duplicate serialization of the shuffle handle. For this example: $ val sortedA = a.sortByKey() $ val sortedB = b.sortByKey() $ sortedA.cogroup(sortedB).collect() the closure was 3491 bytes with current Spark, and 1333 bytes after my change. Here, the difference comes from eliminating duplicate serialization of the two RDDs for the narrow dependencies. The ShuffleHandle includes the ShuffleDependency, so this difference will get larger if a ShuffleDependency includes a serializer, a key ordering, or an aggregator (all set to None by default). It would also get bigger for a big RDD -- although I can't think of any examples where the RDD object gets large. The difference is not affected by the size of the function the user specifies, which (based on my understanding) is typically the source of large task closures.
Author: Kay Ousterhout kayousterh...@gmail.com Closes #4145 from kayousterhout/SPARK-5360 and squashes the following commits: 85156c3 [Kay Ousterhout] Better comment the narrowDeps parameter cff0209 [Kay Ousterhout] Fixed spelling issue 658e1af [Kay Ousterhout] [SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c035c0f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c035c0f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c035c0f2 Branch: refs/heads/master Commit: c035c0f2d72f2a303b86fe0037ec43d756fff060 Parents: 5fea3e5 Author: Kay Ousterhout kayousterh...@gmail.com Authored: Tue Apr 21 11:01:18 2015 -0700 Committer: Kay Ousterhout kayousterh...@gmail.com Committed: Tue Apr 21 11:01:18 2015 -0700 -- .../org/apache/spark/rdd/CoGroupedRDD.scala | 43 .../org/apache/spark/rdd/SubtractedRDD.scala| 30 -- 2 files changed, 44 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c035c0f2/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 7021a33..658e8c8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -29,15 +29,16 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer} import org.apache.spark.util.Utils import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.ShuffleHandle - -private[spark] sealed trait CoGroupSplitDep extends Serializable +/** The references to rdd and splitIndex are transient because redundant information is stored + * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from + * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the + * task closure. */ private[spark] case class NarrowCoGroupSplitDep( -rdd: RDD[_], -splitIndex: Int, +@transient rdd: RDD[_], +@transient splitIndex: Int, var split: Partition - ) extends CoGroupSplitDep { + ) extends Serializable { @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream): Unit =
svn commit: r8672 - /dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc /release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.asc
Author: pwendell Date: Tue Apr 21 18:51:16 2015 New Revision: 8672 Log: Spark 1.2.2 Hadoop 2.4 ASC Added: release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.asc - copied unchanged from r8671, dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc Removed: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc
svn commit: r8673 - /dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 /release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.md5
Author: pwendell Date: Tue Apr 21 18:51:32 2015 New Revision: 8673 Log: Spark 1.2.2 Hadoop 2.4 MD5 Added: release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.md5 - copied unchanged from r8672, dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 Removed: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.2.2-rc1 [created] 7531b50e4
svn commit: r8674 - /dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha /release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.sha
Author: pwendell Date: Tue Apr 21 18:52:08 2015 New Revision: 8674 Log: Spark 1.2.2 Hadoop 2.4 Sha Added: release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz.sha - copied unchanged from r8673, dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha Removed: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha
svn commit: r8671 - /dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz /release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz
Author: pwendell Date: Tue Apr 21 18:50:59 2015 New Revision: 8671 Log: Spark 1.2.2 Hadoop 2.4 TGZ Added: release/spark/spark-1.2.2/spark-1.2.2-bin-hadoop2.4.tgz - copied unchanged from r8670, dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz Removed: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz
svn commit: r8670 - in /dev/spark/spark-1.2.2-rc1: ./ spark-1.2.2-bin-hadoop2.4.tgz spark-1.2.2-bin-hadoop2.4.tgz.asc spark-1.2.2-bin-hadoop2.4.tgz.md5 spark-1.2.2-bin-hadoop2.4.tgz.sha
Author: pwendell Date: Tue Apr 21 18:49:30 2015 New Revision: 8670 Log: Adding missing Hadoop 2.4 binary for Spark 1.2.2 Added: dev/spark/spark-1.2.2-rc1/ dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz (with props) dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc (with props) dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha Added: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz == Binary file - no diff available. Propchange: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz -- svn:mime-type = application/x-gzip Added: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc == Binary file - no diff available. Propchange: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.asc -- svn:mime-type = application/pgp-signature Added: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 == --- dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 (added) +++ dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.md5 Tue Apr 21 18:49:30 2015 @@ -0,0 +1 @@ +spark-1.2.2-bin-hadoop2.4.tgz: D9 F4 BC C0 31 81 D3 F1 29 51 C7 F7 E2 67 8C D6 Added: dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha == --- dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha (added) +++ dev/spark/spark-1.2.2-rc1/spark-1.2.2-bin-hadoop2.4.tgz.sha Tue Apr 21 18:49:30 2015 @@ -0,0 +1,4 @@ +spark-1.2.2-bin-hadoop2.4.tgz: 24E175EF B1E28ADF AFCA7650 F2835177 317FA4CC + 18B4CC18 6D1315DD 08A4ED11 E532E9FD E3EC376D + 6F6156CA 40FCA339 0F7F4D63 CFF1D4E5 0E5721A2 + 069A6D56
spark git commit: [SPARK-6996][SQL] Support map types in java beans
Repository: spark Updated Branches: refs/heads/master 6265cba00 -> 2a24bf92e [SPARK-6996][SQL] Support map types in java beans liancheng mengxr this is similar to #5146. Author: Punya Biswal pbis...@palantir.com Closes #5578 from punya/feature/SPARK-6996 and squashes the following commits: d56c3e0 [Punya Biswal] Fix imports c7e308b [Punya Biswal] Support java iterable types in POJOs 5e00685 [Punya Biswal] Support map types in java beans Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a24bf92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a24bf92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a24bf92 Branch: refs/heads/master Commit: 2a24bf92e6d36e876bad6a8b4e0ff12c407ebb8a Parents: 6265cba Author: Punya Biswal pbis...@palantir.com Authored: Tue Apr 21 14:50:02 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 14:50:02 2015 -0700 -- .../sql/catalyst/CatalystTypeConverters.scala | 20 .../apache/spark/sql/JavaTypeInference.scala| 110 +++ .../scala/org/apache/spark/sql/SQLContext.scala | 52 + .../apache/spark/sql/JavaDataFrameSuite.java| 57 -- 4 files changed, 180 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a24bf92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index d4f9fda..a13e2f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst +import java.lang.{Iterable => JavaIterable} import java.util.{Map => JavaMap} import scala.collection.mutable.HashMap @@ -49,6 +50,16 @@ object CatalystTypeConverters { case (s: Seq[_], arrayType: ArrayType) => s.map(convertToCatalyst(_, arrayType.elementType)) +case (jit: JavaIterable[_], arrayType: ArrayType) => { + val iter = jit.iterator + var listOfItems: List[Any] = List() + while (iter.hasNext) { +val item = iter.next() +listOfItems :+= convertToCatalyst(item, arrayType.elementType) + } + listOfItems +} + case (s: Array[_], arrayType: ArrayType) => s.toSeq.map(convertToCatalyst(_, arrayType.elementType)) @@ -124,6 +135,15 @@ object CatalystTypeConverters { extractOption(item) match { case a: Array[_] => a.toSeq.map(elementConverter) case s: Seq[_] => s.map(elementConverter) +case i: JavaIterable[_] => { + val iter = i.iterator + var convertedIterable: List[Any] = List() + while (iter.hasNext) { +val item = iter.next() +convertedIterable :+= elementConverter(item) + } + convertedIterable +} case null => null } } http://git-wip-us.apache.org/repos/asf/spark/blob/2a24bf92/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala b/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala new file mode 100644 index 000..db484c5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * +*http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.beans.Introspector +import java.lang.{Iterable => JIterable} +import java.util.{Iterator => JIterator, Map => JMap} + +import com.google.common.reflect.TypeToken + +import org.apache.spark.sql.types._ +
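A hedged illustration of the newly supported bean shape, written in Scala with `@BeanProperty` standing in for a Java bean's getters/setters; the `Event` class, its fields, and the inferred schema comment are hypothetical, and an existing SparkContext `sc` and SQLContext `sqlContext` are assumed:

```scala
import scala.beans.BeanProperty

// hypothetical bean with Map- and Iterable-typed properties,
// the shapes this patch teaches schema inference to handle
class Event extends Serializable {
  @BeanProperty var name: String = _
  @BeanProperty var tags: java.util.Map[String, String] = _
  @BeanProperty var ids: java.lang.Iterable[Integer] = _
}

val e = new Event()
e.setName("login")
e.setTags(java.util.Collections.singletonMap("source", "web"))
e.setIds(java.util.Arrays.asList(Integer.valueOf(1), Integer.valueOf(2)))

val df = sqlContext.createDataFrame(sc.parallelize(Seq(e)), classOf[Event])
df.printSchema() // expect name: string, tags: map, ids: array
```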
spark git commit: SPARK-3276 Added a new configuration spark.streaming.minRememberDuration
Repository: spark Updated Branches: refs/heads/master c035c0f2d -> c25ca7c5a SPARK-3276 Added a new configuration spark.streaming.minRememberDuration SPARK-3276 Added a new configuration parameter ``spark.streaming.minRememberDuration``, with a default value of 1 minute. So that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as threshold for remembering. Author: emres emre.sev...@gmail.com Closes #5438 from emres/SPARK-3276 and squashes the following commits: 766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds method. affee1d [emres] SPARK-3276 Changed the property name and variable name for minRememberDuration c9d58ca [emres] SPARK-3276 Minor code re-formatting. 1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf, and also getLong method directly. bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields. 43cc1ce [emres] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c25ca7c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c25ca7c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c25ca7c5 Branch: refs/heads/master Commit: c25ca7c5a1f2a4f88f40b0c5cdbfa927c186cfa8 Parents: c035c0f Author: emres emre.sev...@gmail.com Authored: Tue Apr 21 16:39:56 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Tue Apr 21 16:39:56 2015 -0400 -- .../streaming/dstream/FileInputDStream.scala| 30 +++- 1 file changed, 17 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c25ca7c5/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 66d5191..eca69f0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.SerializableWritable +import org.apache.spark.{SparkConf, SerializableWritable} import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.util.{TimeStampedHashMap, Utils} @@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * the streaming app. * - If a file is to be visible in the directory listings, it must be visible within a certain * duration of the mod time of the file. This duration is the remember window, which is set to - * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be + * 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be * selected as the mod time will be less than the ignore threshold when it becomes visible. * - Once a file is visible, the mod time cannot change. If it does due to appends, then the * processing semantics are undefined.
@@ -80,6 +80,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private val serializableConfOpt = conf.map(new SerializableWritable(_)) + /** + * Minimum duration of remembering the information of selected files. Defaults to 60 seconds. + * + * Files with mod times older than this window of remembering will be ignored. So if new + * files are visible within this window, then the file will get selected in the next batch. + */ + private val minRememberDurationS = +Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s")) + // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -95,7 +104,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * This would allow us to filter away not-too-old files which have already been recently * selected and processed. */ - private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration) + private val numBatchesToRemember = FileInputDStream +.calculateNumBatchesToRemember(slideDuration, minRememberDurationS) private val durationToRemember = slideDuration * numBatchesToRemember remember(durationToRemember) @@ -330,20 +340,14
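A sketch of how a user would tune the new threshold; the app name and the 5-minute value are illustrative:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  // remember files whose mod time is up to 5 minutes old (default is 60s);
  // the value goes through getTimeAsSeconds, so time suffixes are accepted
  .set("spark.streaming.minRememberDuration", "300s")
```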
spark git commit: [SPARK-7011] Build(compilation) fails with scala 2.11 option, because a protected[sql] type is accessed in ml package.
Repository: spark Updated Branches: refs/heads/master 45c47fa41 -> 04bf34e34 [SPARK-7011] Build(compilation) fails with scala 2.11 option, because a protected[sql] type is accessed in ml package. [This](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala#L58) is where it is used and where compilation fails. Author: Prashant Sharma prashan...@imaginea.com Closes #5593 from ScrapCodes/SPARK-7011/build-fix and squashes the following commits: e6d57a3 [Prashant Sharma] [SPARK-7011] Build fails with scala 2.11 option, because a protected[sql] type is accessed in ml package. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04bf34e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04bf34e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04bf34e3 Branch: refs/heads/master Commit: 04bf34e34f22e3d7e972fe755251774fc6a6d52e Parents: 45c47fa Author: Prashant Sharma prashan...@imaginea.com Authored: Tue Apr 21 14:43:46 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 14:43:46 2015 -0700 -- .../src/main/scala/org/apache/spark/sql/types/dataTypes.scala| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/04bf34e3/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index c6fb22c..a108413 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala @@ -299,7 +299,7 @@ class NullType private() extends DataType { case object NullType extends NullType -protected[sql] object NativeType { +protected[spark] object NativeType { val all = Seq( IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) @@ -327,7 +327,7 @@ protected[sql] object PrimitiveType { } } -protected[sql] abstract class NativeType extends DataType { +protected[spark] abstract class NativeType extends DataType { private[sql] type JvmType @transient private[sql] val tag: TypeTag[JvmType] private[sql] val ordering: Ordering[JvmType]
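The fix simply widens the package qualifier on the access modifier. A small self-contained sketch of why a `sql`-qualified scope is invisible to the `ml` package while a `spark`-qualified one is not; the packages and classes are hypothetical and use `private[...]`, which behaves the same way for this purpose:

```scala
// hypothetical packages mirroring the layout that triggered the failure
package org.example.spark.sql {
  private[sql] class SqlScoped      // visible only under org.example.spark.sql
  private[spark] class SparkScoped  // visible anywhere under org.example.spark
}

package org.example.spark.ml {
  object Demo {
    val ok = new org.example.spark.sql.SparkScoped
    // val no = new org.example.spark.sql.SqlScoped // does not compile from ml
  }
}
```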
spark git commit: [SPARK-5817] [SQL] Fix bug of udtf with column names
Repository: spark Updated Branches: refs/heads/master 2a24bf92e -> 7662ec23b [SPARK-5817] [SQL] Fix bug of udtf with column names It's a bug when doing a query like: ```sql select d from (select explode(array(1,1)) d from src limit 1) t ``` And it throws an exception like: ``` org.apache.spark.sql.AnalysisException: cannot resolve 'd' given input columns _c0; line 1 pos 7 at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:48) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:45) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:249) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:103) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:117) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:116) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) ``` To solve the bug, it requires code refactoring for UDTF. The major changes are about: * Simplifying the UDTF development, UDTF will not manage the output attribute names any more; instead, the `logical.Generate` will handle that properly. * UDTF will be asked for the output schema (data types) during the logical plan analyzing.
Author: Cheng Hao hao.ch...@intel.com Closes #4602 from chenghao-intel/explode_bug and squashes the following commits: c2a5132 [Cheng Hao] add back resolved for Alias 556e982 [Cheng Hao] revert the unncessary change 002c361 [Cheng Hao] change the rule of resolved for Generate 04ae500 [Cheng Hao] add qualifier only for generator output 5ee5d2c [Cheng Hao] prepend the new qualifier d2e8b43 [Cheng Hao] Update the code as feedback ca5e7f4 [Cheng Hao] shrink the commits Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7662ec23 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7662ec23 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7662ec23 Branch: refs/heads/master Commit: 7662ec23bb6c4d4fe4c857b6928eaed0a97d3c04 Parents: 2a24bf9 Author: Cheng Hao hao.ch...@intel.com Authored: Tue Apr 21 15:11:15 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 15:11:15 2015 -0700 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 57 ++-- .../sql/catalyst/analysis/CheckAnalysis.scala | 12 + .../apache/spark/sql/catalyst/dsl/package.scala | 3 +- .../sql/catalyst/expressions/generators.scala | 49 - .../catalyst/expressions/namedExpressions.scala | 2 + .../sql/catalyst/optimizer/Optimizer.scala | 8 +-- .../catalyst/plans/logical/basicOperators.scala | 37 - .../sql/catalyst/analysis/AnalysisSuite.scala | 2 +- .../optimizer/FilterPushdownSuite.scala | 8 +-- .../scala/org/apache/spark/sql/DataFrame.scala | 21 +--- .../apache/spark/sql/execution/Generate.scala | 22 +++- .../spark/sql/execution/SparkStrategies.scala | 5 +- .../org/apache/spark/sql/hive/HiveContext.scala | 1 - .../org/apache/spark/sql/hive/HiveQl.scala | 37 +++-- .../org/apache/spark/sql/hive/hiveUdfs.scala| 38 ++--- ...tf output-0-d1f244bce64f22b34ad5bf9fd360b632 | 1 + ...lumn name-0-7ac701cf43e73e9e416888e4df694348 | 0 ...lumn name-1-5cdf9d51fc0e105e365d82e7611e37f3 | 0 ...lumn name-2-f963396461294e06cb7cafe22a1419e4 | 3 ++ ...umn names-0-46bdb27b3359dc81d8c246b9f69d4b82 | 0 ...umn names-1-cdf6989f3b055257f1692c3bbd80dc73 | 0 ...umn names-2-ab3954b69d7a991bc801a509c3166cc5 | 3 ++ ...lumn name-0-7ac701cf43e73e9e416888e4df694348 | 0 ...lumn name-1-26599718c322ff4f9740040c066d8292 | 0 ...lumn name-2-f963396461294e06cb7cafe22a1419e4 | 3 ++ .../sql/hive/execution/HiveQuerySuite.scala | 40 +- 26 files changed, 207 insertions(+), 145 deletions(-)
spark git commit: [SPARK-6845] [MLlib] [PySpark] Add isTransposed flag to DenseMatrix
Repository: spark Updated Branches: refs/heads/master c25ca7c5a -> 45c47fa41 [SPARK-6845] [MLlib] [PySpark] Add isTransposed flag to DenseMatrix Since sparse matrices now support an isTransposed flag for row major data, DenseMatrices should do the same. Author: MechCoder manojkumarsivaraj...@gmail.com Closes #5455 from MechCoder/spark-6845 and squashes the following commits: 525c370 [MechCoder] minor 004a37f [MechCoder] Cast boolean to int 151f3b6 [MechCoder] [WIP] Add isTransposed to pickle DenseMatrix cc0b90a [MechCoder] [SPARK-6845] Add isTransposed flag to DenseMatrix Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45c47fa4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45c47fa4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45c47fa4 Branch: refs/heads/master Commit: 45c47fa4176ea75886a58f5d73c44afcb29aa629 Parents: c25ca7c Author: MechCoder manojkumarsivaraj...@gmail.com Authored: Tue Apr 21 14:36:50 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 14:36:50 2015 -0700 -- .../spark/mllib/api/python/PythonMLLibAPI.scala | 13 -- python/pyspark/mllib/linalg.py | 49 +--- python/pyspark/mllib/tests.py | 16 +++ 3 files changed, 58 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45c47fa4/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f976d2f..6237b64 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -985,8 +985,10 @@ private[spark] object SerDe extends Serializable { val m: DenseMatrix = obj.asInstanceOf[DenseMatrix] val bytes = new Array[Byte](8 * m.values.size) val order = ByteOrder.nativeOrder() + val isTransposed = if (m.isTransposed) 1 else 0 ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values) + out.write(Opcodes.MARK) out.write(Opcodes.BININT) out.write(PickleUtils.integer_to_bytes(m.numRows)) out.write(Opcodes.BININT) @@ -994,19 +996,22 @@ private[spark] object SerDe extends Serializable { out.write(Opcodes.BINSTRING) out.write(PickleUtils.integer_to_bytes(bytes.length)) out.write(bytes) - out.write(Opcodes.TUPLE3) + out.write(Opcodes.BININT) + out.write(PickleUtils.integer_to_bytes(isTransposed)) + out.write(Opcodes.TUPLE) } def construct(args: Array[Object]): Object = { - if (args.length != 3) { -throw new PickleException("should be 3") + if (args.length != 4) { +throw new PickleException("should be 4") } val bytes = getBytes(args(2)) val n = bytes.length / 8 val values = new Array[Double](n) val order = ByteOrder.nativeOrder() ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values) - new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values) + val isTransposed = args(3).asInstanceOf[Int] == 1 + new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values, isTransposed) } } http://git-wip-us.apache.org/repos/asf/spark/blob/45c47fa4/python/pyspark/mllib/linalg.py -- diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index ec8c879..cc9a4cf 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -638,9 +638,10 @@ class Matrix(object): """Represents a local matrix."""
-def __init__(self, numRows, numCols): +def __init__(self, numRows, numCols, isTransposed=False): self.numRows = numRows self.numCols = numCols +self.isTransposed = isTransposed def toArray(self): @@ -662,14 +663,16 @@ class DenseMatrix(Matrix): """Column-major dense matrix.""" -def __init__(self, numRows, numCols, values): -Matrix.__init__(self, numRows, numCols) +def __init__(self, numRows, numCols, values, isTransposed=False): +Matrix.__init__(self, numRows, numCols, isTransposed) values = self._convert_to_array(values, np.float64) assert len(values) == numRows * numCols self.values = values def __reduce__(self): -return DenseMatrix, (self.numRows, self.numCols, self.values.tostring()) +return DenseMatrix, ( +self.numRows, self.numCols,
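On the Scala side the matching flag already exists; a short sketch of constructing the same 2x2 matrix in both storage layouts (values illustrative):

```scala
import org.apache.spark.mllib.linalg.DenseMatrix

// the same matrix [[1 2], [3 4]] in both layouts
val colMajor = new DenseMatrix(2, 2, Array(1.0, 3.0, 2.0, 4.0))
val rowMajor = new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0), isTransposed = true)
println(colMajor) // both print the same entries,
println(rowMajor) // only the backing storage order differs
```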
spark git commit: [SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used
Repository: spark Updated Branches: refs/heads/master 03fd92167 -> 6265cba00 [SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used https://issues.apache.org/jira/browse/SPARK-6969 Author: Yin Huai yh...@databricks.com Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits: 1e5142b [Yin Huai] Add todo. 92b2498 [Yin Huai] Minor updates. 367df92 [Yin Huai] Recache data in the command of REFRESH TABLE. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6265cba0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6265cba0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6265cba0 Branch: refs/heads/master Commit: 6265cba00f6141575b4be825735d77d4cea500ab Parents: 03fd921 Author: Yin Huai yh...@databricks.com Authored: Tue Apr 21 14:48:42 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 14:48:42 2015 -0700 -- .../org/apache/spark/sql/sources/ddl.scala | 17 +++ .../spark/sql/hive/CachedTableSuite.scala | 50 +++- 2 files changed, 66 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 2e861b8..78d4941 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { +// Refresh the given table's metadata first. sqlContext.catalog.refreshTable(databaseName, tableName) + +// If this table is cached as an InMemoryColumnarRelation, drop the original +// cached version and make the new version cached lazily. +val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName)) +// Use lookupCachedData directly since RefreshTable also takes databaseName. +val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty +if (isCached) { + // Create a data frame to represent the table. + // TODO: Use uncacheTable once it supports database name. + val df = DataFrame(sqlContext, logicalPlan) + // Uncache the logicalPlan. + sqlContext.cacheManager.tryUncacheQuery(df, blocking = true) + // Cache it again.
+ sqlContext.cacheManager.cacheQuery(df, Some(tableName)) +} + Seq.empty[Row] } } http://git-wip-us.apache.org/repos/asf/spark/blob/6265cba0/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index c188264..fc6c3c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.hive +import java.io.File + import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest} +import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest} import org.apache.spark.storage.RDDBlockId +import org.apache.spark.util.Utils class CachedTableSuite extends QueryTest { @@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest { assertCached(table("udfTest")) uncacheTable("udfTest") } + + test("REFRESH TABLE also needs to recache the data (data source tables)") { +val tempPath: File = Utils.createTempDir() +tempPath.delete() +table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite) +sql("DROP TABLE IF EXISTS refreshTable") +createExternalTable("refreshTable", tempPath.toString, "parquet") +checkAnswer( + table("refreshTable"), + table("src").collect()) +// Cache the table. +sql("CACHE TABLE refreshTable") +assertCached(table("refreshTable")) +// Append new data. +table("src").save(tempPath.toString, "parquet", SaveMode.Append) +// We are still using the old data. +assertCached(table("refreshTable")) +checkAnswer( + table("refreshTable"), + table("src").collect()) +// Refresh the table. +sql("REFRESH TABLE refreshTable") +// We
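From the user's side the behavior change looks like the following sketch, condensed from the new test; the table name is illustrative and a HiveContext-backed `sqlContext` is assumed:

```scala
sqlContext.sql("CACHE TABLE logs")   // data is cached in memory
// ... new files are appended to the table's underlying location ...
sqlContext.sql("REFRESH TABLE logs") // refreshes metadata and, with this patch,
                                     // drops the stale cache and re-caches lazily
sqlContext.table("logs").count()     // triggers caching of the new data
```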
spark git commit: [SQL][minor] make it more clear that we only need to re-throw GetField exception for UnresolvedAttribute
Repository: spark Updated Branches: refs/heads/master 2e8c6ca47 -> 03fd92167 [SQL][minor] make it more clear that we only need to re-throw GetField exception for UnresolvedAttribute For `GetField` outside `UnresolvedAttribute`, we will throw exception in `Analyzer`. Author: Wenchen Fan cloud0...@outlook.com Closes #5588 from cloud-fan/tmp and squashes the following commits: 7ac74d2 [Wenchen Fan] small refactor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03fd9216 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03fd9216 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03fd9216 Branch: refs/heads/master Commit: 03fd92167107f1d061c1a7ef216468b508546ac7 Parents: 2e8c6ca Author: Wenchen Fan cloud0...@outlook.com Authored: Tue Apr 21 14:48:02 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 14:48:02 2015 -0700 -- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03fd9216/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 1155dac..a986dd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -46,12 +46,11 @@ trait CheckAnalysis { operator transformExpressionsUp { case a: Attribute if !a.resolved => if (operator.childrenResolved) { - val nameParts = a match { -case UnresolvedAttribute(nameParts) => nameParts -case _ => Seq(a.name) + a match { +case UnresolvedAttribute(nameParts) => + // Throw errors for specific problems with get field. + operator.resolveChildren(nameParts, resolver, throwErrors = true) } - // Throw errors for specific problems with get field. - operator.resolveChildren(nameParts, resolver, throwErrors = true) } val from = operator.inputSet.map(_.name).mkString(", ")
spark git commit: [SPARK-6994] Allow to fetch field values by name in sql.Row
Repository: spark Updated Branches: refs/heads/master 04bf34e34 -> 2e8c6ca47 [SPARK-6994] Allow to fetch field values by name in sql.Row It looked weird that up to now there was no way in Spark's Scala API to access fields of `DataFrame/sql.Row` by name, only by their index. This tries to solve this issue. Author: vidmantas zemleris vidman...@vinted.com Closes #5573 from vidma/features/row-with-named-fields and squashes the following commits: 6145ae3 [vidmantas zemleris] [SPARK-6994][SQL] Allow to fetch field values by name on Row 9564ebb [vidmantas zemleris] [SPARK-6994][SQL] Add fieldIndex to schema (StructType) Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e8c6ca4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e8c6ca4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e8c6ca4 Branch: refs/heads/master Commit: 2e8c6ca47df14681c1110f0736234ce76a3eca9b Parents: 04bf34e Author: vidmantas zemleris vidman...@vinted.com Authored: Tue Apr 21 14:47:09 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Tue Apr 21 14:47:09 2015 -0700 -- .../main/scala/org/apache/spark/sql/Row.scala | 32 + .../spark/sql/catalyst/expressions/rows.scala | 2 + .../org/apache/spark/sql/types/dataTypes.scala | 9 +++ .../scala/org/apache/spark/sql/RowTest.scala| 71 .../apache/spark/sql/types/DataTypeSuite.scala | 13 .../scala/org/apache/spark/sql/RowSuite.scala | 10 +++ 6 files changed, 137 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2e8c6ca4/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index ac8a782..4190b7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -306,6 +306,38 @@ trait Row extends Serializable { */ def getAs[T](i: Int): T = apply(i).asInstanceOf[T] + /** + * Returns the value of a given fieldName. + * + * @throws UnsupportedOperationException when schema is not defined. + * @throws IllegalArgumentException when fieldName does not exist. + * @throws ClassCastException when data type does not match. + */ + def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName)) + + /** + * Returns the index of a given field name. + * + * @throws UnsupportedOperationException when schema is not defined. + * @throws IllegalArgumentException when fieldName does not exist. + */ + def fieldIndex(name: String): Int = { +throw new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.") + } + + /** + * Returns a Map(name -> value) for the requested fieldNames + * + * @throws UnsupportedOperationException when schema is not defined. + * @throws IllegalArgumentException when fieldName does not exist. + * @throws ClassCastException when data type does not match.
+ */ + def getValuesMap[T](fieldNames: Seq[String]): Map[String, T] = { +fieldNames.map { name => + name -> getAs[T](name) +}.toMap + } + override def toString(): String = s"[${this.mkString(",")}]" /** http://git-wip-us.apache.org/repos/asf/spark/blob/2e8c6ca4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala index b6ec7d3..9813734 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala @@ -181,6 +181,8 @@ class GenericRowWithSchema(values: Array[Any], override val schema: StructType) /** No-arg constructor for serialization. */ protected def this() = this(null, null) + + override def fieldIndex(name: String): Int = schema.fieldIndex(name) } class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow { http://git-wip-us.apache.org/repos/asf/spark/blob/2e8c6ca4/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala index a108413..7cd7bd1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++
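A short usage sketch of the new accessors; the DataFrame and its columns are illustrative:

```scala
// assuming a DataFrame `df` with columns "name": String and "age": Int
val row = df.first()
val name = row.getAs[String]("name")                  // fetch a value by field name
val ageIndex = row.fieldIndex("age")                  // index of the "age" field
val asMap = row.getValuesMap[Any](Seq("name", "age")) // Map(name -> ..., age -> ...)
```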
spark git commit: [SPARK-3386] Share and reuse SerializerInstances in shuffle paths
Repository: spark Updated Branches: refs/heads/master 7662ec23b - f83c0f112 [SPARK-3386] Share and reuse SerializerInstances in shuffle paths This patch modifies several shuffle-related code paths to share and re-use SerializerInstances instead of creating new ones. Some serializers, such as KryoSerializer or SqlSerializer, can be fairly expensive to create or may consume moderate amounts of memory, so it's probably best to avoid unnecessary serializer creation in hot code paths. The key change in this patch is modifying `getDiskWriter()` / `DiskBlockObjectWriter` to accept `SerializerInstance`s instead of `Serializer`s (which are factories for instances). This allows the disk writer's creator to decide whether the serializer instance can be shared or re-used. The rest of the patch modifies several write and read paths to use shared serializers. One big win is in `ShuffleBlockFetcherIterator`, where we used to create a new serializer per received block. Similarly, the shuffle write path used to create a new serializer per file even though in many cases only a single thread would be writing to a file at a time. I made a small serializer reuse optimization in CoarseGrainedExecutorBackend as well, since it seemed like a small and obvious improvement. Author: Josh Rosen joshro...@databricks.com Closes #5606 from JoshRosen/SPARK-3386 and squashes the following commits: f661ce7 [Josh Rosen] Remove thread local; add comment instead 64f8398 [Josh Rosen] Use ThreadLocal for serializer instance in CoarseGrainedExecutorBackend aeb680e [Josh Rosen] [SPARK-3386] Reuse SerializerInstance in shuffle code paths Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f83c0f11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f83c0f11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f83c0f11 Branch: refs/heads/master Commit: f83c0f112d04173f4fc2c5eaf0f9cb11d9180077 Parents: 7662ec2 Author: Josh Rosen joshro...@databricks.com Authored: Tue Apr 21 16:24:15 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 16:24:15 2015 -0700 -- .../spark/executor/CoarseGrainedExecutorBackend.scala | 6 +- .../spark/shuffle/FileShuffleBlockManager.scala | 6 -- .../scala/org/apache/spark/storage/BlockManager.scala | 8 .../org/apache/spark/storage/BlockObjectWriter.scala | 6 +++--- .../spark/storage/ShuffleBlockFetcherIterator.scala | 6 -- .../spark/util/collection/ExternalAppendOnlyMap.scala | 6 ++ .../apache/spark/util/collection/ExternalSorter.scala | 14 +- .../apache/spark/storage/BlockObjectWriterSuite.scala | 6 +++--- 8 files changed, 34 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8300f9f..8af46f3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{SignalLogger, Utils} 
private[spark] class CoarseGrainedExecutorBackend( @@ -47,6 +48,10 @@ private[spark] class CoarseGrainedExecutorBackend( var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None + // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need + // to be changed so that we don't share the serializer instance across threads + private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() + override def onStart() { import scala.concurrent.ExecutionContext.Implicits.global logInfo("Connecting to driver: " + driverUrl) @@ -83,7 +88,6 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Received LaunchTask command but executor was null") System.exit(1) } else { -val ser = env.closureSerializer.newInstance() val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
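The core of the change, sketched outside Spark's real classes (all names below are illustrative stand-ins): a Serializer is a factory whose newInstance() may be costly, so a single-threaded write path should create one SerializerInstance and hand it to every writer, rather than letting each writer build its own.

import java.nio.ByteBuffer

// Illustrative stand-ins, not Spark's actual traits.
trait SerializerInstance {
  def serialize[T](t: T): ByteBuffer
}
trait Serializer {
  def newInstance(): SerializerInstance // potentially expensive: buffers, class caches
}

// The writer takes the instance, not the factory, so its creator decides
// whether one instance is shared across writers.
class DiskWriter(ser: SerializerInstance) {
  def write(obj: Any): ByteBuffer = ser.serialize(obj)
}

def writeAll(serializer: Serializer, objs: Seq[Any]): Seq[ByteBuffer] = {
  val ser = serializer.newInstance() // created once for the whole write path
  val writer = new DiskWriter(ser)
  objs.map(writer.write)
}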
spark git commit: [minor] [build] Set java options when generating mima ignores.
Repository: spark Updated Branches: refs/heads/master f83c0f112 -> a70e849c7 [minor] [build] Set java options when generating mima ignores. The default java options make the call to GenerateMIMAIgnore take forever to run since it's gc'ing all the time. Improve things by setting the perm gen size / max heap size to larger values. Author: Marcelo Vanzin van...@cloudera.com Closes #5615 from vanzin/gen-mima-fix and squashes the following commits: f44e921 [Marcelo Vanzin] [minor] [build] Set java options when generating mima ignores. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a70e849c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a70e849c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a70e849c Branch: refs/heads/master Commit: a70e849c7f9e3df5e86113d45b8c4537597cfb29 Parents: f83c0f1 Author: Marcelo Vanzin van...@cloudera.com Authored: Tue Apr 21 16:35:37 2015 -0700 Committer: Patrick Wendell patr...@databricks.com Committed: Tue Apr 21 16:35:37 2015 -0700 -- dev/mima | 9 +++-- 1 file changed, 7 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a70e849c/dev/mima -- diff --git a/dev/mima b/dev/mima index bed5cd0..2952fa6 100755 --- a/dev/mima +++ b/dev/mima @@ -27,16 +27,21 @@ cd "$FWDIR" echo -e "q\n" | build/sbt oldDeps/update rm -f .generated-mima* +generate_mima_ignore() { + SPARK_JAVA_OPTS="-XX:MaxPermSize=1g -Xmx2g" \ +./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore +} + # Generate Mima Ignore is called twice, first with the latest built jars # on the classpath and then again with the previous version's jars on the classpath, # because of a bug in GenerateMIMAIgnore: when old jars are ahead on the classpath # it does not process the new classes (which are in the assembly jar). -./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore +generate_mima_ignore export SPARK_CLASSPATH=`find lib_managed \( -name '*spark*jar' -a -type f \) | tr "\\n" ":"` echo "SPARK_CLASSPATH=$SPARK_CLASSPATH" -./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore +generate_mima_ignore echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving" ret_val=$?
spark git commit: [SPARK-6065] [MLlib] Optimize word2vec.findSynonyms using blas calls
Repository: spark Updated Branches: refs/heads/master a70e849c7 - 7fe6142cd [SPARK-6065] [MLlib] Optimize word2vec.findSynonyms using blas calls 1. Use blas calls to find the dot product between two vectors. 2. Prevent re-computing the L2 norm of the given vector for each word in model. Author: MechCoder manojkumarsivaraj...@gmail.com Closes #5467 from MechCoder/spark-6065 and squashes the following commits: dd0b0b2 [MechCoder] Preallocate wordVectors ffc9240 [MechCoder] Minor 6b74c81 [MechCoder] Switch back to native blas calls da1642d [MechCoder] Explicit types and indexing 64575b0 [MechCoder] Save indexedmap and a wordvecmat instead of matrix fbe0108 [MechCoder] Made the following changes 1. Calculate norms during initialization. 2. Use Blas calls from linalg.blas 1350cf3 [MechCoder] [SPARK-6065] Optimize word2vec.findSynonynms using blas calls Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fe6142c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fe6142c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fe6142c Branch: refs/heads/master Commit: 7fe6142cd3c39ec79899878c3deca9d5130d05b1 Parents: a70e849 Author: MechCoder manojkumarsivaraj...@gmail.com Authored: Tue Apr 21 16:42:45 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Tue Apr 21 16:42:45 2015 -0700 -- .../apache/spark/mllib/feature/Word2Vec.scala | 57 +--- 1 file changed, 51 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fe6142c/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index b2d9053..98e8311 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -34,7 +34,7 @@ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseMatrix, BLAS, DenseVector} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd._ import org.apache.spark.util.Utils @@ -429,7 +429,36 @@ class Word2Vec extends Serializable with Logging { */ @Experimental class Word2VecModel private[mllib] ( -private val model: Map[String, Array[Float]]) extends Serializable with Saveable { +model: Map[String, Array[Float]]) extends Serializable with Saveable { + + // wordList: Ordered list of words obtained from model. + private val wordList: Array[String] = model.keys.toArray + + // wordIndex: Maps each word to an index, which can retrieve the corresponding + //vector from wordVectors (see below). + private val wordIndex: Map[String, Int] = wordList.zip(0 until model.size).toMap + + // vectorSize: Dimension of each word's vector. + private val vectorSize = model.head._2.size + private val numWords = wordIndex.size + + // wordVectors: Array of length numWords * vectorSize, vector corresponding to the word + // mapped with index i can be retrieved by the slice + // (ind * vectorSize, ind * vectorSize + vectorSize) + // wordVecNorms: Array of length numWords, each value being the Euclidean norm + // of the wordVector. 
+ private val (wordVectors: Array[Float], wordVecNorms: Array[Double]) = { +val wordVectors = new Array[Float](vectorSize * numWords) +val wordVecNorms = new Array[Double](numWords) +var i = 0 +while (i < numWords) { + val vec = model.get(wordList(i)).get + Array.copy(vec, 0, wordVectors, i * vectorSize, vectorSize) + wordVecNorms(i) = blas.snrm2(vectorSize, vec, 1) + i += 1 +} +(wordVectors, wordVecNorms) + } private def cosineSimilarity(v1: Array[Float], v2: Array[Float]): Double = { require(v1.length == v2.length, "Vectors should have the same length") @@ -443,7 +472,7 @@ class Word2VecModel private[mllib] ( override protected def formatVersion = "1.0" def save(sc: SparkContext, path: String): Unit = { -Word2VecModel.SaveLoadV1_0.save(sc, path, model) +Word2VecModel.SaveLoadV1_0.save(sc, path, getVectors) } /** @@ -479,9 +508,23 @@ class Word2VecModel private[mllib] ( */ def findSynonyms(vector: Vector, num: Int): Array[(String, Double)] = { require(num > 0, "Number of similar words should > 0") -// TODO: optimize top-k + val fVector = vector.toArray.map(_.toFloat) -model.mapValues(vec =>
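A sketch of how the two optimizations above fit together, assuming the flat layout from the diff (row i of wordVectors starts at offset i * vectorSize) and the precomputed wordVecNorms; topSynonyms is a hypothetical helper, not MLlib's findSynonyms:

import com.github.fommil.netlib.BLAS.{getInstance => blas}

def topSynonyms(
    query: Array[Float],
    wordVectors: Array[Float],   // flat numWords x vectorSize matrix
    wordVecNorms: Array[Double], // snrm2 of each row, computed once up front
    vectorSize: Int,
    num: Int): Array[(Int, Double)] = {
  val queryNorm = blas.snrm2(vectorSize, query, 1) // computed once per query
  val scored = Array.tabulate(wordVecNorms.length) { i =>
    // BLAS dot product over a slice of the flat array; no per-word copies
    val dot = blas.sdot(vectorSize, query, 0, 1, wordVectors, i * vectorSize, 1)
    (i, dot / (queryNorm * wordVecNorms(i))) // cosine similarity
  }
  scored.sortBy(-_._2).take(num) // the num most similar word indices
}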
spark git commit: [SPARK-6490][Docs] Add docs for rpc configurations
Repository: spark Updated Branches: refs/heads/master a0761ec70 - 3a3f7100f [SPARK-6490][Docs] Add docs for rpc configurations Added docs for rpc configurations and also fixed two places that should have been fixed in #5595. Author: zsxwing zsxw...@gmail.com Closes #5607 from zsxwing/SPARK-6490-docs and squashes the following commits: 25a6736 [zsxwing] Increase the default timeout to 120s 6e37c30 [zsxwing] Update docs 5577540 [zsxwing] Use spark.network.timeout as the default timeout if it presents 4f07174 [zsxwing] Fix unit tests 1c2cf26 [zsxwing] Add docs for rpc configurations Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a3f7100 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a3f7100 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a3f7100 Branch: refs/heads/master Commit: 3a3f7100f4ead9b7ac50e9711ac50b603ebf6bea Parents: a0761ec Author: zsxwing zsxw...@gmail.com Authored: Tue Apr 21 18:37:53 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 18:37:53 2015 -0700 -- .../scala/org/apache/spark/util/RpcUtils.scala | 6 ++-- .../scala/org/apache/spark/SparkConfSuite.scala | 2 +- .../org/apache/spark/rpc/RpcEnvSuite.scala | 2 +- docs/configuration.md | 34 ++-- 4 files changed, 38 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a3f7100/core/src/main/scala/org/apache/spark/util/RpcUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index 5ae793e..f16cc8e 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -48,11 +48,13 @@ object RpcUtils { /** Returns the default Spark timeout to use for RPC ask operations. */ def askTimeout(conf: SparkConf): FiniteDuration = { -conf.getTimeAsSeconds(spark.rpc.askTimeout, 30s) seconds +conf.getTimeAsSeconds(spark.rpc.askTimeout, + conf.get(spark.network.timeout, 120s)) seconds } /** Returns the default Spark timeout to use for RPC remote endpoint lookup. 
*/ def lookupTimeout(conf: SparkConf): FiniteDuration = { -conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds +conf.getTimeAsSeconds("spark.rpc.lookupTimeout", + conf.get("spark.network.timeout", "120s")) seconds } } http://git-wip-us.apache.org/repos/asf/spark/blob/3a3f7100/core/src/test/scala/org/apache/spark/SparkConfSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index d7d8014..272e6af 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -227,7 +227,7 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro test("akka deprecated configs") { val conf = new SparkConf() -assert(!conf.contains("spark.rpc.num.retries")) +assert(!conf.contains("spark.rpc.numRetries")) assert(!conf.contains("spark.rpc.retry.wait")) assert(!conf.contains("spark.rpc.askTimeout")) assert(!conf.contains("spark.rpc.lookupTimeout")) http://git-wip-us.apache.org/repos/asf/spark/blob/3a3f7100/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 5fbda37..44c88b0 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -156,7 +156,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { val conf = new SparkConf() conf.set("spark.rpc.retry.wait", "0") -conf.set("spark.rpc.num.retries", "1") +conf.set("spark.rpc.numRetries", "1") val anotherEnv = createRpcEnv(conf, "remote", 13345) // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout") http://git-wip-us.apache.org/repos/asf/spark/blob/3a3f7100/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index d9e9e67..d587b91 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -963,8 +963,9 @@ Apart from these, the following properties are also available, and may be useful <td>Default timeout for all network interactions. This config will be
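Illustrative resolution of the new fallback chain (the config keys and getTimeAsSeconds are real; the scenario is made up): spark.rpc.askTimeout wins when set, otherwise spark.network.timeout, otherwise the 120s default.

import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.network.timeout", "200s")
// spark.rpc.askTimeout is unset, so the ask timeout inherits 200s;
// with neither key set it would resolve to 120 seconds.
val askTimeoutSecs = conf.getTimeAsSeconds(
  "spark.rpc.askTimeout", conf.get("spark.network.timeout", "120s"))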
spark git commit: [MINOR] Comment improvements in ExternalSorter.
Repository: spark Updated Branches: refs/heads/master 3a3f7100f - 70f9f8ff3 [MINOR] Comment improvements in ExternalSorter. 1. Clearly specifies the contract/interactions for users of this class. 2. Minor fix in one doc to avoid ambiguity. Author: Patrick Wendell patr...@databricks.com Closes #5620 from pwendell/cleanup and squashes the following commits: 8d8f44f [Patrick Wendell] [Minor] Comment improvements in ExternalSorter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70f9f8ff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70f9f8ff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70f9f8ff Branch: refs/heads/master Commit: 70f9f8ff38560967f2c84de77263a5455c45c495 Parents: 3a3f710 Author: Patrick Wendell patr...@databricks.com Authored: Tue Apr 21 21:04:04 2015 -0700 Committer: Patrick Wendell patr...@databricks.com Committed: Tue Apr 21 21:04:04 2015 -0700 -- .../spark/util/collection/ExternalSorter.scala | 27 ++-- 1 file changed, 19 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70f9f8ff/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 79a1a8a..79a695f 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -53,7 +53,18 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId} * probably want to pass None as the ordering to avoid extra sorting. On the other hand, if you do * want to do combining, having an Ordering is more efficient than not having it. * - * At a high level, this class works as follows: + * Users interact with this class in the following way: + * + * 1. Instantiate an ExternalSorter. + * + * 2. Call insertAll() with a set of records. + * + * 3. Request an iterator() back to traverse sorted/aggregated records. + * - or - + *Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs + *that can be used in Spark's sort shuffle. + * + * At a high level, this class works internally as follows: * * - We repeatedly fill up buffers of in-memory data, using either a SizeTrackingAppendOnlyMap if * we want to combine by key, or an simple SizeTrackingBuffer if we don't. Inside these buffers, @@ -65,11 +76,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId} * aggregation. For each file, we track how many objects were in each partition in memory, so we * don't have to write out the partition ID for every element. * - * - When the user requests an iterator, the spilled files are merged, along with any remaining - * in-memory data, using the same sort order defined above (unless both sorting and aggregation - * are disabled). If we need to aggregate by key, we either use a total ordering from the - * ordering parameter, or read the keys with the same hash code and compare them with each other - * for equality to merge values. + * - When the user requests an iterator or file output, the spilled files are merged, along with + * any remaining in-memory data, using the same sort order defined above (unless both sorting + * and aggregation are disabled). 
If we need to aggregate by key, we either use a total ordering + * from the ordering parameter, or read the keys with the same hash code and compare them with + * each other for equality to merge values. * * - Users are expected to call stop() at the end to delete all the intermediate files. * @@ -259,8 +270,8 @@ private[spark] class ExternalSorter[K, V, C]( * Spill our in-memory collection to a sorted file that we can merge later (normal code path). * We add this file into spilledFiles to find it later. * - * Alternatively, if bypassMergeSort is true, we spill to separate files for each partition. - * See spillToPartitionedFiles() for that code path. + * This should not be invoked if bypassMergeSort is true. In that case, spillToPartitionedFiles() + * is used to write files for each partition. * * @param collection whichever collection we're using (map or buffer) */
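The documented contract, as a sketch (ExternalSorter is private[spark], so this is illustrative rather than user-facing API; agg, part, records, blockId, context, and outputFile are assumed to be in scope):

val sorter = new ExternalSorter[String, Int, Int](
  aggregator = Some(agg),   // 1. instantiate, opting into map-side combining
  partitioner = Some(part),
  ordering = None,          // pass None when no sorting is needed
  serializer = None)
sorter.insertAll(records)   // 2. feed in the records
val out = sorter.iterator   // 3. traverse sorted/aggregated records...
// ...or, for the sort-shuffle path:
// sorter.writePartitionedFile(blockId, context, outputFile)
sorter.stop()               // 4. callers must stop() to delete intermediate files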
spark git commit: [SPARK-6889] [DOCS] CONTRIBUTING.md updates to accompany contribution doc updates
Repository: spark Updated Branches: refs/heads/master 607eff0ed - bdc5c16e7 [SPARK-6889] [DOCS] CONTRIBUTING.md updates to accompany contribution doc updates Part of the SPARK-6889 doc updates, to accompany wiki updates at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark See draft text at https://docs.google.com/document/d/1tB9-f9lmxhC32QlOo4E8Z7eGDwHx1_Q3O8uCmRXQTo8/edit# Author: Sean Owen so...@cloudera.com Closes #5623 from srowen/SPARK-6889 and squashes the following commits: 03773b1 [Sean Owen] Part of the SPARK-6889 doc updates, to accompany wiki updates at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdc5c16e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdc5c16e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdc5c16e Branch: refs/heads/master Commit: bdc5c16e76c5d0bc147408353b2ba4faa8e914fc Parents: 607eff0 Author: Sean Owen so...@cloudera.com Authored: Tue Apr 21 22:34:31 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 22:34:31 2015 -0700 -- CONTRIBUTING.md | 22 +- 1 file changed, 13 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bdc5c16e/CONTRIBUTING.md -- diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b6c6b05..f10d7e2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,12 +1,16 @@ ## Contributing to Spark -Contributions via GitHub pull requests are gladly accepted from their original -author. Along with any pull requests, please state that the contribution is -your original work and that you license the work to the project under the -project's open source license. Whether or not you state this explicitly, by -submitting any copyrighted material via pull request, email, or other means -you agree to license the material under the project's open source license and -warrant that you have the legal authority to do so. +*Before opening a pull request*, review the +[Contributing to Spark wiki](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark). +It lists steps that are required before creating a PR. In particular, consider: + +- Is the change important and ready enough to ask the community to spend time reviewing? +- Have you searched for existing, related JIRAs and pull requests? +- Is this a new feature that can stand alone as a package on http://spark-packages.org ? +- Is the change being proposed clearly explained and motivated? -Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark) -for more information. +When you contribute code, you affirm that the contribution is your original work and that you +license the work to the project under the project's open source license. Whether or not you +state this explicitly, by submitting any copyrighted material via pull request, email, or +other means you agree to license the material under the project's open source license and +warrant that you have the legal authority to do so. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6113] [ML] Small cleanups after original tree API PR
Repository: spark Updated Branches: refs/heads/master 70f9f8ff3 - 607eff0ed [SPARK-6113] [ML] Small cleanups after original tree API PR This does a few clean-ups. With this PR, all spark.ml tree components have ```private[ml]``` constructors. CC: mengxr Author: Joseph K. Bradley jos...@databricks.com Closes #5567 from jkbradley/dt-api-dt2 and squashes the following commits: 2263b5b [Joseph K. Bradley] Added note about tree example issue. bb9f610 [Joseph K. Bradley] Small cleanups after original tree API PR Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/607eff0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/607eff0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/607eff0e Branch: refs/heads/master Commit: 607eff0edfc10a1473fa9713a0500bf09f105c13 Parents: 70f9f8f Author: Joseph K. Bradley jos...@databricks.com Authored: Tue Apr 21 21:44:44 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 21:44:44 2015 -0700 -- .../spark/examples/ml/DecisionTreeExample.scala | 25 +++- .../apache/spark/ml/impl/tree/treeParams.scala | 4 ++-- .../scala/org/apache/spark/ml/tree/Split.scala | 7 +++--- 3 files changed, 25 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/607eff0e/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index 921b396..2cd515c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -44,6 +44,13 @@ import org.apache.spark.sql.{SQLContext, DataFrame} * {{{ * ./bin/run-example ml.DecisionTreeExample [options] * }}} + * Note that Decision Trees can take a large amount of memory. If the run-example command above + * fails, try running via spark-submit and specifying the amount of memory as at least 1g. + * For local mode, run + * {{{ + * ./bin/spark-submit --class org.apache.spark.examples.ml.DecisionTreeExample --driver-memory 1g + * [examples JAR path] [options] + * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ object DecisionTreeExample { @@ -70,7 +77,7 @@ object DecisionTreeExample { val parser = new OptionParser[Params](DecisionTreeExample) { head(DecisionTreeExample: an example decision tree app.) opt[String](algo) -.text(salgorithm (Classification, Regression), default: ${defaultParams.algo}) +.text(salgorithm (classification, regression), default: ${defaultParams.algo}) .action((x, c) = c.copy(algo = x)) opt[Int](maxDepth) .text(smax depth of the tree, default: ${defaultParams.maxDepth}) @@ -222,18 +229,23 @@ object DecisionTreeExample { // (1) For classification, re-index classes. val labelColName = if (algo == classification) indexedLabel else label if (algo == classification) { - val labelIndexer = new StringIndexer().setInputCol(labelString).setOutputCol(labelColName) + val labelIndexer = new StringIndexer() +.setInputCol(labelString) +.setOutputCol(labelColName) stages += labelIndexer } // (2) Identify categorical features using VectorIndexer. // Features with more than maxCategories values will be treated as continuous. 
-val featuresIndexer = new VectorIndexer().setInputCol("features") - .setOutputCol("indexedFeatures").setMaxCategories(10) +val featuresIndexer = new VectorIndexer() + .setInputCol("features") + .setOutputCol("indexedFeatures") + .setMaxCategories(10) stages += featuresIndexer // (3) Learn DecisionTree val dt = algo match { case "classification" => -new DecisionTreeClassifier().setFeaturesCol("indexedFeatures") +new DecisionTreeClassifier() + .setFeaturesCol("indexedFeatures") .setLabelCol(labelColName) .setMaxDepth(params.maxDepth) .setMaxBins(params.maxBins) @@ -242,7 +254,8 @@ object DecisionTreeExample { .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) case "regression" => -new DecisionTreeRegressor().setFeaturesCol("indexedFeatures") +new DecisionTreeRegressor() + .setFeaturesCol("indexedFeatures") .setLabelCol(labelColName) .setMaxDepth(params.maxDepth) .setMaxBins(params.maxBins)
spark git commit: [Minor][MLLIB] Fix a minor formatting bug in toString method in Node.scala
Repository: spark Updated Branches: refs/heads/branch-1.3 fd61820d3 -> 4508f0189 [Minor][MLLIB] Fix a minor formatting bug in toString method in Node.scala Add the missing comma and space. Author: Alain a...@usc.edu Closes #5621 from AiHe/tree-node-issue and squashes the following commits: 159a7bb [Alain] [Minor][MLLIB] Fix a minor formatting bug in toString methods in Node.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4508f018 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4508f018 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4508f018 Branch: refs/heads/branch-1.3 Commit: 4508f01890a723f80d631424ff8eda166a13a727 Parents: fd61820 Author: Alain a...@usc.edu Authored: Tue Apr 21 16:46:17 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 16:46:17 2015 -0700 -- mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4508f018/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala index d961081..0f28860 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala @@ -51,7 +51,7 @@ class Node ( var stats: Option[InformationGainStats]) extends Serializable with Logging { override def toString = "id = " + id + ", isLeaf = " + isLeaf + ", predict = " + predict + ", " + -"impurity = " + impurity + "split = " + split + ", stats = " + stats +"impurity = " + impurity + ", split = " + split + ", stats = " + stats /** * build the left node and right nodes if not leaf
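Not part of the patch, just a design note: the same toString written with string interpolation makes every separator explicit, which avoids exactly the dropped ", " fixed above.

override def toString =
  s"id = $id, isLeaf = $isLeaf, predict = $predict, " +
  s"impurity = $impurity, split = $split, stats = $stats"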
spark git commit: [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
Repository: spark Updated Branches: refs/heads/master b063a61b9 - e72c16e30 [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races. This change adds some new utility code to handle shutdown hooks in Spark. The main goal is to take advantage of Hadoop 2.x's API for shutdown hooks, which allows Spark to register a hook that will run before the one that cleans up HDFS clients, and thus avoids some races that would cause exceptions to show up and other issues such as failure to properly close event logs. Unfortunately, Hadoop 1.x does not have such APIs, so in that case correctness is still left to chance. Author: Marcelo Vanzin van...@cloudera.com Closes #5560 from vanzin/SPARK-6014 and squashes the following commits: edfafb1 [Marcelo Vanzin] Better scaladoc. fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014 e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e72c16e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e72c16e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e72c16e3 Branch: refs/heads/master Commit: e72c16e30d85cdc394d318b5551698885cfda9b8 Parents: b063a61 Author: Marcelo Vanzin van...@cloudera.com Authored: Tue Apr 21 20:33:57 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Tue Apr 21 20:33:57 2015 -0400 -- .../spark/deploy/history/HistoryServer.scala| 6 +- .../spark/deploy/worker/ExecutorRunner.scala| 12 +- .../apache/spark/storage/DiskBlockManager.scala | 18 +-- .../spark/storage/TachyonBlockManager.scala | 24 ++-- .../scala/org/apache/spark/util/Utils.scala | 136 --- .../org/apache/spark/util/UtilsSuite.scala | 32 +++-- .../hive/thriftserver/HiveThriftServer2.scala | 9 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 9 +- .../spark/deploy/yarn/ApplicationMaster.scala | 63 - 9 files changed, 195 insertions(+), 114 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 72f6048..56bef57 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.SignalLogger +import org.apache.spark.util.{SignalLogger, Utils} /** * A web server that renders SparkUIs of completed applications. @@ -194,9 +194,7 @@ object HistoryServer extends Logging { val server = new HistoryServer(conf, provider, securityManager, port) server.bind() -Runtime.getRuntime().addShutdownHook(new Thread(HistoryServerStopper) { - override def run(): Unit = server.stop() -}) +Utils.addShutdownHook { () = server.stop() } // Wait until the end of the world... 
or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7d5acab..7aa85b7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -28,6 +28,7 @@ import com.google.common.io.Files import org.apache.spark.{SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.util.Utils import org.apache.spark.util.logging.FileAppender /** @@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner( // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might // make sense to remove this in the future. - private var shutdownHook: Thread = null + private var shutdownHook: AnyRef = null private[worker] def start() { workerThread = new Thread("ExecutorRunner for " + fullId) { @@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner( }
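A minimal sketch of the idea behind the new utility (not Spark's implementation, and simpler than it: no hook removal, no Hadoop 2.x integration): hooks registered with a priority run highest-first, so for example an event-log flush can be ordered ahead of the hook that closes filesystem clients.

import scala.collection.mutable

object ShutdownHooks {
  private val hooks = mutable.Buffer.empty[(Int, () => Unit)]

  Runtime.getRuntime.addShutdownHook(new Thread("shutdown-hooks") {
    override def run(): Unit =
      hooks.synchronized { hooks.sortBy(-_._1) }.foreach { case (_, f) => f() }
  })

  def add(priority: Int)(body: => Unit): Unit =
    hooks.synchronized { hooks += priority -> (() => body) }
}

// Usage, mirroring the HistoryServer change above:
// ShutdownHooks.add(priority = 100) { server.stop() }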
spark git commit: Closes #5427
Repository: spark Updated Branches: refs/heads/master 3134c3fe4 -> 41ef78a94 Closes #5427 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/41ef78a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/41ef78a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/41ef78a9 Branch: refs/heads/master Commit: 41ef78a94105bb995bb14d15d47cbb6ca1638f62 Parents: 3134c3f Author: Reynold Xin r...@databricks.com Authored: Tue Apr 21 17:52:52 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 17:52:52 2015 -0700
spark git commit: [SPARK-1684] [PROJECT INFRA] Merge script should standardize SPARK-XXX prefix
Repository: spark Updated Branches: refs/heads/master 41ef78a94 - a0761ec70 [SPARK-1684] [PROJECT INFRA] Merge script should standardize SPARK-XXX prefix Cleans up the pull request title in the merge script to follow conventions outlined in the wiki under Contributing Code. https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingCode [MODULE] SPARK-: Description Author: texasmichelle texasmiche...@gmail.com Closes #5149 from texasmichelle/master and squashes the following commits: 9b6b0a7 [texasmichelle] resolved variable scope issue 7d5fa20 [texasmichelle] only prompt if title has been modified 8c195bb [texasmichelle] removed erroneous line 4f1ed46 [texasmichelle] Deque removal, logic simplifications, prompt user to pick a title (orig or modified) df73f6a [texasmichelle] reworked regex's to enforce brackets around JIRA ref 43b5aed [texasmichelle] Merge remote-tracking branch 'apache/master' 25229c6 [texasmichelle] Merge remote-tracking branch 'apache/master' aa20a6e [texasmichelle] Move code into main() and add doctest for new text parsing method 48520ba [texasmichelle] SPARK-1684: Corrected import statement 042099d [texasmichelle] SPARK-1684 Merge script should standardize SPARK-XXX prefix 8f4a7d1 [texasmichelle] SPARK-1684 Merge script should standardize SPARK-XXX prefix Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0761ec7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0761ec7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0761ec7 Branch: refs/heads/master Commit: a0761ec7063f984dcadc8d154f83dd9cfd1c5e0b Parents: 41ef78a Author: texasmichelle texasmiche...@gmail.com Authored: Tue Apr 21 18:08:29 2015 -0700 Committer: Patrick Wendell patr...@databricks.com Committed: Tue Apr 21 18:08:29 2015 -0700 -- dev/merge_spark_pr.py | 199 +++-- 1 file changed, 140 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a0761ec7/dev/merge_spark_pr.py -- diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index 3062e9c..b69cd15 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -55,8 +55,6 @@ JIRA_API_BASE = https://issues.apache.org/jira; # Prefix added to temporary branches BRANCH_PREFIX = PR_TOOL -os.chdir(SPARK_HOME) - def get_json(url): try: @@ -85,10 +83,6 @@ def continue_maybe(prompt): if result.lower() != y: fail(Okay, exiting) - -original_head = run_cmd(git rev-parse HEAD)[:8] - - def clean_up(): print Restoring head pointer to %s % original_head run_cmd(git checkout %s % original_head) @@ -101,7 +95,7 @@ def clean_up(): # merge the requested PR and return the merge hash -def merge_pr(pr_num, target_ref): +def merge_pr(pr_num, target_ref, title, body, pr_repo_desc): pr_branch_name = %s_MERGE_PR_%s % (BRANCH_PREFIX, pr_num) target_branch_name = %s_MERGE_PR_%s_%s % (BRANCH_PREFIX, pr_num, target_ref.upper()) run_cmd(git fetch %s pull/%s/head:%s % (PR_REMOTE_NAME, pr_num, pr_branch_name)) @@ -274,7 +268,7 @@ def resolve_jira_issue(merge_branches, comment, default_jira_id=): asf_jira.transition_issue( jira_id, resolve[id], fixVersions=jira_fix_versions, comment=comment) -print Succesfully resolved %s with fixVersions=%s! % (jira_id, fix_versions) +print Successfully resolved %s with fixVersions=%s! 
% (jira_id, fix_versions) def resolve_jira_issues(title, merge_branches, comment): @@ -286,68 +280,155 @@ def resolve_jira_issues(title, merge_branches, comment): resolve_jira_issue(merge_branches, comment, jira_id) -branches = get_json(%s/branches % GITHUB_API_BASE) -branch_names = filter(lambda x: x.startswith(branch-), [x['name'] for x in branches]) -# Assumes branch names can be sorted lexicographically -latest_branch = sorted(branch_names, reverse=True)[0] - -pr_num = raw_input(Which pull request would you like to merge? (e.g. 34): ) -pr = get_json(%s/pulls/%s % (GITHUB_API_BASE, pr_num)) -pr_events = get_json(%s/issues/%s/events % (GITHUB_API_BASE, pr_num)) +def standardize_jira_ref(text): + +Standardize the [SPARK-X] [MODULE] prefix +Converts [SPARK-XXX][mllib] Issue, [MLLib] SPARK-XXX. Issue or SPARK XXX [MLLIB]: Issue to [SPARK-XXX] [MLLIB] Issue + + standardize_jira_ref([SPARK-5821] [SQL] ParquetRelation2 CTAS should check if delete is successful) +'[SPARK-5821] [SQL] ParquetRelation2 CTAS should check if delete is successful' + standardize_jira_ref([SPARK-4123][Project Infra][WIP]: Show new dependencies added in pull requests) +'[SPARK-4123] [PROJECT INFRA] [WIP] Show new dependencies added in pull
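The merge script itself is Python; below is only a Scala sketch of the core normalization idea (the regex and helper name are made up, and module-tag handling is omitted): extract the JIRA id wherever it appears in the title, then rebuild a canonical [SPARK-XXXX] prefix.

val JiraRef = """(?i)\[?SPARK[- ]?(\d{3,6})\]?[:.]?\s*""".r

def standardizeTitle(title: String): String =
  JiraRef.findFirstMatchIn(title) match {
    case Some(m) =>
      // Remove the matched reference from wherever it sat, then re-prefix it.
      val rest = (title.substring(0, m.start) + title.substring(m.end)).trim
      s"[SPARK-${m.group(1)}] $rest"
    case None => title // no JIRA reference; leave the title alone
  }

// standardizeTitle("SPARK-1684: Merge script should standardize prefix")
//   => "[SPARK-1684] Merge script should standardize prefix"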
spark git commit: [SPARK-7036][MLLIB] ALS.train should support DataFrames in PySpark
Repository: spark Updated Branches: refs/heads/master 7fe6142cd - 686dd742e [SPARK-7036][MLLIB] ALS.train should support DataFrames in PySpark SchemaRDD works with ALS.train in 1.2, so we should continue support DataFrames for compatibility. coderxiang Author: Xiangrui Meng m...@databricks.com Closes #5619 from mengxr/SPARK-7036 and squashes the following commits: dfcaf5a [Xiangrui Meng] ALS.train should support DataFrames in PySpark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/686dd742 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/686dd742 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/686dd742 Branch: refs/heads/master Commit: 686dd742e11f6ad0078b7ff9b30b83a118fd8109 Parents: 7fe6142 Author: Xiangrui Meng m...@databricks.com Authored: Tue Apr 21 16:44:52 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 16:44:52 2015 -0700 -- python/pyspark/mllib/recommendation.py | 36 + 1 file changed, 26 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/686dd742/python/pyspark/mllib/recommendation.py -- diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index 80e0a35..4b7d17d 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -22,6 +22,7 @@ from pyspark import SparkContext from pyspark.rdd import RDD from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc from pyspark.mllib.util import JavaLoader, JavaSaveable +from pyspark.sql import DataFrame __all__ = ['MatrixFactorizationModel', 'ALS', 'Rating'] @@ -78,18 +79,23 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader): True model = ALS.train(ratings, 1, nonnegative=True, seed=10) - model.predict(2,2) + model.predict(2, 2) +3.8... + + df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)]) + model = ALS.train(df, 1, nonnegative=True, seed=10) + model.predict(2, 2) 3.8... model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10) - model.predict(2,2) + model.predict(2, 2) 0.4... import os, tempfile path = tempfile.mkdtemp() model.save(sc, path) sameModel = MatrixFactorizationModel.load(sc, path) - sameModel.predict(2,2) + sameModel.predict(2, 2) 0.4... sameModel.predictAll(testset).collect() [Rating(... @@ -125,13 +131,20 @@ class ALS(object): @classmethod def _prepare(cls, ratings): -assert isinstance(ratings, RDD), ratings should be RDD +if isinstance(ratings, RDD): +pass +elif isinstance(ratings, DataFrame): +ratings = ratings.rdd +else: +raise TypeError(Ratings should be represented by either an RDD or a DataFrame, +but got %s. % type(ratings)) first = ratings.first() -if not isinstance(first, Rating): -if isinstance(first, (tuple, list)): -ratings = ratings.map(lambda x: Rating(*x)) -else: -raise ValueError(rating should be RDD of Rating or tuple/list) +if isinstance(first, Rating): +pass +elif isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise TypeError(Expect a Rating or a tuple/list, but got %s. 
% type(first)) return ratings @classmethod @@ -152,8 +165,11 @@ class ALS(object): def _test(): import doctest import pyspark.mllib.recommendation +from pyspark.sql import SQLContext globs = pyspark.mllib.recommendation.__dict__.copy() -globs['sc'] = SparkContext('local[4]', 'PythonTest') +sc = SparkContext('local[4]', 'PythonTest') +globs['sc'] = sc +globs['sqlContext'] = SQLContext(sc) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count:
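The change above is PySpark; here is the same dispatch idea sketched in Scala (illustrative only, not MLlib's API; column order user, product, rating is assumed for the DataFrame case):

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def prepareRatings(input: Any): RDD[Rating] = input match {
  case rdd: RDD[_] =>
    // Accept either Rating objects or (user, product, rating) tuples.
    rdd.asInstanceOf[RDD[Any]].map {
      case r: Rating                   => r
      case (u: Int, p: Int, r: Double) => Rating(u, p, r)
      case other =>
        throw new IllegalArgumentException(s"Cannot convert $other to a Rating")
    }
  case df: DataFrame =>
    df.rdd.map(row => Rating(row.getInt(0), row.getInt(1), row.getDouble(2)))
  case other =>
    throw new IllegalArgumentException(
      s"Ratings should be an RDD or a DataFrame, but got ${other.getClass}")
}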
spark git commit: [SPARK-7036][MLLIB] ALS.train should support DataFrames in PySpark
Repository: spark Updated Branches: refs/heads/branch-1.3 948f2f635 - fd61820d3 [SPARK-7036][MLLIB] ALS.train should support DataFrames in PySpark SchemaRDD works with ALS.train in 1.2, so we should continue support DataFrames for compatibility. coderxiang Author: Xiangrui Meng m...@databricks.com Closes #5619 from mengxr/SPARK-7036 and squashes the following commits: dfcaf5a [Xiangrui Meng] ALS.train should support DataFrames in PySpark (cherry picked from commit 686dd742e11f6ad0078b7ff9b30b83a118fd8109) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd61820d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd61820d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd61820d Branch: refs/heads/branch-1.3 Commit: fd61820d3c29d9018c8b0c70cba23abede09b162 Parents: 948f2f6 Author: Xiangrui Meng m...@databricks.com Authored: Tue Apr 21 16:44:52 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Tue Apr 21 16:45:00 2015 -0700 -- python/pyspark/mllib/recommendation.py | 36 + 1 file changed, 26 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd61820d/python/pyspark/mllib/recommendation.py -- diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index c5c4c13..71c23c9 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -21,6 +21,7 @@ from pyspark import SparkContext from pyspark.rdd import RDD from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc from pyspark.mllib.util import JavaLoader, JavaSaveable +from pyspark.sql import DataFrame __all__ = ['MatrixFactorizationModel', 'ALS', 'Rating'] @@ -77,18 +78,23 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader): True model = ALS.train(ratings, 1, nonnegative=True, seed=10) - model.predict(2,2) + model.predict(2, 2) +3.8... + + df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)]) + model = ALS.train(df, 1, nonnegative=True, seed=10) + model.predict(2, 2) 3.8... model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10) - model.predict(2,2) + model.predict(2, 2) 0.4... import os, tempfile path = tempfile.mkdtemp() model.save(sc, path) sameModel = MatrixFactorizationModel.load(sc, path) - sameModel.predict(2,2) + sameModel.predict(2, 2) 0.4... sameModel.predictAll(testset).collect() [Rating(... @@ -124,13 +130,20 @@ class ALS(object): @classmethod def _prepare(cls, ratings): -assert isinstance(ratings, RDD), ratings should be RDD +if isinstance(ratings, RDD): +pass +elif isinstance(ratings, DataFrame): +ratings = ratings.rdd +else: +raise TypeError(Ratings should be represented by either an RDD or a DataFrame, +but got %s. % type(ratings)) first = ratings.first() -if not isinstance(first, Rating): -if isinstance(first, (tuple, list)): -ratings = ratings.map(lambda x: Rating(*x)) -else: -raise ValueError(rating should be RDD of Rating or tuple/list) +if isinstance(first, Rating): +pass +elif isinstance(first, (tuple, list)): +ratings = ratings.map(lambda x: Rating(*x)) +else: +raise TypeError(Expect a Rating or a tuple/list, but got %s. 
% type(first)) return ratings @classmethod @@ -151,8 +164,11 @@ class ALS(object): def _test(): import doctest import pyspark.mllib.recommendation +from pyspark.sql import SQLContext globs = pyspark.mllib.recommendation.__dict__.copy() -globs['sc'] = SparkContext('local[4]', 'PythonTest') +sc = SparkContext('local[4]', 'PythonTest') +globs['sc'] = sc +globs['sqlContext'] = SQLContext(sc) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count:
spark git commit: [SPARK-6953] [PySpark] speed up python tests
Repository: spark Updated Branches: refs/heads/master e72c16e30 - 3134c3fe4 [SPARK-6953] [PySpark] speed up python tests This PR try to speed up some python tests: ``` tests.py 144s - 103s -41s mllib/classification.py 24s - 17s-7s mllib/regression.py 27s - 15s -12s mllib/tree.py 27s - 13s -14s mllib/tests.py 64s - 31s -33s streaming/tests.py 185s - 84s -101s ``` Considering python3, the total saving will be 558s (almost 10 minutes) (core, and streaming run three times, mllib runs twice). During testing, it will show used time for each test file: ``` Run core tests ... Running test: pyspark/rdd.py ... ok (22s) Running test: pyspark/context.py ... ok (16s) Running test: pyspark/conf.py ... ok (4s) Running test: pyspark/broadcast.py ... ok (4s) Running test: pyspark/accumulators.py ... ok (4s) Running test: pyspark/serializers.py ... ok (6s) Running test: pyspark/profiler.py ... ok (5s) Running test: pyspark/shuffle.py ... ok (1s) Running test: pyspark/tests.py ... ok (103s) 144s ``` Author: Reynold Xin r...@databricks.com Author: Xiangrui Meng m...@databricks.com Closes #5605 from rxin/python-tests-speed and squashes the following commits: d08542d [Reynold Xin] Merge pull request #14 from mengxr/SPARK-6953 89321ee [Xiangrui Meng] fix seed in tests 3ad2387 [Reynold Xin] Merge pull request #5427 from davies/python_tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134c3fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134c3fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134c3fe Branch: refs/heads/master Commit: 3134c3fe495862b7687b5aa00d3344d09cd5e08e Parents: e72c16e Author: Reynold Xin r...@databricks.com Authored: Tue Apr 21 17:49:55 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Tue Apr 21 17:49:55 2015 -0700 -- python/pyspark/mllib/classification.py | 17 ++--- python/pyspark/mllib/regression.py | 25 +--- python/pyspark/mllib/tests.py | 69 +++-- python/pyspark/mllib/tree.py | 15 ++--- python/pyspark/shuffle.py | 7 ++- python/pyspark/sql/tests.py| 4 +- python/pyspark/streaming/tests.py | 63 +++ python/pyspark/tests.py| 96 ++--- python/run-tests | 13 ++-- 9 files changed, 182 insertions(+), 127 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3134c3fe/python/pyspark/mllib/classification.py -- diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index eda0b60..a70c664 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -86,7 +86,7 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(0.0, [0.0, 1.0]), ... LabeledPoint(1.0, [1.0, 0.0]), ... ] - lrm = LogisticRegressionWithSGD.train(sc.parallelize(data)) + lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10) lrm.predict([1.0, 0.0]) 1 lrm.predict([0.0, 1.0]) @@ -95,7 +95,7 @@ class LogisticRegressionModel(LinearClassificationModel): [1, 0] lrm.clearThreshold() lrm.predict([0.0, 1.0]) -0.123... +0.279... sparse_data = [ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})), @@ -103,7 +103,7 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(0.0, SparseVector(2, {0: 1.0})), ... LabeledPoint(1.0, SparseVector(2, {1: 2.0})) ... 
] - lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data)) + lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10) lrm.predict(array([0.0, 1.0])) 1 lrm.predict(array([1.0, 0.0])) @@ -129,7 +129,8 @@ class LogisticRegressionModel(LinearClassificationModel): ... LabeledPoint(1.0, [1.0, 0.0, 0.0]), ... LabeledPoint(2.0, [0.0, 0.0, 1.0]) ... ] - mcm = LogisticRegressionWithLBFGS.train(data=sc.parallelize(multi_class_data), numClasses=3) + data = sc.parallelize(multi_class_data) + mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3) mcm.predict([0.0, 0.5, 0.0]) 0 mcm.predict([0.8, 0.0, 0.0]) @@ -298,7 +299,7 @@ class LogisticRegressionWithLBFGS(object): ... LabeledPoint(0.0, [0.0, 1.0]), ... LabeledPoint(1.0, [1.0, 0.0]), ... ] - lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data)) + lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data),
spark git commit: [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError
Repository: spark Updated Branches: refs/heads/branch-1.3 8549ff4f0 - 948f2f635 [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case of the calculated amount of time having elapsed can't be reached before available stack space is exhausted. A fix to this truncation error is included in this patch. However, even with the defect patched, the accuracy of the existing implementation is abysmal (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); as such, when the error bounds were tightened down to [-5%, +5%], the existing implementation failed to meet the new, tightened, requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass. Author: David McGuire david.mcgui...@nike.com Closes #5559 from dmcguire81/master and squashes the following commits: d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing 8be6934 [David McGuire] Fix spacing per code review 90e98b9 [David McGuire] Address scalastyle errors 29011bd [David McGuire] Further ratchet down the error margins b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator 8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code 70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent 82ee46d [David McGuire] Replace guard clause with nested conditional 2794717 [David McGuire] Replace the RateLimiter with the Guava implementation 38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail 24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s (cherry picked from commit 5fea3e5c36450658d8b767dd3c06dac2251a0e0c) Signed-off-by: Sean Owen so...@cloudera.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/948f2f63 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/948f2f63 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/948f2f63 Branch: refs/heads/branch-1.3 Commit: 948f2f635d17cda0b8e1ef48c467ebb807ea1a3d Parents: 8549ff4 Author: David McGuire david.mcgui...@nike.com Authored: Tue Apr 21 07:21:10 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Tue Apr 21 07:23:00 2015 -0400 -- .../spark/streaming/receiver/RateLimiter.scala | 33 +++- .../apache/spark/streaming/ReceiverSuite.scala | 29 ++--- 2 files changed, 21 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/948f2f63/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index e4f6ba6..97db9de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.receiver import org.apache.spark.{Logging, SparkConf} -import java.util.concurrent.TimeUnit._ +import 
com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter} /** Provides waitToPush() method to limit the rate at which receivers consume data. * @@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._ */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private var lastSyncTime = System.nanoTime - private var messagesWrittenSinceSync = 0L private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) + private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) def waitToPush() { -if( desiredRate <= 0 ) { - return -} -val now = System.nanoTime -val elapsedNanosecs = math.max(now - lastSyncTime, 1) -val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs -if (rate < desiredRate) { - // It's okay to write; just update some variables and return - messagesWrittenSinceSync += 1 - if (now > lastSyncTime + SYNC_INTERVAL) { -// Sync interval has passed; let's resync -lastSyncTime = now -messagesWrittenSinceSync = 1 - } -} else { - // Calculate how much
spark git commit: [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError
Repository: spark Updated Branches: refs/heads/master 1f2f723b0 - 5fea3e5c3 [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case of the calculated amount of time having elapsed can't be reached before available stack space is exhausted. A fix to this truncation error is included in this patch. However, even with the defect patched, the accuracy of the existing implementation is abysmal (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); as such, when the error bounds were tightened down to [-5%, +5%], the existing implementation failed to meet the new, tightened, requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass. Author: David McGuire david.mcgui...@nike.com Closes #5559 from dmcguire81/master and squashes the following commits: d29d2e0 [David McGuire] Back out to +/-5% error margins, for flexibility in timing 8be6934 [David McGuire] Fix spacing per code review 90e98b9 [David McGuire] Address scalastyle errors 29011bd [David McGuire] Further ratchet down the error margins b33b796 [David McGuire] Eliminate dependency on even distribution by BlockGenerator 8f2934b [David McGuire] Remove arbitrary thread timing / cooperation code 70ee310 [David McGuire] Use Thread.yield(), since Thread.sleep(0) is system-dependent 82ee46d [David McGuire] Replace guard clause with nested conditional 2794717 [David McGuire] Replace the RateLimiter with the Guava implementation 38f3ca8 [David McGuire] Ratchet down the error rate to +/- 5%; tests fail 24b1bc0 [David McGuire] Fix truncation in integer division causing infinite recursion d6e1079 [David McGuire] Stack overflow error in RateLimiter on rates over 1000/s Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fea3e5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fea3e5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fea3e5c Branch: refs/heads/master Commit: 5fea3e5c36450658d8b767dd3c06dac2251a0e0c Parents: 1f2f723 Author: David McGuire david.mcgui...@nike.com Authored: Tue Apr 21 07:21:10 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Tue Apr 21 07:21:10 2015 -0400 -- .../spark/streaming/receiver/RateLimiter.scala | 33 +++- .../apache/spark/streaming/ReceiverSuite.scala | 29 ++--- 2 files changed, 21 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5fea3e5c/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index e4f6ba6..97db9de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.receiver import org.apache.spark.{Logging, SparkConf} -import java.util.concurrent.TimeUnit._ +import com.google.common.util.concurrent.{RateLimiter=GuavaRateLimiter} /** Provides waitToPush() method to limit the rate at which receivers consume data. 
* @@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._ */ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { - private var lastSyncTime = System.nanoTime - private var messagesWrittenSinceSync = 0L private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0) - private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS) + private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate) def waitToPush() { -if( desiredRate <= 0 ) { - return -} -val now = System.nanoTime -val elapsedNanosecs = math.max(now - lastSyncTime, 1) -val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs -if (rate < desiredRate) { - // It's okay to write; just update some variables and return - messagesWrittenSinceSync += 1 - if (now > lastSyncTime + SYNC_INTERVAL) { -// Sync interval has passed; let's resync -lastSyncTime = now -messagesWrittenSinceSync = 1 - } -} else { - // Calculate how much time we should sleep to bring ourselves to the desired rate. - val targetTimeInMillis = messagesWrittenSinceSync *
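Worked numbers for the truncation described in both copies of this commit message (the 1000/rate form below is a simplification of the old arithmetic), followed by the Guava call the fix delegates to:

val desiredRate = 1500                    // messages per second
val perMessageWaitMs = 1000 / desiredRate // == 0 by integer truncation:
// the old loop slept 0 ms and recursed until the stack was exhausted.

import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

val limiter = GuavaRateLimiter.create(desiredRate.toDouble)
def waitToPush(): Unit = limiter.acquire() // blocks ~667 microseconds per permit at 1500/s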