spark git commit: [SPARK-6856] [R] Make RDD information more useful in SparkR
Repository: spark Updated Branches: refs/heads/master 998aac21f -> 7078f6028

[SPARK-6856] [R] Make RDD information more useful in SparkR

Author: Jeff Harrison jeffrharri...@gmail.com

Closes #5667 from His-name-is-Joof/joofspark and squashes the following commits:

f8814a6 [Jeff Harrison] newline added after RDD show() output
4d9d972 [Jeff Harrison] Merge branch 'master' into joofspark
9d2295e [Jeff Harrison] parallelize with 1:10
878b830 [Jeff Harrison] Merge branch 'master' into joofspark
c8c0b80 [Jeff Harrison] add test for RDD function show()
123be65 [Jeff Harrison] SPARK-6856

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7078f602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7078f602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7078f602

Branch: refs/heads/master
Commit: 7078f6028bf012235c664b02ec3541cbb0a248a7
Parents: 998aac2
Author: Jeff Harrison jeffrharri...@gmail.com
Authored: Mon Apr 27 13:38:25 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Apr 27 13:38:25 2015 -0700

--
 R/pkg/R/RDD.R               | 5 +
 R/pkg/inst/tests/test_rdd.R | 5 +
 2 files changed, 10 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7078f602/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 1662d6b..f90c26b 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
   .Object
 })

+setMethod("show", "RDD",
+          function(.Object) {
+            cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
+          })
+
 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
   .Object@env <- new.env()
   .Object@env$isCached <- FALSE

http://git-wip-us.apache.org/repos/asf/spark/blob/7078f602/R/pkg/inst/tests/test_rdd.R
--
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index d55af93..0320735 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
   expect_equal(vals, list(`1` = "a", `2` = "b"))
 })

+test_that("show()", {
+  rdd <- parallelize(sc, list(1:10))
+  expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+})
+
 test_that("sampleByKey() on pairwise RDDs", {
   rdd <- parallelize(sc, 1:2000)
   pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
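Note: the new SparkR show() simply forwards to the underlying JVM RDD's debug string, so R users now see the same one-line summary that Scala users already get. A minimal Scala sketch of the equivalent call, assuming an active SparkContext `sc` (the call site in the printed string varies by environment):

```scala
val rdd = sc.parallelize(1 to 10)
// The same debug string SparkR's show() now prints, followed by a newline:
println(rdd.toString)  // e.g. ParallelCollectionRDD[0] at parallelize at <console>:24
```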
spark git commit: SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat...
Repository: spark Updated Branches: refs/heads/master 7078f6028 -> ef82bddc1

SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat... py

Author: tedyu yuzhih...@gmail.com

Closes #5673 from tedyu/master and squashes the following commits:

ab7c72b [tedyu] SPARK-7107 Adjust indentation to pass Python style tests
6e25939 [tedyu] Adjust line length to be shorter than 100 characters
18d172a [tedyu] SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat.py

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef82bddc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef82bddc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef82bddc

Branch: refs/heads/master
Commit: ef82bddc11d1aea42e22d2f85613a869cbe9a990
Parents: 7078f60
Author: tedyu yuzhih...@gmail.com
Authored: Mon Apr 27 14:42:40 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Apr 27 14:42:40 2015 -0700

--
 examples/src/main/python/hbase_inputformat.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ef82bddc/examples/src/main/python/hbase_inputformat.py
--
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index e17819d..5b82a14 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -54,8 +54,9 @@ if __name__ == "__main__":
         Run with example jar:
         ./bin/spark-submit --driver-class-path /path/to/example/jar \
-        /path/to/examples/hbase_inputformat.py <host> <table>
+        /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
         Assumes you have some data in HBase already, running on <host>, in <table>
+          optionally, you can specify parent znode for your hbase cluster - <znode>
         """, file=sys.stderr)
         exit(-1)
@@ -64,6 +65,9 @@ if __name__ == "__main__":
     sc = SparkContext(appName="HBaseInputFormat")

     conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
+    if len(sys.argv) > 3:
+        conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
+                "hbase.mapreduce.inputtable": table}
     keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
     valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
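Note: the same setting matters for any client of an HBase cluster configured with a non-default parent znode. A hedged Scala sketch of the equivalent configuration (the helper name and its arguments are illustrative, not part of the example):

```scala
import org.apache.hadoop.conf.Configuration

// Mirror of the Python example's conf dict: a non-default parent znode must
// be set alongside the quorum, or ZooKeeper lookups resolve against /hbase.
def hbaseConf(host: String, table: String, znode: Option[String]): Configuration = {
  val conf = new Configuration()
  conf.set("hbase.zookeeper.quorum", host)
  conf.set("hbase.mapreduce.inputtable", table)
  znode.foreach(z => conf.set("zookeeper.znode.parent", z))
  conf
}
```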
spark git commit: [SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper
Repository: spark Updated Branches: refs/heads/master d188b8bad -> 82bb7fd41

[SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper

Following liancheng's comment in https://issues.apache.org/jira/browse/SPARK-6505, this patch removes the reflection call in HiveFunctionWrapper and implements the functions deserializeObjectByKryo and serializeObjectByKryo directly, modeled on the functions of the same name in org.apache.hadoop.hive.ql.exec.Utilities.java.

Author: baishuo vc_j...@hotmail.com

Closes #5660 from baishuo/SPARK-6505-20150423 and squashes the following commits:

ae61ec4 [baishuo] modify code style
78d9fa3 [baishuo] modify code style
0b522a7 [baishuo] modify code style
a5ff9c7 [baishuo] Remove the reflection call in HiveFunctionWrapper

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82bb7fd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82bb7fd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82bb7fd4

Branch: refs/heads/master
Commit: 82bb7fd41a2c7992e0aea69623c504bd439744f7
Parents: d188b8b
Author: baishuo vc_j...@hotmail.com
Authored: Mon Apr 27 14:08:05 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Mon Apr 27 14:08:05 2015 +0800

--
 .../org/apache/spark/sql/hive/Shim13.scala | 44 ++--
 1 file changed, 22 insertions(+), 22 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/82bb7fd4/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
--
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index d331c21..dbc5e02 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive

 import java.rmi.server.UID
 import java.util.{Properties, ArrayList => JArrayList}
+import java.io.{OutputStream, InputStream}

 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
+import scala.reflect.ClassTag

 import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Input
+import com.esotericsoftware.kryo.io.Output
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo}

 import org.apache.spark.Logging
 import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
+import org.apache.spark.util.Utils._

 /**
  * This class provides the UDF creation and also the UDF instance serialization and
@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
   // for Serialization
   def this() = this(null)

-  import org.apache.spark.util.Utils._
-
   @transient
-  private val methodDeSerialize = {
-    val method = classOf[Utilities].getDeclaredMethod(
-      "deserializeObjectByKryo",
-      classOf[Kryo],
-      classOf[java.io.InputStream],
-      classOf[Class[_]])
-    method.setAccessible(true)
-
-    method
+  def deserializeObjectByKryo[T: ClassTag](
+      kryo: Kryo,
+      in: InputStream,
+      clazz: Class[_]): T = {
+    val inp = new Input(in)
+    val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
+    inp.close()
+    t
   }

   @transient
-  private val methodSerialize = {
-    val method = classOf[Utilities].getDeclaredMethod(
-      "serializeObjectByKryo",
-      classOf[Kryo],
-      classOf[Object],
-      classOf[java.io.OutputStream])
-    method.setAccessible(true)
-
-    method
+  def serializeObjectByKryo(
+      kryo: Kryo,
+      plan: Object,
+      out: OutputStream) {
+    val output: Output = new Output(out)
+    kryo.writeObject(output, plan)
+    output.close()
   }

   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    methodDeSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), is, clazz)
+    deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
       .asInstanceOf[UDFType]
   }

   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    methodSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), function, out)
+    serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
   }

   private var instance: AnyRef = null
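Note: the change swaps a reflective `Method.invoke` on Hive's private helpers for direct Kryo calls. A self-contained round-trip sketch of the same Input/Output pattern (the serialized value here is illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()

// Serialize, as in serializeObjectByKryo above.
val bytes = new ByteArrayOutputStream()
val output = new Output(bytes)
kryo.writeObject(output, "udf state")
output.close()

// Deserialize, as in deserializeObjectByKryo above.
val input = new Input(new ByteArrayInputStream(bytes.toByteArray))
val restored = kryo.readObject(input, classOf[String])
input.close()
assert(restored == "udf state")
```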
spark git commit: [SPARK-6738] [CORE] Improve estimate the size of a large array
Repository: spark Updated Branches: refs/heads/master b9de9e040 -> 8e1c00dbf

[SPARK-6738] [CORE] Improve estimate the size of a large array

Currently, SizeEstimator.visitArray is not correct in the following case:

```
array size > 200, elements share the same object
```

When I add a debug log in SizeTracker.scala:

```
System.err.println(s"numUpdates:$numUpdates, size:$ts, bytesPerUpdate:$bytesPerUpdate, cost time:$b")
```

I get the following log:

```
numUpdates:1, size:262448, bytesPerUpdate:0.0, cost time:35
numUpdates:2, size:420698, bytesPerUpdate:158250.0, cost time:35
numUpdates:4, size:420754, bytesPerUpdate:28.0, cost time:32
numUpdates:7, size:420754, bytesPerUpdate:0.0, cost time:27
numUpdates:12, size:420754, bytesPerUpdate:0.0, cost time:28
numUpdates:20, size:420754, bytesPerUpdate:0.0, cost time:25
numUpdates:32, size:420754, bytesPerUpdate:0.0, cost time:21
numUpdates:52, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:84, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:135, size:420754, bytesPerUpdate:0.0, cost time:20
numUpdates:216, size:420754, bytesPerUpdate:0.0, cost time:11
numUpdates:346, size:420754, bytesPerUpdate:0.0, cost time:6
numUpdates:554, size:488911, bytesPerUpdate:327.67788461538464, cost time:8
numUpdates:887, size:2312259426, bytesPerUpdate:6942253.798798799, cost time:198
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 3.0 GB to disk (1 time so far)
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```

But in fact the file size is only 162K:

```
$ ll -h /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
-rw-r- 1 spark users 162K Apr 21 14:27 /data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```

To test this case, I changed visitArray to:

```
var size = 0l
for (i <- 0 until length) {
  val obj = JArray.get(array, i)
  size += SizeEstimator.estimate(obj, state.visited).toLong
}
state.size += size
```

and got the following log:

```
...
14895 277016088 566.9046118590662 time:8470
23832 281840544 552.3308270676691 time:8031
38132 289891824 539.8294729775092 time:7897
61012 302803640 563.0265734265735 time:13044
97620 322904416 564.3276223776223 time:13554
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling in-memory map of 314.5 MB to disk (1 time so far)
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

The file size is 85M.
```
$ ll -h /data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/
total 85M
-rw-r- 1 spark users 85M Apr 14 11:46 temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

The following log is from a run with this patch:

```
numUpdates:32, size:365484, bytesPerUpdate:0.0, cost time:7
numUpdates:52, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:84, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:135, size:372208, bytesPerUpdate:131.84313725490196, cost time:86
numUpdates:216, size:379020, bytesPerUpdate:84.09876543209876, cost time:21
numUpdates:346, size:1865208, bytesPerUpdate:11432.215384615385, cost time:23
numUpdates:554, size:2052380, bytesPerUpdate:899.8653846153846, cost time:16
numUpdates:887, size:2142820, bytesPerUpdate:271.59159159159157, cost time:15
..
numUpdates:14895, size:251675500, bytesPerUpdate:438.5263157894737, cost time:13
numUpdates:23832, size:257010268, bytesPerUpdate:596.9305135951662, cost time:14
numUpdates:38132, size:263922396, bytesPerUpdate:483.3655944055944, cost time:15
numUpdates:61012, size:268962596, bytesPerUpdate:220.28846153846155, cost time:24
numUpdates:97620, size:286980644, bytesPerUpdate:492.1888111888112, cost time:22
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: Thread 53 spilling in-memory map of 328.7 MB to disk (1 time so far)
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/temp_local_9c109510-af16-4468-8f23-48cad04da88f
```

The file size is 88M.

```
$ ll -h /data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/
total 88M
-rw-r- 1 spark users 88M Apr 21 14:45 temp_local_9c109510-af16-4468-8f23-48cad04da88f
```

Author: Hong Shen hongs...@tencent.com

Closes #5608 from shenh062326/my_change5 and squashes the following
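Note: the underlying technique is sample-and-extrapolate, which keeps visitArray cheap on huge arrays. A hedged Scala sketch of just that idea (names are illustrative, not Spark's internal API; the real patch additionally tracks visited objects so that references shared between elements are not double-counted):

```scala
import scala.util.Random

// Estimate a large array by sizing a random sample of elements and
// scaling the sampled total up to the full array length.
def estimateArraySize[T](array: Array[T], estimateOne: AnyRef => Long): Long = {
  val sampleSize = 100
  if (array.length <= sampleSize) {
    array.iterator.map(e => estimateOne(e.asInstanceOf[AnyRef])).sum
  } else {
    val rand = new Random(42)
    var sampleTotal = 0L
    for (_ <- 0 until sampleSize) {
      val elem = array(rand.nextInt(array.length))
      sampleTotal += estimateOne(elem.asInstanceOf[AnyRef])
    }
    // Extrapolate the sampled bytes to the whole array.
    (sampleTotal.toDouble * array.length / sampleSize).toLong
  }
}
```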
spark git commit: [SPARK-7162] [YARN] Launcher error in yarn-client
Repository: spark Updated Branches: refs/heads/master ab5adb7a9 -> 62888a4de

[SPARK-7162] [YARN] Launcher error in yarn-client

jira: https://issues.apache.org/jira/browse/SPARK-7162

Author: GuoQiang Li wi...@qq.com

Closes #5716 from witgo/SPARK-7162 and squashes the following commits:

b64564c [GuoQiang Li] Launcher error in yarn-client

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62888a4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62888a4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62888a4d

Branch: refs/heads/master
Commit: 62888a4ded91b3c2cbb05936c374c7ebfc10799e
Parents: ab5adb7
Author: GuoQiang Li wi...@qq.com
Authored: Mon Apr 27 19:52:41 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:52:41 2015 -0400

--
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/62888a4d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 019afbd..741239c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -354,7 +354,7 @@ private[spark] class Client(
     val dir = new File(path)
     if (dir.isDirectory()) {
       dir.listFiles().foreach { file =>
-        if (!hadoopConfFiles.contains(file.getName())) {
+        if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
           hadoopConfFiles(file.getName()) = file
         }
       }
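Note: the guard is needed because java.io.File.listFiles() returns subdirectories as well as regular files, and registering a directory as a "config file" is what broke the launcher. A small stand-alone Scala sketch of the pattern (the path is illustrative):

```scala
import java.io.File

val dir = new File("/etc/hadoop/conf")
if (dir.isDirectory) {
  // Keep only regular files; listFiles() also yields nested directories.
  dir.listFiles().filter(_.isFile).foreach(f => println(f.getName))
}
```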
spark git commit: [SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner
Repository: spark Updated Branches: refs/heads/branch-1.3 81de30ae5 -> d13080aa2

[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

Added a check to the SparkContext.union method to verify that a partitioner is defined on all RDDs before instantiating a PartitionerAwareUnionRDD.

Author: Steven She ste...@canopylabs.com

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner

(cherry picked from commit b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56)
Signed-off-by: Sean Owen so...@cloudera.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d13080aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d13080aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d13080aa

Branch: refs/heads/branch-1.3
Commit: d13080aa24106a348d3d1e2b8a788292d5915f21
Parents: 81de30a
Author: Steven She ste...@canopylabs.com
Authored: Mon Apr 27 18:55:02 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 18:55:15 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala  |  1 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 21
 3 files changed, 23 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 495227b..66426f3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -972,7 +972,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (partitioners.size == 1) {
+    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
     } else {
       new UnionRDD(this, rdds)

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641..7598ff6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index bede1ff..b5f4a19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }

+  test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+
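Note: the repaired dispatch falls back to a plain UnionRDD whenever any input lacks a partitioner, instead of constructing a PartitionerAwareUnionRDD that throws. A short usage sketch, assuming an active SparkContext `sc`:

```scala
import org.apache.spark.HashPartitioner

val withPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
val withoutPartitioner = sc.parallelize(Seq(2 -> true))
// Before the fix this crashed; now it quietly builds a UnionRDD.
val unioned = sc.union(Seq(withPartitioner, withoutPartitioner))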
spark git commit: [SPARK-3090] [CORE] Stop SparkContext if user forgets to.
Repository: spark Updated Branches: refs/heads/master 8e1c00dbf -> 5d45e1f60

[SPARK-3090] [CORE] Stop SparkContext if user forgets to.

Set up a shutdown hook to try to stop the Spark context in case the user forgets to do it. The main effect is that any open log files are flushed and closed, which is particularly important for event logs.

Author: Marcelo Vanzin van...@cloudera.com

Closes #5696 from vanzin/SPARK-3090 and squashes the following commits:

3b554b5 [Marcelo Vanzin] [SPARK-3090] [core] Stop SparkContext if user forgets to.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d45e1f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d45e1f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d45e1f6

Branch: refs/heads/master
Commit: 5d45e1f60059e2f2fc8ad64778b9ddcc8887c570
Parents: 8e1c00d
Author: Marcelo Vanzin van...@cloudera.com
Authored: Mon Apr 27 19:46:17 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:46:17 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala | 38 +---
 .../scala/org/apache/spark/util/Utils.scala   | 10 --
 .../spark/deploy/yarn/ApplicationMaster.scala | 10 ++
 3 files changed, 35 insertions(+), 23 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5d45e1f6/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc..65b903a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _listenerBusStarted: Boolean = false
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
+  private var _shutdownHookRef: AnyRef = _

   /* ------------------------------------------------------------------------------------- *
    * | Accessors and public fields. These provide access to the internal state of the     | *
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+
+    // Make sure the context is stopped if the user forgets about it. This avoids leaving
+    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+    // is killed, though.
+    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Invoking stop() from shutdown hook")
+      stop()
+    }
   } catch {
     case NonFatal(e) =>
       logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("SparkContext already stopped.")
       return
     }
+    if (_shutdownHookRef != null) {
+      Utils.removeShutdownHook(_shutdownHookRef)
+    }

     postApplicationEnd()
     _ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
    *
    * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
    */
-  private val activeContext: AtomicReference[SparkContext] = 
+  private val activeContext: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)

   /**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
   }

   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM, 
-   * this is useful when applications may wish to share a SparkContext. 
+   * This function may be used to get or instantiate a SparkContext and register it as a
+   * singleton object. Because we can only have one active SparkContext per JVM,
+   * this is useful when applications may wish to share a SparkContext.
    *
-   * Note: This function cannot be used to create multiple SparkContext instances 
+   * Note: This function cannot be used to create multiple SparkContext instances
    * even if multiple contexts are allowed.
    */
   def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
       activeContext.get()
     }
   }
- 
+
   /**
-   * This function may be used to get or instantiate a SparkContext and register it as a 
-   * singleton object. Because we can only have one active SparkContext per JVM,
+   * This function may be used to get
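Note: the mechanism is the ordinary JVM shutdown-hook pattern; Spark's Utils.addShutdownHook just layers priorities on top of it. A stand-alone sketch, assuming `sc` is the SparkContext to stop:

```scala
val hook = new Thread(new Runnable {
  override def run(): Unit = {
    println("Invoking stop() from shutdown hook")
    sc.stop() // flushes and closes any open event log files
  }
})
Runtime.getRuntime.addShutdownHook(hook)
// An explicit, clean stop() should unregister the hook so it does not run twice:
Runtime.getRuntime.removeShutdownHook(hook)
```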
spark git commit: [SPARK-6991] [SPARKR] Adds support for zipPartitions.
Repository: spark Updated Branches: refs/heads/master ef82bddc1 -> ca9f4ebb8

[SPARK-6991] [SPARKR] Adds support for zipPartitions.

Author: hlin09 hlin0...@gmail.com

Closes #5568 from hlin09/zipPartitions and squashes the following commits:

12c08a5 [hlin09] Fix comments
d2d32db [hlin09] Merge branch 'master' into zipPartitions
ec56d2f [hlin09] Fix test.
27655d3 [hlin09] Adds support for zipPartitions.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9f4ebb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9f4ebb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9f4ebb

Branch: refs/heads/master
Commit: ca9f4ebb8e510e521bf4df0331375ddb385fb9d2
Parents: ef82bdd
Author: hlin09 hlin0...@gmail.com
Authored: Mon Apr 27 15:04:37 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Apr 27 15:04:37 2015 -0700

--
 R/pkg/NAMESPACE                         |  1 +
 R/pkg/R/RDD.R                           | 46 ++++
 R/pkg/R/generics.R                      |  5 +++
 R/pkg/inst/tests/test_binary_function.R | 33 ++++
 4 files changed, 85 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8028364..e077eac 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
               "unpersist",
               "value",
               "values",
+              "zipPartitions",
               "zipRDD",
               "zipWithIndex",
               "zipWithUniqueId"

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index f90c26b..a3a0421 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1595,3 +1595,49 @@
             keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
           })
+
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#'
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions.
+#'         Assumes that all the RDDs have the *same number of partitions*, but
+#'         does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3,
+#'                       func = function(x, y, z) { list(list(x, y, z))} ))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+setMethod("zipPartitions",
+          "RDD",
+          function(..., func) {
+            rrdds <- list(...)
+            if (length(rrdds) == 1) {
+              return(rrdds[[1]])
+            }
+            nPart <- sapply(rrdds, numPartitions)
+            if (length(unique(nPart)) != 1) {
+              stop("Can only zipPartitions RDDs which have the same number of partitions.")
+            }
+
+            rrdds <- lapply(rrdds, function(rdd) {
+              mapPartitionsWithIndex(rdd, function(partIndex, part) {
+                print(length(part))
+                list(list(partIndex, part))
+              })
+            })
+            union.rdd <- Reduce(unionRDD, rrdds)
+            zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
+            res <- mapPartitions(zipped.rdd, function(plist) {
+              do.call(func, plist[[1]])
+            })
+            res
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 34dbe84..e887293 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
 #' @export
 setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })

+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
+           signature = "...")
+
 #' @rdname zipWithIndex
 #' @seealso zipWithUniqueId
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/inst/tests/test_binary_function.R
--
diff --git a/R/pkg/inst/tests/test_binary_function.R
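Note: the R method mirrors the Scala-side RDD.zipPartitions. For comparison, a Scala sketch over the same data as the roxygen example above, assuming an active SparkContext `sc`:

```scala
val rdd1 = sc.parallelize(1 to 2, 2)
val rdd2 = sc.parallelize(1 to 4, 2)
// Pair up partitions; partition counts must match, element counts need not.
val zipped = rdd1.zipPartitions(rdd2) { (it1, it2) =>
  Iterator((it1.toList, it2.toList))
}
zipped.collect() // Array((List(1), List(1, 2)), (List(2), List(3, 4)))
```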
spark git commit: [SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner
Repository: spark Updated Branches: refs/heads/master ca9f4ebb8 -> b9de9e040

[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

Added a check to the SparkContext.union method to verify that a partitioner is defined on all RDDs before instantiating a PartitionerAwareUnionRDD.

Author: Steven She ste...@canopylabs.com

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9de9e04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9de9e04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9de9e04

Branch: refs/heads/master
Commit: b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56
Parents: ca9f4eb
Author: Steven She ste...@canopylabs.com
Authored: Mon Apr 27 18:55:02 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 18:55:02 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala |  2 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala  |  1 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 21
 3 files changed, 23 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 86269ea..ea4ddcc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (partitioners.size == 1) {
+    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
     } else {
       new UnionRDD(this, rdds)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641..7598ff6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))

http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index df42faa..ef8c36a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }

+  test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    intercept[IllegalArgumentException] {
+      new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner,
spark git commit: [SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread
Repository: spark Updated Branches: refs/heads/master 4d9e560b5 -> 874a2ca93

[SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread

`HeartbeatReceiver` will call `TaskScheduler.executorHeartbeatReceived`, which is a blocking operation because `TaskScheduler.executorHeartbeatReceived` ultimately calls:

```Scala
blockManagerMaster.driverEndpoint.askWithReply[Boolean](
  BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

Even when asking a local actor, this may block the current Akka thread: the reply may be dispatched to the same thread that issued the ask, in which case the reply can never be processed. An extreme case is setting the Akka dispatcher thread pool size to 1.

jstack log:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x7f2a8c02d000 nid=0x725 waiting on condition [0x7f2b1d6d]
   java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for <0x0006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
  at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
  at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
  at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
  at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
  at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
  at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
  at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
  at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
  at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
  at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
  at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moves this blocking operation to a separate thread.

Author: zsxwing zsxw...@gmail.com

Closes #5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting executorHeartbeatReceived
5b3b545 [zsxwing] Move calling
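Note: the fix follows a common pattern: hand blocking work to a dedicated single-thread executor so the RPC dispatcher thread that delivered the message is never parked. A minimal sketch of that pattern (names are illustrative, not the PR's exact code):

```scala
import java.util.concurrent.Executors

// One dedicated thread for heartbeat bookkeeping; the dispatcher only enqueues.
val heartbeatThread = Executors.newSingleThreadScheduledExecutor()

def onHeartbeatReceived(blockingWork: () => Unit): Unit = {
  heartbeatThread.submit(new Runnable {
    override def run(): Unit = blockingWork() // blocking ask happens off the dispatcher
  })
}
```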
spark git commit: [SPARK-7090] [MLLIB] Introduce LDAOptimizer to LDA to further improve extensibility
Repository: spark Updated Branches: refs/heads/master 62888a4de -> 4d9e560b5

[SPARK-7090] [MLLIB] Introduce LDAOptimizer to LDA to further improve extensibility

jira: https://issues.apache.org/jira/browse/SPARK-7090

LDA was implemented with extensibility in mind, and with the development of OnlineLDA and Gibbs sampling we are collecting more detailed requirements from different algorithms. As Joseph Bradley jkbradley proposed in https://github.com/apache/spark/pull/4807, and after some further discussion, we'd like to adjust the code structure a little to present the common interface and extension point clearly. Basically, class LDA is the common entry point for LDA computing, and each LDA object refers to an LDAOptimizer for the concrete algorithm implementation. Users can customize an LDAOptimizer with specific parameters and assign it to the LDA.

Concrete changes:

1. Add a trait `LDAOptimizer`, which defines the common interface for concrete implementations. Each subclass is a wrapper for a specific LDA algorithm.

2. Move EMOptimizer to file LDAOptimizer, inherit from LDAOptimizer, and rename it to EMLDAOptimizer (in case a more generic EMOptimizer comes in the future).
- adjust the constructor of EMLDAOptimizer, since all the parameters should be passed in through the initialState method. This avoids unwanted confusion or overwrites.
- move the code from LDA.initialState to initialState of EMLDAOptimizer.

3. Add property ldaOptimizer to LDA with its getter/setter; EMLDAOptimizer is the default optimizer.

4. Change the return type of LDA.run from DistributedLDAModel to LDAModel.

Further work: add OnlineLDAOptimizer and other possible optimizers once ready.

Author: Yuhao Yang hhb...@gmail.com

Closes #5661 from hhbyyh/ldaRefactor and squashes the following commits:

0e2e006 [Yuhao Yang] respond to review comments
08a45da [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaRefactor
e756ce4 [Yuhao Yang] solve mima exception
d74fd8f [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into ldaRefactor
0bb8400 [Yuhao Yang] refactor LDA with Optimizer
ec2f857 [Yuhao Yang] protoptype for discussion

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d9e560b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d9e560b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d9e560b

Branch: refs/heads/master
Commit: 4d9e560b5470029143926827b1cb9d72a0bfbeff
Parents: 62888a4
Author: Yuhao Yang hhb...@gmail.com
Authored: Mon Apr 27 19:02:51 2015 -0700
Committer: Joseph K. Bradley jos...@databricks.com
Committed: Mon Apr 27 19:02:51 2015 -0700

--
 .../spark/examples/mllib/JavaLDAExample.java    |   2 +-
 .../spark/examples/mllib/LDAExample.scala       |   4 +-
 .../org/apache/spark/mllib/clustering/LDA.scala | 181 ----
 .../spark/mllib/clustering/LDAModel.scala       |   2 +-
 .../spark/mllib/clustering/LDAOptimizer.scala   | 210 +++
 .../spark/mllib/clustering/JavaLDASuite.java    |   2 +-
 .../spark/mllib/clustering/LDASuite.scala       |   2 +-
 project/MimaExcludes.scala                      |   4 +
 8 files changed, 256 insertions(+), 151 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4d9e560b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
--
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
index 36207ae..fd53c81 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
@@ -58,7 +58,7 @@ public class JavaLDAExample {
     corpus.cache();

     // Cluster the documents into three topics using LDA
-    DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);
+    DistributedLDAModel ldaModel = (DistributedLDAModel)new LDA().setK(3).run(corpus);

     // Output topics. Each is a distribution over words (matching word count vectors)
     System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()

http://git-wip-us.apache.org/repos/asf/spark/blob/4d9e560b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 08a9359..a185039 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -26,7
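Note: a usage sketch under the new structure, hedged on the details: it assumes the setter for the ldaOptimizer property is named setOptimizer and that `corpus` is an RDD[(Long, Vector)] of document-id / word-count pairs. Because run() now returns the base LDAModel type, callers that need the distributed model cast explicitly, as in the Java example above:

```scala
import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA}

// EMLDAOptimizer is the default; shown explicitly to illustrate the extension point.
val lda = new LDA().setK(3).setOptimizer(new EMLDAOptimizer)
val model = lda.run(corpus).asInstanceOf[DistributedLDAModel]
```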
spark git commit: [SPARK-7145] [CORE] commons-lang (2.x) classes used instead of commons-lang3 (3.x); commons-io used without dependency
Repository: spark Updated Branches: refs/heads/master 5d45e1f60 -> ab5adb7a9

[SPARK-7145] [CORE] commons-lang (2.x) classes used instead of commons-lang3 (3.x); commons-io used without dependency

Remove use of commons-lang in favor of commons-lang3 classes; remove commons-io use in favor of Guava.

Author: Sean Owen so...@cloudera.com

Closes #5703 from srowen/SPARK-7145 and squashes the following commits:

21fbe03 [Sean Owen] Remove use of commons-lang in favor of commons-lang3 classes; remove commons-io use in favor of Guava

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab5adb7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab5adb7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab5adb7a

Branch: refs/heads/master
Commit: ab5adb7a973eec9d95c7575c864cba9f8d83a0fd
Parents: 5d45e1f
Author: Sean Owen so...@cloudera.com
Authored: Mon Apr 27 19:50:55 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:50:55 2015 -0400

--
 .../src/test/scala/org/apache/spark/FileServerSuite.scala  |  7 +++
 .../apache/spark/metrics/InputOutputMetricsSuite.scala     |  4 ++--
 .../network/netty/NettyBlockTransferSecuritySuite.scala    | 10 +++---
 external/flume-sink/pom.xml                                |  4
 .../streaming/flume/sink/SparkAvroCallbackHandler.scala    |  4 ++--
 .../main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala     |  6 +-
 .../sql/hive/thriftserver/AbstractSparkSQLDriver.scala     |  4 ++--
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala    |  8 +++-
 .../apache/spark/sql/hive/execution/UDFListString.java     |  6 +++---
 .../apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  |  9 -
 10 files changed, 35 insertions(+), 27 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5adb7a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a69e9b7..c0439f9 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -22,8 +22,7 @@ import java.net.URI
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl.SSLException

-import com.google.common.io.ByteStreams
-import org.apache.commons.io.{FileUtils, IOUtils}
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.lang3.RandomUtils

 import org.scalatest.FunSuite
@@ -239,7 +238,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
     val randomContent = RandomUtils.nextBytes(100)
     val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
-    FileUtils.writeByteArrayToFile(file, randomContent)
+    Files.write(randomContent, file)
     server.addFile(file)

     val uri = new URI(server.serverUri + "/files/" + file.getName)
@@ -254,7 +253,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
       Utils.setupSecureURLConnection(connection, sm)
     }

-    val buf = IOUtils.toByteArray(connection.getInputStream)
+    val buf = ByteStreams.toByteArray(connection.getInputStream)
     assert(buf === randomContent)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5adb7a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 190b08d..ef3e213 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileWriter, PrintWriter}

 import scala.collection.mutable.ArrayBuffer

-import org.apache.commons.lang.math.RandomUtils
+import org.apache.commons.lang3.RandomUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -60,7 +60,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext
     tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
     val pw = new PrintWriter(new FileWriter(tmpFile))
     for (x <- 1 to numRecords) {
-      pw.println(RandomUtils.nextInt(numBuckets))
+      pw.println(RandomUtils.nextInt(0, numBuckets))
     }
     pw.close()
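Note: the replacements are mechanical but the lang3 RandomUtils.nextInt signature differs from lang 2.x, taking an explicit (startInclusive, endExclusive) range, as the diff above shows. A self-contained Scala sketch of the swapped-in equivalents, assuming Guava and commons-lang3 on the classpath:

```scala
import java.io.{File, FileInputStream}
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.lang3.RandomUtils

val file = File.createTempFile("demo", ".bin")
Files.write(RandomUtils.nextBytes(100), file)   // was FileUtils.writeByteArrayToFile
val in = new FileInputStream(file)
val buf = ByteStreams.toByteArray(in)           // was IOUtils.toByteArray
in.close()
val bucket = RandomUtils.nextInt(0, 10)         // was RandomUtils.nextInt(10) in lang 2.x
```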
spark git commit: [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact
Repository: spark Updated Branches: refs/heads/master 82bb7fd41 -> 998aac21f

[SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

Turned on the hive-thriftserver profile in the release script.

Author: Misha Chernetsov chernet...@gmail.com

Closes #5429 from chernetsov/master and squashes the following commits:

9cc36af [Misha Chernetsov] [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact turned on hive-thriftserver profile in release script for scala 2.10

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/998aac21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/998aac21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/998aac21

Branch: refs/heads/master
Commit: 998aac21f0a0588a70f8cf123ae4080163c612fb
Parents: 82bb7fd
Author: Misha Chernetsov chernet...@gmail.com
Authored: Mon Apr 27 11:27:56 2015 -0700
Committer: Patrick Wendell patr...@databricks.com
Committed: Mon Apr 27 11:27:56 2015 -0700

--
 dev/create-release/create-release.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/998aac21/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index b5a67dd..3dbb35f 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -119,7 +119,7 @@ if [[ ! $@ =~ --skip-publish ]]; then
   rm -rf $SPARK_REPO

   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     clean install

   ./dev/change-version-to-2.11.sh
spark git commit: [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact
Repository: spark Updated Branches: refs/heads/branch-1.3 c4470b93f -> 81de30ae5

[SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

Turned on the hive-thriftserver profile in the release script.

Author: Misha Chernetsov chernet...@gmail.com

Closes #5429 from chernetsov/master and squashes the following commits:

9cc36af [Misha Chernetsov] [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact turned on hive-thriftserver profile in release script for scala 2.10

(cherry picked from commit 998aac21f0a0588a70f8cf123ae4080163c612fb)
Signed-off-by: Patrick Wendell patr...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81de30ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81de30ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81de30ae

Branch: refs/heads/branch-1.3
Commit: 81de30ae51d3858c30f748994e9249700847fcde
Parents: c4470b9
Author: Misha Chernetsov chernet...@gmail.com
Authored: Mon Apr 27 11:27:56 2015 -0700
Committer: Patrick Wendell patr...@databricks.com
Committed: Mon Apr 27 11:28:07 2015 -0700

--
 dev/create-release/create-release.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/81de30ae/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 6f87fcd..0403594 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -117,7 +117,7 @@ if [[ ! $@ =~ --skip-publish ]]; then
   echo "Created Nexus staging repository: $staged_repo_id"

   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
     clean install

   ./dev/change-version-to-2.11.sh