spark git commit: [SPARK-6856] [R] Make RDD information more useful in SparkR

2015-04-27 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 998aac21f -> 7078f6028


[SPARK-6856] [R] Make RDD information more useful in SparkR

Author: Jeff Harrison jeffrharri...@gmail.com

Closes #5667 from His-name-is-Joof/joofspark and squashes the following commits:

f8814a6 [Jeff Harrison] newline added after RDD show() output
4d9d972 [Jeff Harrison] Merge branch 'master' into joofspark
9d2295e [Jeff Harrison] parallelize with 1:10
878b830 [Jeff Harrison] Merge branch 'master' into joofspark
c8c0b80 [Jeff Harrison] add test for RDD function show()
123be65 [Jeff Harrison] SPARK-6856


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7078f602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7078f602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7078f602

Branch: refs/heads/master
Commit: 7078f6028bf012235c664b02ec3541cbb0a248a7
Parents: 998aac2
Author: Jeff Harrison jeffrharri...@gmail.com
Authored: Mon Apr 27 13:38:25 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Apr 27 13:38:25 2015 -0700

--
 R/pkg/R/RDD.R   | 5 +
 R/pkg/inst/tests/test_rdd.R | 5 +
 2 files changed, 10 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7078f602/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 1662d6b..f90c26b 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -66,6 +66,11 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
   .Object
 })
 
+setMethod("show", "RDD",
+  function(.Object) {
+  cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep=""))
+  })
+
 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
   .Object@env <- new.env()
   .Object@env$isCached <- FALSE

http://git-wip-us.apache.org/repos/asf/spark/blob/7078f602/R/pkg/inst/tests/test_rdd.R
--
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
index d55af93..0320735 100644
--- a/R/pkg/inst/tests/test_rdd.R
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -759,6 +759,11 @@ test_that("collectAsMap() on a pairwise RDD", {
   expect_equal(vals, list(`1` = "a", `2` = "b"))
 })
 
+test_that("show()", {
+  rdd <- parallelize(sc, list(1:10))
+  expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+})
+
+
 test_that("sampleByKey() on pairwise RDDs", {
   rdd <- parallelize(sc, 1:2000)
   pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
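For context, the new show() simply surfaces the JVM RDD's debug string via callJMethod(.Object@jrdd, "toString"). A minimal Scala-side sketch of where that string comes from (assumes an existing SparkContext `sc`; not part of the patch):

```scala
// Illustrative only: the text printed by SparkR's show() is the JVM RDD's toString.
val rdd = sc.parallelize(1 to 10)
println(rdd.toString)   // e.g. ParallelCollectionRDD[0] at parallelize at <console>:22
```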


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat...

2015-04-27 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 7078f6028 -> ef82bddc1


SPARK-7107 Add parameter for zookeeper.znode.parent to hbase_inputformat.py

Author: tedyu yuzhih...@gmail.com

Closes #5673 from tedyu/master and squashes the following commits:

ab7c72b [tedyu] SPARK-7107 Adjust indentation to pass Python style tests
6e25939 [tedyu] Adjust line length to be shorter than 100 characters
18d172a [tedyu] SPARK-7107 Add parameter for zookeeper.znode.parent to 
hbase_inputformat.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef82bddc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef82bddc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef82bddc

Branch: refs/heads/master
Commit: ef82bddc11d1aea42e22d2f85613a869cbe9a990
Parents: 7078f60
Author: tedyu yuzhih...@gmail.com
Authored: Mon Apr 27 14:42:40 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Apr 27 14:42:40 2015 -0700

--
 examples/src/main/python/hbase_inputformat.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef82bddc/examples/src/main/python/hbase_inputformat.py
--
diff --git a/examples/src/main/python/hbase_inputformat.py 
b/examples/src/main/python/hbase_inputformat.py
index e17819d..5b82a14 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -54,8 +54,9 @@ if __name__ == "__main__":
 
     Run with example jar:
     ./bin/spark-submit --driver-class-path /path/to/example/jar \
-    /path/to/examples/hbase_inputformat.py <host> <table>
+    /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
     Assumes you have some data in HBase already, running on <host>, in <table>
+      optionally, you can specify parent znode for your hbase cluster - <znode>
     """, file=sys.stderr)
     exit(-1)
 
@@ -64,6 +65,9 @@ if __name__ == "__main__":
     sc = SparkContext(appName="HBaseInputFormat")
 
     conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
+    if len(sys.argv) > 3:
+        conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
+                "hbase.mapreduce.inputtable": table}
     keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
     valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
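For readers following along on the JVM side, the same optional-znode pattern sketched in Scala (the function and argument names here are illustrative, not from the patch):

```scala
// Sketch: build the HBase input configuration, adding zookeeper.znode.parent
// only when a parent znode was supplied on the command line.
def hbaseConf(host: String, table: String, znode: Option[String]): Map[String, String] = {
  val base = Map(
    "hbase.zookeeper.quorum" -> host,
    "hbase.mapreduce.inputtable" -> table)
  znode.fold(base)(z => base + ("zookeeper.znode.parent" -> z))
}
```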
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper

2015-04-27 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master d188b8bad -> 82bb7fd41


[SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper

According to liancheng's comment in https://issues.apache.org/jira/browse/SPARK-6505, this patch removes the reflection call in HiveFunctionWrapper and implements the functions deserializeObjectByKryo and serializeObjectByKryo, following the functions of the same name in
org.apache.hadoop.hive.ql.exec.Utilities.java

Author: baishuo vc_j...@hotmail.com

Closes #5660 from baishuo/SPARK-6505-20150423 and squashes the following 
commits:

ae61ec4 [baishuo] modify code style
78d9fa3 [baishuo] modify code style
0b522a7 [baishuo] modify code style
a5ff9c7 [baishuo] Remove the reflection call in HiveFunctionWrapper


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82bb7fd4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82bb7fd4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82bb7fd4

Branch: refs/heads/master
Commit: 82bb7fd41a2c7992e0aea69623c504bd439744f7
Parents: d188b8b
Author: baishuo vc_j...@hotmail.com
Authored: Mon Apr 27 14:08:05 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Mon Apr 27 14:08:05 2015 +0800

--
 .../org/apache/spark/sql/hive/Shim13.scala  | 44 ++--
 1 file changed, 22 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82bb7fd4/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
--
diff --git 
a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala 
b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index d331c21..dbc5e02 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive
 
 import java.rmi.server.UID
 import java.util.{Properties, ArrayList => JArrayList}
+import java.io.{OutputStream, InputStream}
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
+import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Input
+import com.esotericsoftware.kryo.io.Output
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.{io => hadoopIo}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
+import org.apache.spark.util.Utils._
 
 /**
  * This class provides the UDF creation and also the UDF instance 
serialization and
@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
   // for Serialization
   def this() = this(null)
 
-  import org.apache.spark.util.Utils._
-
   @transient
-  private val methodDeSerialize = {
-val method = classOf[Utilities].getDeclaredMethod(
-  "deserializeObjectByKryo",
-  classOf[Kryo],
-  classOf[java.io.InputStream],
-  classOf[Class[_]])
-method.setAccessible(true)
-
-method
+  def deserializeObjectByKryo[T: ClassTag](
+  kryo: Kryo,
+  in: InputStream,
+  clazz: Class[_]): T = {
+val inp = new Input(in)
+val t: T = kryo.readObject(inp,clazz).asInstanceOf[T]
+inp.close()
+t
   }
 
   @transient
-  private val methodSerialize = {
-val method = classOf[Utilities].getDeclaredMethod(
-  "serializeObjectByKryo",
-  classOf[Kryo],
-  classOf[Object],
-  classOf[java.io.OutputStream])
-method.setAccessible(true)
-
-method
+  def serializeObjectByKryo(
+  kryo: Kryo,
+  plan: Object,
+  out: OutputStream ) {
+val output: Output = new Output(out)
+kryo.writeObject(output, plan)
+output.close()
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    methodDeSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), is, clazz)
+    deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
       .asInstanceOf[UDFType]
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    methodSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), function, out)
+    serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
   }
 
   private var instance: AnyRef = null
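For readers unfamiliar with the Kryo calls used above, a standalone round-trip sketch (independent of Hive's Utilities and runtimeSerializationKryo; the serialized value here is arbitrary):

```scala
// Sketch of the Kryo write/read round trip that the new helpers perform on UDF plans.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()
val bytes = new ByteArrayOutputStream()
val output = new Output(bytes)
kryo.writeObject(output, "some udf state")   // serializeObjectByKryo does this with the plan
output.close()

val input = new Input(new ByteArrayInputStream(bytes.toByteArray))
val restored = kryo.readObject(input, classOf[String])  // mirrors deserializeObjectByKryo
input.close()
```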


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-6738] [CORE] Improve estimate the size of a large array

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master b9de9e040 -> 8e1c00dbf


[SPARK-6738] [CORE] Improve estimate the size of a large array

Currently, SizeEstimator.visitArray is not correct in the following case:
```
array size > 200,
elements share a common object
```

when I add a debug log in SizeTracker.scala:
```
 System.err.println(s"numUpdates:$numUpdates, size:$ts, bytesPerUpdate:$bytesPerUpdate, cost time:$b")
```
I get the following log:
```
 numUpdates:1, size:262448, bytesPerUpdate:0.0, cost time:35
 numUpdates:2, size:420698, bytesPerUpdate:158250.0, cost time:35
 numUpdates:4, size:420754, bytesPerUpdate:28.0, cost time:32
 numUpdates:7, size:420754, bytesPerUpdate:0.0, cost time:27
 numUpdates:12, size:420754, bytesPerUpdate:0.0, cost time:28
 numUpdates:20, size:420754, bytesPerUpdate:0.0, cost time:25
 numUpdates:32, size:420754, bytesPerUpdate:0.0, cost time:21
 numUpdates:52, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:84, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:135, size:420754, bytesPerUpdate:0.0, cost time:20
 numUpdates:216, size:420754, bytesPerUpdate:0.0, cost time:11
 numUpdates:346, size:420754, bytesPerUpdate:0.0, cost time:6
 numUpdates:554, size:488911, bytesPerUpdate:327.67788461538464, cost time:8
 numUpdates:887, size:2312259426, bytesPerUpdate:6942253.798798799, cost 
time:198
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling 
in-memory map of 3.0 GB to disk (1 time so far)
15/04/21 14:27:26 INFO collection.ExternalAppendOnlyMap: 
/data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```
But in fact the file size is only 162K:
```
$ ll -h 
/data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
-rw-r- 1 spark users 162K Apr 21 14:27 
/data11/yarnenv/local/usercache/spark/appcache/application_1426746631567_11745/spark-local-20150421142719-c001/30/temp_local_066af981-c2fc-4b70-a00e-110e23006fbc
```

To test this case, I changed visitArray to:
```
    var size = 0l
    for (i <- 0 until length) {
      val obj = JArray.get(array, i)
      size += SizeEstimator.estimate(obj, state.visited).toLong
    }
    state.size += size
```
I get the following log:
```
...
14895 277016088 566.9046118590662 time:8470
23832 281840544 552.3308270676691 time:8031
38132 289891824 539.8294729775092 time:7897
61012 302803640 563.0265734265735 time:13044
97620 322904416 564.3276223776223 time:13554
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: Thread 51 spilling 
in-memory map of 314.5 MB to disk (1 time so far)
15/04/14 11:46:43 INFO collection.ExternalAppendOnlyMap: 
/data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-local-20150414114020-2fcb/14/temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```
 the file size is 85M.
```
$ ll -h 
/data1/yarnenv/local/usercache/spark/appcache/application_1426746631567_8477/spark-
 local-20150414114020-2fcb/14/
total 85M
-rw-r- 1 spark users 85M Apr 14 11:46 
temp_local_5b6b98d5-5bfa-47e2-8216-059482ccbda0
```

The following log is with this patch applied:
```

numUpdates:32, size:365484, bytesPerUpdate:0.0, cost time:7
numUpdates:52, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:84, size:365484, bytesPerUpdate:0.0, cost time:5
numUpdates:135, size:372208, bytesPerUpdate:131.84313725490196, cost time:86
numUpdates:216, size:379020, bytesPerUpdate:84.09876543209876, cost time:21
numUpdates:346, size:1865208, bytesPerUpdate:11432.215384615385, cost time:23
numUpdates:554, size:2052380, bytesPerUpdate:899.8653846153846, cost time:16
numUpdates:887, size:2142820, bytesPerUpdate:271.59159159159157, cost time:15
..
numUpdates:14895, size:251675500, bytesPerUpdate:438.5263157894737, cost time:13
numUpdates:23832, size:257010268, bytesPerUpdate:596.9305135951662, cost time:14
numUpdates:38132, size:263922396, bytesPerUpdate:483.3655944055944, cost time:15
numUpdates:61012, size:268962596, bytesPerUpdate:220.28846153846155, cost 
time:24
numUpdates:97620, size:286980644, bytesPerUpdate:492.1888111888112, cost time:22
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: Thread 53 spilling 
in-memory map of 328.7 MB to disk (1 time so far)
15/04/21 14:45:12 INFO collection.ExternalAppendOnlyMap: 
/data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/temp_local_9c109510-af16-4468-8f23-48cad04da88f
```
 the file size is 88M.
```
$ ll -h 
/data4/yarnenv/local/usercache/spark/appcache/application_1426746631567_11758/spark-local-20150421144456-a2a5/2a/
total 88M
-rw-r- 1 spark users 88M Apr 21 14:45 
temp_local_9c109510-af16-4468-8f23-48cad04da88f
```
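To make the failure mode concrete, here is a rough, hypothetical sketch of a sampling-based array estimate (not the actual SizeEstimator code): when every sampled element points at one large shared object, the inflated per-element average gets multiplied by the full array length.

```scala
// Hypothetical sketch only: estimate a large array by sampling a few elements and
// scaling the average to the full length. A big object shared by all elements is
// counted once per sampled element, so the scaled total can overshoot by orders of magnitude.
def estimateBySampling(array: Array[AnyRef], sampleSize: Int)(sizeOf: AnyRef => Long): Long = {
  val length = array.length
  if (length <= sampleSize) {
    array.iterator.map(sizeOf).sum
  } else {
    val rand = new java.util.Random(42)
    val sampled = Array.fill(sampleSize)(array(rand.nextInt(length)))
    val avgPerElem = sampled.iterator.map(sizeOf).sum.toDouble / sampleSize
    (avgPerElem * length).toLong
  }
}
```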

Author: Hong Shen hongs...@tencent.com

Closes #5608 from shenh062326/my_change5 and squashes the following 

spark git commit: [SPARK-7162] [YARN] Launcher error in yarn-client

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ab5adb7a9 -> 62888a4de


[SPARK-7162] [YARN] Launcher error in yarn-client

jira: https://issues.apache.org/jira/browse/SPARK-7162

Author: GuoQiang Li wi...@qq.com

Closes #5716 from witgo/SPARK-7162 and squashes the following commits:

b64564c [GuoQiang Li] Launcher error in yarn-client


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62888a4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62888a4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62888a4d

Branch: refs/heads/master
Commit: 62888a4ded91b3c2cbb05936c374c7ebfc10799e
Parents: ab5adb7
Author: GuoQiang Li wi...@qq.com
Authored: Mon Apr 27 19:52:41 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:52:41 2015 -0400

--
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62888a4d/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 019afbd..741239c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -354,7 +354,7 @@ private[spark] class Client(
 val dir = new File(path)
 if (dir.isDirectory()) {
   dir.listFiles().foreach { file =>
-if (!hadoopConfFiles.contains(file.getName())) {
+if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
   hadoopConfFiles(file.getName()) = file
 }
   }
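The underlying issue, sketched outside the YARN client (the path is illustrative): File.listFiles() returns subdirectories as well as plain files, so the extra guard keeps directories out of the hadoopConfFiles map.

```scala
// Illustrative sketch: listFiles() also yields directories (and null for unreadable dirs),
// so filter before treating each entry as a distributable config file.
import java.io.File

val confDir = new File("/etc/hadoop/conf")   // assumed path, for illustration only
val confFiles: Array[File] =
  Option(confDir.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile)
```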


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 81de30ae5 -> d13080aa2


[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

Added a check to the SparkContext.union method to check that a partitioner is 
defined on all RDDs when instantiating a PartitionerAwareUnionRDD.
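A minimal sketch of the condition this adds (mirroring the diff below, not the full method): sc.union now falls back to a plain UnionRDD whenever any input RDD lacks a partitioner.

```scala
// Sketch of the decision added to SparkContext.union: a PartitionerAwareUnionRDD is
// only safe when every input has a partitioner and they all agree.
def usePartitionerAwareUnion[T](rdds: Seq[org.apache.spark.rdd.RDD[T]]): Boolean = {
  val partitioners = rdds.flatMap(_.partitioner).toSet
  rdds.forall(_.partitioner.isDefined) && partitioners.size == 1
}
```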

Author: Steven She ste...@canopylabs.com

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at 
least one RDD has no partitioner

(cherry picked from commit b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56)
Signed-off-by: Sean Owen so...@cloudera.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d13080aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d13080aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d13080aa

Branch: refs/heads/branch-1.3
Commit: d13080aa24106a348d3d1e2b8a788292d5915f21
Parents: 81de30a
Author: Steven She ste...@canopylabs.com
Authored: Mon Apr 27 18:55:02 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 18:55:15 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala|  1 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 21 
 3 files changed, 23 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 495227b..66426f3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -972,7 +972,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
 val partitioners = rdds.flatMap(_.partitioner).toSet
-if (partitioners.size == 1) {
+if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
   new PartitionerAwareUnionRDD(this, rdds)
 } else {
   new UnionRDD(this, rdds)

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641..7598ff6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
 var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index bede1ff..b5f4a19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 
2, 3, 4))
   }
 
+  test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+

spark git commit: [SPARK-3090] [CORE] Stop SparkContext if user forgets to.

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 8e1c00dbf -> 5d45e1f60


[SPARK-3090] [CORE] Stop SparkContext if user forgets to.

Set up a shutdown hook to try to stop the Spark context in
case the user forgets to do it. The main effect is that any
open logs files are flushed and closed, which is particularly
interesting for event logs.
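
A minimal user-level sketch of the same idea, using the standard library hook rather than Spark's internal Utils.addShutdownHook (app name and master are placeholders):

```scala
// Sketch: register a JVM shutdown hook so sc.stop() runs even if the application
// forgets to call it; stop() flushes and closes any open event log files.
import org.apache.spark.{SparkConf, SparkContext}

object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    sys.addShutdownHook {
      sc.stop()   // idempotent: a second call just logs "SparkContext already stopped."
    }
    // ... application code ...
  }
}
```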

Author: Marcelo Vanzin van...@cloudera.com

Closes #5696 from vanzin/SPARK-3090 and squashes the following commits:

3b554b5 [Marcelo Vanzin] [SPARK-3090] [core] Stop SparkContext if user forgets 
to.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d45e1f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d45e1f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d45e1f6

Branch: refs/heads/master
Commit: 5d45e1f60059e2f2fc8ad64778b9ddcc8887c570
Parents: 8e1c00d
Author: Marcelo Vanzin van...@cloudera.com
Authored: Mon Apr 27 19:46:17 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:46:17 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala   | 38 +---
 .../scala/org/apache/spark/util/Utils.scala | 10 --
 .../spark/deploy/yarn/ApplicationMaster.scala   | 10 ++
 3 files changed, 35 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5d45e1f6/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ea4ddcc..65b903a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -223,6 +223,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
   private var _listenerBusStarted: Boolean = false
   private var _jars: Seq[String] = _
   private var _files: Seq[String] = _
+  private var _shutdownHookRef: AnyRef = _
 
   /* ------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the |
@@ -517,6 +518,14 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
 _taskScheduler.postStartHook()
 _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
 _env.metricsSystem.registerSource(new 
BlockManagerSource(_env.blockManager))
+
+// Make sure the context is stopped if the user forgets about it. This avoids leaving
+// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
+// is killed, though.
+_shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+  logInfo("Invoking stop() from shutdown hook")
+  stop()
+}
   } catch {
 case NonFatal(e) =>
   logError("Error initializing SparkContext.", e)
@@ -1481,6 +1490,9 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   logInfo("SparkContext already stopped.")
   return
 }
+if (_shutdownHookRef != null) {
+  Utils.removeShutdownHook(_shutdownHookRef)
+}
 
 postApplicationEnd()
 _ui.foreach(_.stop())
@@ -1891,7 +1903,7 @@ object SparkContext extends Logging {
*
* Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
-  private val activeContext: AtomicReference[SparkContext] = 
+  private val activeContext: AtomicReference[SparkContext] =
 new AtomicReference[SparkContext](null)
 
   /**
@@ -1944,11 +1956,11 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This function may be used to get or instantiate a SparkContext and 
register it as a 
-   * singleton object. Because we can only have one active SparkContext per 
JVM, 
-   * this is useful when applications may wish to share a SparkContext. 
+   * This function may be used to get or instantiate a SparkContext and 
register it as a
+   * singleton object. Because we can only have one active SparkContext per 
JVM,
+   * this is useful when applications may wish to share a SparkContext.
*
-   * Note: This function cannot be used to create multiple SparkContext 
instances 
+   * Note: This function cannot be used to create multiple SparkContext 
instances
* even if multiple contexts are allowed.
*/
   def getOrCreate(config: SparkConf): SparkContext = {
@@ -1961,17 +1973,17 @@ object SparkContext extends Logging {
   activeContext.get()
 }
   }
-  
+
   /**
-   * This function may be used to get or instantiate a SparkContext and 
register it as a 
-   * singleton object. Because we can only have one active SparkContext per 
JVM, 
+   * This function may be used to get 

spark git commit: [SPARK-6991] [SPARKR] Adds support for zipPartitions.

2015-04-27 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master ef82bddc1 -> ca9f4ebb8


[SPARK-6991] [SPARKR] Adds support for zipPartitions.

Author: hlin09 hlin0...@gmail.com

Closes #5568 from hlin09/zipPartitions and squashes the following commits:

12c08a5 [hlin09] Fix comments
d2d32db [hlin09] Merge branch 'master' into zipPartitions
ec56d2f [hlin09] Fix test.
27655d3 [hlin09] Adds support for zipPartitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9f4ebb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9f4ebb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9f4ebb

Branch: refs/heads/master
Commit: ca9f4ebb8e510e521bf4df0331375ddb385fb9d2
Parents: ef82bdd
Author: hlin09 hlin0...@gmail.com
Authored: Mon Apr 27 15:04:37 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Apr 27 15:04:37 2015 -0700

--
 R/pkg/NAMESPACE |  1 +
 R/pkg/R/RDD.R   | 46 
 R/pkg/R/generics.R  |  5 +++
 R/pkg/inst/tests/test_binary_function.R | 33 
 4 files changed, 85 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8028364..e077eac 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -71,6 +71,7 @@ exportMethods(
   "unpersist",
   "value",
   "values",
+  "zipPartitions",
   "zipRDD",
   "zipWithIndex",
   "zipWithUniqueId"

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/RDD.R
--
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index f90c26b..a3a0421 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1595,3 +1595,49 @@ setMethod("intersection",
 
     keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
   })
+
+#' Zips an RDD's partitions with one (or more) RDD(s).
+#' Same as zipPartitions in Spark.
+#'
+#' @param ... RDDs to be zipped.
+#' @param func A function to transform zipped partitions.
+#' @return A new RDD by applying a function to the zipped partitions.
+#'         Assumes that all the RDDs have the *same number of partitions*, but
+#'         does *not* require them to have the same number of elements in each partition.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+#' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+#' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+#' collect(zipPartitions(rdd1, rdd2, rdd3,
+#'                       func = function(x, y, z) { list(list(x, y, z))} ))
+#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
+#'}
+#' @rdname zipRDD
+#' @aliases zipPartitions,RDD
+setMethod("zipPartitions",
+          "RDD",
+          function(..., func) {
+            rrdds <- list(...)
+            if (length(rrdds) == 1) {
+              return(rrdds[[1]])
+            }
+            nPart <- sapply(rrdds, numPartitions)
+            if (length(unique(nPart)) != 1) {
+              stop("Can only zipPartitions RDDs which have the same number of partitions.")
+            }
+
+            rrdds <- lapply(rrdds, function(rdd) {
+              mapPartitionsWithIndex(rdd, function(partIndex, part) {
+                print(length(part))
+                list(list(partIndex, part))
+              })
+            })
+            union.rdd <- Reduce(unionRDD, rrdds)
+            zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
+            res <- mapPartitions(zipped.rdd, function(plist) {
+              do.call(func, plist[[1]])
+            })
+            res
+          })
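
For comparison, a minimal sketch of the Scala-side zipPartitions this mirrors, using the same partition layout as the roxygen example above (assumes an existing SparkContext `sc`):

```scala
// Sketch of the equivalent Scala API for the example in the docs above.
val rdd1 = sc.parallelize(1 to 2, 2)   // partitions: [1], [2]
val rdd2 = sc.parallelize(1 to 4, 2)   // partitions: [1, 2], [3, 4]
val zipped = rdd1.zipPartitions(rdd2) { (it1, it2) =>
  Iterator((it1.toList, it2.toList))
}
zipped.collect()   // Array((List(1), List(1, 2)), (List(2), List(3, 4)))
```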

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 34dbe84..e887293 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
 #' @export
 setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
 
+#' @rdname zipRDD
+#' @export
+setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
+           signature = "...")
+
 #' @rdname zipWithIndex
 #' @seealso zipWithUniqueId
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9f4ebb/R/pkg/inst/tests/test_binary_function.R
--
diff --git a/R/pkg/inst/tests/test_binary_function.R 

spark git commit: [SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ca9f4ebb8 -> b9de9e040


[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

Added a check to the SparkContext.union method to check that a partitioner is 
defined on all RDDs when instantiating a PartitionerAwareUnionRDD.

Author: Steven She ste...@canopylabs.com

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at 
least one RDD has no partitioner


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9de9e04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9de9e04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9de9e04

Branch: refs/heads/master
Commit: b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56
Parents: ca9f4eb
Author: Steven She ste...@canopylabs.com
Authored: Mon Apr 27 18:55:02 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 18:55:02 2015 -0400

--
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala|  1 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 21 
 3 files changed, 23 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 86269ea..ea4ddcc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1055,7 +1055,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
 val partitioners = rdds.flatMap(_.partitioner).toSet
-if (partitioners.size == 1) {
+if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
   new PartitionerAwareUnionRDD(this, rdds)
 } else {
   new UnionRDD(this, rdds)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641..7598ff6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
 var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9de9e04/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index df42faa..ef8c36a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 
2, 3, 4))
   }
 
+  test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    intercept[IllegalArgumentException] {
+      new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, 

spark git commit: [SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread

2015-04-27 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4d9e560b5 -> 874a2ca93


[SPARK-7174][Core] Move calling `TaskScheduler.executorHeartbeatReceived` to 
another thread

`HeartbeatReceiver` will call `TaskScheduler.executorHeartbeatReceived`, which 
is a blocking operation because `TaskScheduler.executorHeartbeatReceived` will 
call

```Scala
blockManagerMaster.driverEndpoint.askWithReply[Boolean](
  BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

finally. Even though it asks a local actor, it may block the current Akka thread: the reply may be dispatched to the same thread that issued the ask, so the reply can never be processed. An extreme case is setting the Akka dispatch thread pool size to 1.

jstack log:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x7f2a8c02d000 nid=0x725 waiting on condition [0x7f2b1d6d]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  0x0006197a0868 (a 
scala.concurrent.impl.Promise$CompletionLatch)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
at 
scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at 
akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
at 
org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
at 
org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moves this blocking operation to a separate thread.
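
A general sketch of that pattern (names assumed, not the actual HeartbeatReceiver code): run the blocking call on a dedicated executor so the dispatcher thread that received the message is never parked waiting on its own reply.

```scala
// Sketch: offload blocking work from the message-handling thread onto a single-thread
// pool so the dispatcher stays free to deliver the eventual reply.
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val heartbeatPool = Executors.newSingleThreadExecutor()
implicit val heartbeatEc: ExecutionContext = ExecutionContext.fromExecutor(heartbeatPool)

def onExecutorHeartbeat(executorId: String): Unit = {
  Future {
    // blocking call (e.g. the scheduler's executorHeartbeatReceived) runs off the dispatcher
  }
}
```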

Author: zsxwing zsxw...@gmail.com

Closes #5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting 
executorHeartbeatReceived
5b3b545 [zsxwing] Move calling 

spark git commit: [SPARK-7090] [MLLIB] Introduce LDAOptimizer to LDA to further improve extensibility

2015-04-27 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 62888a4de -> 4d9e560b5


[SPARK-7090] [MLLIB] Introduce LDAOptimizer to LDA to further improve 
extensibility

jira: https://issues.apache.org/jira/browse/SPARK-7090

LDA was implemented with extensibility in mind. And with the development of 
OnlineLDA and Gibbs Sampling, we are collecting more detailed requirements from 
different algorithms.
As Joseph Bradley jkbradley proposed in 
https://github.com/apache/spark/pull/4807 and with some further discussion, 
we'd like to adjust the code structure a little to present the common interface 
and extension point clearly.
Basically class LDA would be a common entrance for LDA computing. And each LDA 
object will refer to a LDAOptimizer for the concrete algorithm implementation. 
Users can customize LDAOptimizer with specific parameters and assign it to LDA.

Concrete changes:

1. Add a trait `LDAOptimizer`, which defines the common interface for concrete implementations. Each subclass is a wrapper for a specific LDA algorithm.

2. Move EMOptimizer to the LDAOptimizer file and have it inherit from LDAOptimizer; rename it to EMLDAOptimizer (in case a more generic EMOptimizer comes in the future).
    - adjust the constructor of EMOptimizer, since all the parameters should be passed in through the initialState method. This avoids unwanted confusion or overwriting.
    - move the code from LDA.initialState to initialState of EMLDAOptimizer

3. Add property ldaOptimizer to LDA and its getter/setter, and EMLDAOptimizer 
is the default Optimizer.

4. Change the return type of LDA.run from DistributedLDAModel to LDAModel.

Further work:
add OnlineLDAOptimizer and other possible Optimizers once ready.
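
A usage-level sketch of change 4 (matching the Java example edit below; the tiny corpus here is made up purely for illustration and assumes an existing SparkContext `sc`):

```scala
// Sketch: LDA.run now returns the base LDAModel, so callers that need the
// distributed model cast explicitly.
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA, LDAModel}
import org.apache.spark.mllib.linalg.Vectors

val corpus = sc.parallelize(Seq(   // (docId, termCounts) pairs
  (0L, Vectors.dense(1.0, 2.0, 0.0)),
  (1L, Vectors.dense(0.0, 1.0, 3.0))))

val model: LDAModel = new LDA().setK(2).run(corpus)
val distributedModel = model.asInstanceOf[DistributedLDAModel]
```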

Author: Yuhao Yang hhb...@gmail.com

Closes #5661 from hhbyyh/ldaRefactor and squashes the following commits:

0e2e006 [Yuhao Yang] respond to review comments
08a45da [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaRefactor
e756ce4 [Yuhao Yang] solve mima exception
d74fd8f [Yuhao Yang] Merge remote-tracking branch 'upstream/master' into 
ldaRefactor
0bb8400 [Yuhao Yang] refactor LDA with Optimizer
ec2f857 [Yuhao Yang] protoptype for discussion


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d9e560b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d9e560b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d9e560b

Branch: refs/heads/master
Commit: 4d9e560b5470029143926827b1cb9d72a0bfbeff
Parents: 62888a4
Author: Yuhao Yang hhb...@gmail.com
Authored: Mon Apr 27 19:02:51 2015 -0700
Committer: Joseph K. Bradley jos...@databricks.com
Committed: Mon Apr 27 19:02:51 2015 -0700

--
 .../spark/examples/mllib/JavaLDAExample.java|   2 +-
 .../spark/examples/mllib/LDAExample.scala   |   4 +-
 .../org/apache/spark/mllib/clustering/LDA.scala | 181 
 .../spark/mllib/clustering/LDAModel.scala   |   2 +-
 .../spark/mllib/clustering/LDAOptimizer.scala   | 210 +++
 .../spark/mllib/clustering/JavaLDASuite.java|   2 +-
 .../spark/mllib/clustering/LDASuite.scala   |   2 +-
 project/MimaExcludes.scala  |   4 +
 8 files changed, 256 insertions(+), 151 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d9e560b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
index 36207ae..fd53c81 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLDAExample.java
@@ -58,7 +58,7 @@ public class JavaLDAExample {
 corpus.cache();
 
 // Cluster the documents into three topics using LDA
-DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus);
+DistributedLDAModel ldaModel = (DistributedLDAModel)new LDA().setK(3).run(corpus);
 
 // Output topics. Each is a distribution over words (matching word count 
vectors)
 System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()

http://git-wip-us.apache.org/repos/asf/spark/blob/4d9e560b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala 
b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 08a9359..a185039 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -26,7 

spark git commit: [SPARK-7145] [CORE] commons-lang (2.x) classes used instead of commons-lang3 (3.x); commons-io used without dependency

2015-04-27 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 5d45e1f60 -> ab5adb7a9


[SPARK-7145] [CORE] commons-lang (2.x) classes used instead of commons-lang3 
(3.x); commons-io used without dependency

Remove use of commons-lang in favor of commons-lang3 classes; remove commons-io 
use in favor of Guava
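
A small sketch of the replacements in practice (the temp file is just for illustration):

```scala
// Sketch: commons-lang3 and Guava stand-ins for the removed commons-lang / commons-io calls.
import java.io.{File, FileInputStream}
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.lang3.RandomUtils

val bytes = RandomUtils.nextBytes(100)                 // commons-lang3, not commons-lang 2.x
val tmp = File.createTempFile("lang3-demo", ".bin")
Files.write(bytes, tmp)                                // Guava replaces FileUtils.writeByteArrayToFile
val back = ByteStreams.toByteArray(new FileInputStream(tmp))   // replaces IOUtils.toByteArray
```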

Author: Sean Owen so...@cloudera.com

Closes #5703 from srowen/SPARK-7145 and squashes the following commits:

21fbe03 [Sean Owen] Remove use of commons-lang in favor of commons-lang3 
classes; remove commons-io use in favor of Guava


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab5adb7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab5adb7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab5adb7a

Branch: refs/heads/master
Commit: ab5adb7a973eec9d95c7575c864cba9f8d83a0fd
Parents: 5d45e1f
Author: Sean Owen so...@cloudera.com
Authored: Mon Apr 27 19:50:55 2015 -0400
Committer: Sean Owen so...@cloudera.com
Committed: Mon Apr 27 19:50:55 2015 -0400

--
 .../src/test/scala/org/apache/spark/FileServerSuite.scala |  7 +++
 .../apache/spark/metrics/InputOutputMetricsSuite.scala|  4 ++--
 .../network/netty/NettyBlockTransferSecuritySuite.scala   | 10 +++---
 external/flume-sink/pom.xml   |  4 
 .../streaming/flume/sink/SparkAvroCallbackHandler.scala   |  4 ++--
 .../main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala|  6 +-
 .../sql/hive/thriftserver/AbstractSparkSQLDriver.scala|  4 ++--
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala   |  8 +++-
 .../apache/spark/sql/hive/execution/UDFListString.java|  6 +++---
 .../apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  9 -
 10 files changed, 35 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab5adb7a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala 
b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index a69e9b7..c0439f9 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -22,8 +22,7 @@ import java.net.URI
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl.SSLException
 
-import com.google.common.io.ByteStreams
-import org.apache.commons.io.{FileUtils, IOUtils}
+import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.lang3.RandomUtils
 import org.scalatest.FunSuite
 
@@ -239,7 +238,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): 
Unit = {
 val randomContent = RandomUtils.nextBytes(100)
-val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
-FileUtils.writeByteArrayToFile(file, randomContent)
+Files.write(randomContent, file)
 server.addFile(file)
 
 val uri = new URI(server.serverUri + "/files/" + file.getName)
@@ -254,7 +253,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   Utils.setupSecureURLConnection(connection, sm)
 }
 
-val buf = IOUtils.toByteArray(connection.getInputStream)
+val buf = ByteStreams.toByteArray(connection.getInputStream)
 assert(buf === randomContent)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ab5adb7a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index 190b08d..ef3e213 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileWriter, PrintWriter}
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.commons.lang.math.RandomUtils
+import org.apache.commons.lang3.RandomUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
@@ -60,7 +60,7 @@ class InputOutputMetricsSuite extends FunSuite with 
SharedSparkContext
 tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt")
 val pw = new PrintWriter(new FileWriter(tmpFile))
 for (x <- 1 to numRecords) {
-  pw.println(RandomUtils.nextInt(numBuckets))
+  pw.println(RandomUtils.nextInt(0, numBuckets))
 }
 pw.close()
 


spark git commit: [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

2015-04-27 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 82bb7fd41 -> 998aac21f


[SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

turned on hive-thriftserver profile in release script

Author: Misha Chernetsov chernet...@gmail.com

Closes #5429 from chernetsov/master and squashes the following commits:

9cc36af [Misha Chernetsov] [SPARK-4925] Publish Spark SQL hive-thriftserver 
maven artifact turned on hive-thriftserver profile in release script for scala 
2.10


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/998aac21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/998aac21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/998aac21

Branch: refs/heads/master
Commit: 998aac21f0a0588a70f8cf123ae4080163c612fb
Parents: 82bb7fd
Author: Misha Chernetsov chernet...@gmail.com
Authored: Mon Apr 27 11:27:56 2015 -0700
Committer: Patrick Wendell patr...@databricks.com
Committed: Mon Apr 27 11:27:56 2015 -0700

--
 dev/create-release/create-release.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/998aac21/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index b5a67dd..3dbb35f 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -119,7 +119,7 @@ if [[ ! "$@" =~ --skip-publish ]]; then
   rm -rf $SPARK_REPO
 
   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
--Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
+-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl \
 clean install
 
   ./dev/change-version-to-2.11.sh


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

2015-04-27 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 c4470b93f -> 81de30ae5


[SPARK-4925] Publish Spark SQL hive-thriftserver maven artifact

turned on hive-thriftserver profile in release script

Author: Misha Chernetsov chernet...@gmail.com

Closes #5429 from chernetsov/master and squashes the following commits:

9cc36af [Misha Chernetsov] [SPARK-4925] Publish Spark SQL hive-thriftserver 
maven artifact turned on hive-thriftserver profile in release script for scala 
2.10

(cherry picked from commit 998aac21f0a0588a70f8cf123ae4080163c612fb)
Signed-off-by: Patrick Wendell patr...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81de30ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81de30ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81de30ae

Branch: refs/heads/branch-1.3
Commit: 81de30ae51d3858c30f748994e9249700847fcde
Parents: c4470b9
Author: Misha Chernetsov chernet...@gmail.com
Authored: Mon Apr 27 11:27:56 2015 -0700
Committer: Patrick Wendell patr...@databricks.com
Committed: Mon Apr 27 11:28:07 2015 -0700

--
 dev/create-release/create-release.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81de30ae/dev/create-release/create-release.sh
--
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index 6f87fcd..0403594 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -117,7 +117,7 @@ if [[ ! "$@" =~ --skip-publish ]]; then
   echo "Created Nexus staging repository: $staged_repo_id"
 
   build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
--Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
+-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl \
 clean install
 
   ./dev/change-version-to-2.11.sh


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org