[1/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

2015-02-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 f8f9a64eb -> 2bd33ce62


http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 245161d..cb405f5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest {
 |   getStruct(1).f5 FROM src LIMIT 1
   """.stripMargin).head() === Row(1, 2, 3, 4, 5))
   }
-  
+
   test("SPARK-4785 When called with arguments referring column fields, PMOD 
throws NPE") {
 checkAnswer(
   sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"),
@@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest {
   test("SPARK-2693 udaf aggregates test") {
 checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"),
   sql("SELECT max(key) FROM src").collect().toSeq)
-  
+
 checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"),
   sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq)
   }
@@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest {
   test("Generic UDAF aggregates") {
 checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.9)) FROM src 
LIMIT 1"),
   sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)
-  
+
 checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src 
LIMIT 1"),
   sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq)
}
-  
+
   test("UDFIntegerToString") {
 val testData = TestHive.sparkContext.parallelize(
-  IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF
+  IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
 testData.registerTempTable("integerTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS 
'${classOf[UDFIntegerToString].getName}'")
@@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest {
 val testData = TestHive.sparkContext.parallelize(
   ListListIntCaseClass(Nil) ::
   ListListIntCaseClass(Seq((1, 2, 3))) ::
-  ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF
+  ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
 testData.registerTempTable("listListIntTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS 
'${classOf[UDFListListInt].getName}'")
@@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest {
   test("UDFListString") {
 val testData = TestHive.sparkContext.parallelize(
   ListStringCaseClass(Seq("a", "b", "c")) ::
-  ListStringCaseClass(Seq("d", "e")) :: Nil).toDF
+  ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
 testData.registerTempTable("listStringTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS 
'${classOf[UDFListString].getName}'")
@@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest {
 
   test("UDFStringString") {
 val testData = TestHive.sparkContext.parallelize(
-  StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF
+  StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
 testData.registerTempTable("stringTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS 
'${classOf[UDFStringString].getName}'")
@@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest {
   ListListIntCaseClass(Nil) ::
   ListListIntCaseClass(Seq((1, 2, 3))) ::
   ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
-  Nil).toDF
+  Nil).toDF()
 testData.registerTempTable("TwoListTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS 
'${classOf[UDFTwoListList].getName}'")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[1/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

2015-02-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 3912d3324 -> 61ab08549


http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index 245161d..cb405f5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest {
 |   getStruct(1).f5 FROM src LIMIT 1
   """.stripMargin).head() === Row(1, 2, 3, 4, 5))
   }
-  
+
   test("SPARK-4785 When called with arguments referring column fields, PMOD 
throws NPE") {
 checkAnswer(
   sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"),
@@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest {
   test("SPARK-2693 udaf aggregates test") {
 checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"),
   sql("SELECT max(key) FROM src").collect().toSeq)
-  
+
 checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"),
   sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq)
   }
@@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest {
   test("Generic UDAF aggregates") {
 checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.9)) FROM src 
LIMIT 1"),
   sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq)
-  
+
 checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src 
LIMIT 1"),
   sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq)
}
-  
+
   test("UDFIntegerToString") {
 val testData = TestHive.sparkContext.parallelize(
-  IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF
+  IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
 testData.registerTempTable("integerTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS 
'${classOf[UDFIntegerToString].getName}'")
@@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest {
 val testData = TestHive.sparkContext.parallelize(
   ListListIntCaseClass(Nil) ::
   ListListIntCaseClass(Seq((1, 2, 3))) ::
-  ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF
+  ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF()
 testData.registerTempTable("listListIntTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS 
'${classOf[UDFListListInt].getName}'")
@@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest {
   test("UDFListString") {
 val testData = TestHive.sparkContext.parallelize(
   ListStringCaseClass(Seq("a", "b", "c")) ::
-  ListStringCaseClass(Seq("d", "e")) :: Nil).toDF
+  ListStringCaseClass(Seq("d", "e")) :: Nil).toDF()
 testData.registerTempTable("listStringTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS 
'${classOf[UDFListString].getName}'")
@@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest {
 
   test("UDFStringString") {
 val testData = TestHive.sparkContext.parallelize(
-  StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF
+  StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF()
 testData.registerTempTable("stringTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS 
'${classOf[UDFStringString].getName}'")
@@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest {
   ListListIntCaseClass(Nil) ::
   ListListIntCaseClass(Seq((1, 2, 3))) ::
   ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) ::
-  Nil).toDF
+  Nil).toDF()
 testData.registerTempTable("TwoListTable")
 
 sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS 
'${classOf[UDFTwoListList].getName}'")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[2/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

2015-02-17 Thread rxin
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

Although we've migrated to the DataFrame API, lots of code still uses `rdd` or 
`srdd` as local variable names. This PR tries to address these naming 
inconsistencies and some other minor DataFrame related style issues.


[Review on Reviewable](https://reviewable.io/reviews/apache/spark/4670)


Author: Cheng Lian 

Closes #4670 from liancheng/df-cleanup and squashes the following commits:

3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls

(cherry picked from commit 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bd33ce6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bd33ce6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bd33ce6

Branch: refs/heads/branch-1.3
Commit: 2bd33ce62b4c43be4dbac6b46f979c36a0e9aba8
Parents: f8f9a64
Author: Cheng Lian 
Authored: Tue Feb 17 23:36:20 2015 -0800
Committer: Reynold Xin 
Committed: Tue Feb 17 23:36:30 2015 -0800

--
 .../examples/ml/CrossValidatorExample.scala |  2 +-
 .../spark/examples/ml/DeveloperApiExample.scala |  4 +-
 .../apache/spark/examples/ml/MovieLensALS.scala |  6 +--
 .../spark/examples/ml/SimpleParamsExample.scala |  6 +--
 .../ml/SimpleTextClassificationPipeline.scala   |  4 +-
 .../spark/examples/mllib/DatasetExample.scala   |  2 +-
 .../apache/spark/examples/sql/RDDRelation.scala |  2 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |  2 +-
 .../spark/mllib/classification/NaiveBayes.scala |  2 +-
 .../impl/GLMClassificationModel.scala   |  2 +-
 .../regression/impl/GLMRegressionModel.scala|  2 +-
 .../mllib/tree/model/DecisionTreeModel.scala|  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../spark/ml/recommendation/ALSSuite.scala  |  4 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  4 +-
 .../apache/spark/sql/parquet/ParquetTest.scala  |  6 +--
 .../org/apache/spark/sql/CachedTableSuite.scala | 14 +++---
 .../org/apache/spark/sql/DataFrameSuite.scala   | 26 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  8 +--
 .../scala/org/apache/spark/sql/QueryTest.scala  | 46 -
 .../org/apache/spark/sql/SQLQuerySuite.scala|  6 +--
 .../sql/ScalaReflectionRelationSuite.scala  | 10 ++--
 .../scala/org/apache/spark/sql/TestData.scala   | 48 ++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 46 -
 .../spark/sql/jdbc/MySQLIntegration.scala   | 53 +---
 .../spark/sql/jdbc/PostgresIntegration.scala| 30 ++-
 .../spark/sql/parquet/ParquetFilterSuite.scala  | 40 +++
 .../spark/sql/parquet/ParquetIOSuite.scala  | 28 +--
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  2 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  4 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala |  4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala|  8 +--
 .../apache/spark/sql/hive/StatisticsSuite.scala | 38 +++---
 .../sql/hive/execution/HiveQuerySuite.scala | 20 
 .../hive/execution/HiveResolutionSuite.scala|  6 +--
 .../spark/sql/hive/execution/HiveUdfSuite.scala | 18 +++
 37 files changed, 250 insertions(+), 259 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
index f024194..7ab892c 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
@@ -90,7 +90,7 @@ object CrossValidatorExample {
 crossval.setNumFolds(2) // Use 3+ in practice
 
 // Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training.toDF)
+val cvModel = crossval.fit(training.toDF())
 
 // Prepare test documents, which are unlabeled.
 val test = sc.parallelize(Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index 54aadd2..df26798 100644
--- 

[2/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

2015-02-17 Thread rxin
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls

Although we've migrated to the DataFrame API, lots of code still uses `rdd` or 
`srdd` as local variable names. This PR tries to address these naming 
inconsistencies and some other minor DataFrame related style issues.


[Review on Reviewable](https://reviewable.io/reviews/apache/spark/4670)


Author: Cheng Lian 

Closes #4670 from liancheng/df-cleanup and squashes the following commits:

3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61ab0854
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61ab0854
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61ab0854

Branch: refs/heads/master
Commit: 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa
Parents: 3912d33
Author: Cheng Lian 
Authored: Tue Feb 17 23:36:20 2015 -0800
Committer: Reynold Xin 
Committed: Tue Feb 17 23:36:20 2015 -0800

--
 .../examples/ml/CrossValidatorExample.scala |  2 +-
 .../spark/examples/ml/DeveloperApiExample.scala |  4 +-
 .../apache/spark/examples/ml/MovieLensALS.scala |  6 +--
 .../spark/examples/ml/SimpleParamsExample.scala |  6 +--
 .../ml/SimpleTextClassificationPipeline.scala   |  4 +-
 .../spark/examples/mllib/DatasetExample.scala   |  2 +-
 .../apache/spark/examples/sql/RDDRelation.scala |  2 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |  2 +-
 .../spark/mllib/classification/NaiveBayes.scala |  2 +-
 .../impl/GLMClassificationModel.scala   |  2 +-
 .../regression/impl/GLMRegressionModel.scala|  2 +-
 .../mllib/tree/model/DecisionTreeModel.scala|  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../spark/ml/recommendation/ALSSuite.scala  |  4 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  4 +-
 .../apache/spark/sql/parquet/ParquetTest.scala  |  6 +--
 .../org/apache/spark/sql/CachedTableSuite.scala | 14 +++---
 .../org/apache/spark/sql/DataFrameSuite.scala   | 26 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  8 +--
 .../scala/org/apache/spark/sql/QueryTest.scala  | 46 -
 .../org/apache/spark/sql/SQLQuerySuite.scala|  6 +--
 .../sql/ScalaReflectionRelationSuite.scala  | 10 ++--
 .../scala/org/apache/spark/sql/TestData.scala   | 48 ++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 46 -
 .../spark/sql/jdbc/MySQLIntegration.scala   | 53 +---
 .../spark/sql/jdbc/PostgresIntegration.scala| 30 ++-
 .../spark/sql/parquet/ParquetFilterSuite.scala  | 40 +++
 .../spark/sql/parquet/ParquetIOSuite.scala  | 28 +--
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  2 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  4 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala |  4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala|  8 +--
 .../apache/spark/sql/hive/StatisticsSuite.scala | 38 +++---
 .../sql/hive/execution/HiveQuerySuite.scala | 20 
 .../hive/execution/HiveResolutionSuite.scala|  6 +--
 .../spark/sql/hive/execution/HiveUdfSuite.scala | 18 +++
 37 files changed, 250 insertions(+), 259 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
index f024194..7ab892c 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
@@ -90,7 +90,7 @@ object CrossValidatorExample {
 crossval.setNumFolds(2) // Use 3+ in practice
 
 // Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training.toDF)
+val cvModel = crossval.fit(training.toDF())
 
 // Prepare test documents, which are unlabeled.
 val test = sc.parallelize(Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index 54aadd2..df26798 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
+++ 
b/examples/src/

spark git commit: [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

2015-02-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 6e82c46bf -> f8f9a64eb


[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

The test was incorrect. Instead of counting the number of records, it counted 
the number of partitions of the RDD generated by the DStream, which was not its 
intention. I will be testing this patch multiple times to understand its 
flakiness.
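
As a hypothetical illustration of the difference (the numbers and names below are made
up, not taken from the suite): each collected element corresponds to one offset range,
i.e. one RDD partition, so taking the size of that collection counts partitions rather
than records.

    // one (partitionSize, offsetRangeSize) pair per Kafka partition
    val collected = Seq((7, 7), (9, 9), (7, 7))

    val oldTotal = collected.size            // 3  -> number of partitions (what the old test added up)
    val newTotal = collected.map(_._1).sum   // 23 -> number of records actually received
    assert(oldTotal != newTotal)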

PS: This was caused by my refactoring in 
https://github.com/apache/spark/pull/4384/

koeninger check it out.

Author: Tathagata Das 

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test

(cherry picked from commit 3912d332464dcd124c60b734724c34d9742466a4)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8f9a64e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8f9a64e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8f9a64e

Branch: refs/heads/branch-1.3
Commit: f8f9a64ebdfe581d62773a6276f66c75d4ba43e1
Parents: 6e82c46
Author: Tathagata Das 
Authored: Tue Feb 17 22:44:16 2015 -0800
Committer: Tathagata Das 
Committed: Tue Feb 17 22:44:27 2015 -0800

--
 .../kafka/DirectKafkaStreamSuite.scala  | 28 +++-
 1 file changed, 16 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f8f9a64e/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 9260944..17ca9d1 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting 
offset") {
+  test("basic stream receiving with multiple topics and smallest starting 
offset") {
 val topics = Set("basic1", "basic2", "basic3")
 val data = Map("a" -> 7, "b" -> 9)
 topics.foreach { t =>
   createTopic(t)
   sendMessages(t, data)
 }
+val totalSent = data.values.sum * topics.size
 val kafkaParams = Map(
   "metadata.broker.list" -> s"$brokerAddress",
   "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
 ssc, kafkaParams, topics)
 }
-var total = 0L
+
+val allReceived = new ArrayBuffer[(String, String)]
 
 stream.foreachRDD { rdd =>
 // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   collected.foreach { case (partSize, rangeSize) =>
 assert(partSize === rangeSize, "offset ranges are wrong")
   }
-  total += collected.size  // Add up all the collected items
 }
+stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
 ssc.start()
 eventually(timeout(2.milliseconds), interval(200.milliseconds)) {
-  assert(total === data.values.sum * topics.size, "didn't get all 
messages")
+  assert(allReceived.size === totalSent,
+"didn't get expected number of messages, messages:\n" + 
allReceived.mkString("\n"))
 }
 ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {

spark git commit: [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

2015-02-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e50934f11 -> 3912d3324


[SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite

The test was incorrect. Instead of counting the number of records, it counted 
the number of partitions of the RDD generated by the DStream, which was not its 
intention. I will be testing this patch multiple times to understand its 
flakiness.

PS: This was caused by my refactoring in 
https://github.com/apache/spark/pull/4384/

koeninger check it out.

Author: Tathagata Das 

Closes #4597 from tdas/kafka-flaky-test and squashes the following commits:

d236235 [Tathagata Das] Unignored last test.
e9a1820 [Tathagata Das] fix test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3912d332
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3912d332
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3912d332

Branch: refs/heads/master
Commit: 3912d332464dcd124c60b734724c34d9742466a4
Parents: e50934f
Author: Tathagata Das 
Authored: Tue Feb 17 22:44:16 2015 -0800
Committer: Tathagata Das 
Committed: Tue Feb 17 22:44:16 2015 -0800

--
 .../kafka/DirectKafkaStreamSuite.scala  | 28 +++-
 1 file changed, 16 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3912d332/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 9260944..17ca9d1 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting 
offset") {
+  test("basic stream receiving with multiple topics and smallest starting 
offset") {
 val topics = Set("basic1", "basic2", "basic3")
 val data = Map("a" -> 7, "b" -> 9)
 topics.foreach { t =>
   createTopic(t)
   sendMessages(t, data)
 }
+val totalSent = data.values.sum * topics.size
 val kafkaParams = Map(
   "metadata.broker.list" -> s"$brokerAddress",
   "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   KafkaUtils.createDirectStream[String, String, StringDecoder, 
StringDecoder](
 ssc, kafkaParams, topics)
 }
-var total = 0L
+
+val allReceived = new ArrayBuffer[(String, String)]
 
 stream.foreachRDD { rdd =>
 // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   collected.foreach { case (partSize, rangeSize) =>
 assert(partSize === rangeSize, "offset ranges are wrong")
   }
-  total += collected.size  // Add up all the collected items
 }
+stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
 ssc.start()
 eventually(timeout(2.milliseconds), interval(200.milliseconds)) {
-  assert(total === data.values.sum * topics.size, "didn't get all 
messages")
+  assert(allReceived.size === totalSent,
+"didn't get expected number of messages, messages:\n" + 
allReceived.mkString("\n"))
 }
 ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
 val topic = "largest"
 val topicPartition = Top

spark git commit: [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master d5f12bfe8 -> e50934f11


[SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.

JIRA: https://issues.apache.org/jira/browse/SPARK-5723

Author: Yin Huai 

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust 

Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following 
commits:

a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
defaultCTASFileFormat
ad2b07d [Yin Huai] Update tests and error messages.
8af5b2a [Yin Huai] Update conf key and unit test.
5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when 
no storage format/handler is specified.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e50934f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e50934f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e50934f1

Branch: refs/heads/master
Commit: e50934f11e1e3ded21a631e5ab69db3c79467137
Parents: d5f12bf
Author: Yin Huai 
Authored: Tue Feb 17 18:14:33 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 18:14:33 2015 -0800

--
 .../org/apache/spark/sql/hive/HiveContext.scala | 15 
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 75 
 .../spark/sql/hive/execution/commands.scala | 17 +++--
 .../sql/hive/MetastoreDataSourcesSuite.scala|  6 +-
 .../sql/hive/execution/SQLQuerySuite.scala  | 70 ++
 5 files changed, 158 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6c55bc6..d3365b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertMetastoreParquet: Boolean =
 getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
+  /**
+   * When true, a table created by a Hive CTAS statement (no USING clause) 
will be
+   * converted to a data source table, using the data source set by 
spark.sql.sources.default.
+   * The table in CTAS statement will be converted when it meets any of the 
following conditions:
+   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File 
Format (STORED AS), or
+   * a Storage Handler (STORED BY), and the value of 
hive.default.fileformat in hive-site.xml
+   * is either TextFile or SequenceFile.
+   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the 
file format and no SerDe
+   * is specified (no ROW FORMAT SERDE clause).
+   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as 
the file format
+   * and no SerDe is specified (no ROW FORMAT SERDE clause).
+   */
+  protected[sql] def convertCTAS: Boolean =
+getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+
   override protected[sql] def executePlan(plan: LogicalPlan): 
this.QueryExecution =
 new this.QueryExecution(plan)
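
A hypothetical usage sketch of the new spark.sql.hive.convertCTAS flag documented in
the hunk above; the SparkContext variable, table name, and query are placeholders, and
only the conf key comes from the patch:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // Opt in: a plain CTAS (no ROW FORMAT SERDE / STORED AS / STORED BY clause) is
    // written through the data source path using spark.sql.sources.default (Parquet)
    // instead of Hive's default TextFile/SequenceFile formats.
    hiveContext.setConf("spark.sql.hive.convertCTAS", "true")

    hiveContext.sql("CREATE TABLE ctas_example AS SELECT key, value FROM src")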
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cfd6f27..f7ad2ef 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, 
SerDeException}
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, 
OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => 
ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, 
ResolvedData

spark git commit: [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 2ab0ba04f -> 6e82c46bf


[SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.

JIRA: https://issues.apache.org/jira/browse/SPARK-5723

Author: Yin Huai 

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust 

Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following 
commits:

a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into 
defaultCTASFileFormat
ad2b07d [Yin Huai] Update tests and error messages.
8af5b2a [Yin Huai] Update conf key and unit test.
5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when 
no storage format/handler is specified.

(cherry picked from commit e50934f11e1e3ded21a631e5ab69db3c79467137)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e82c46b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e82c46b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e82c46b

Branch: refs/heads/branch-1.3
Commit: 6e82c46bf6cdfd227b5680f114e029219204e641
Parents: 2ab0ba0
Author: Yin Huai 
Authored: Tue Feb 17 18:14:33 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 18:14:45 2015 -0800

--
 .../org/apache/spark/sql/hive/HiveContext.scala | 15 
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 75 
 .../spark/sql/hive/execution/commands.scala | 17 +++--
 .../sql/hive/MetastoreDataSourcesSuite.scala|  6 +-
 .../sql/hive/execution/SQLQuerySuite.scala  | 70 ++
 5 files changed, 158 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e82c46b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6c55bc6..d3365b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertMetastoreParquet: Boolean =
 getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
 
+  /**
+   * When true, a table created by a Hive CTAS statement (no USING clause) 
will be
+   * converted to a data source table, using the data source set by 
spark.sql.sources.default.
+   * The table in CTAS statement will be converted when it meets any of the 
following conditions:
+   *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File 
Format (STORED AS), or
+   * a Storage Handler (STORED BY), and the value of 
hive.default.fileformat in hive-site.xml
+   * is either TextFile or SequenceFile.
+   *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the 
file format and no SerDe
+   * is specified (no ROW FORMAT SERDE clause).
+   *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as 
the file format
+   * and no SerDe is specified (no ROW FORMAT SERDE clause).
+   */
+  protected[sql] def convertCTAS: Boolean =
+getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+
   override protected[sql] def executePlan(plan: LogicalPlan): 
this.QueryExecution =
 new this.QueryExecution(plan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6e82c46b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cfd6f27..f7ad2ef 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, 
SerDeException}
 import org.apache.hadoop.util.ReflectionUtils
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, 
OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => 
P

Git Push Summary

2015-02-17 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.3.0-rc1 [created] f97b0d4a6

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[2/2] spark git commit: Preparing Spark release v1.3.0-rc1

2015-02-17 Thread pwendell
Preparing Spark release v1.3.0-rc1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f97b0d4a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f97b0d4a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f97b0d4a

Branch: refs/heads/branch-1.3
Commit: f97b0d4a6b26504916816d7aefcf3132cd1da6c2
Parents: e8284b2
Author: Patrick Wendell 
Authored: Wed Feb 18 01:52:06 2015 +
Committer: Patrick Wendell 
Committed: Wed Feb 18 01:52:06 2015 +

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka-assembly/pom.xml   | 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/java8-tests/pom.xml| 2 +-
 extras/kinesis-asl/pom.xml| 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml| 2 +-
 network/shuffle/pom.xml   | 2 +-
 network/yarn/pom.xml  | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 28 files changed, 28 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 4122e24..523e2e8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 510e926..4f73cf7 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index c993781..5612149 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 8caad2b..f7d6030 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 0706f1e..45aa775 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 1f26813..455304f 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index 503fc12..bcae38d 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/kafka/pom.xml
--
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index af96138..74ddc10 100644
---

[1/2] spark git commit: Preparing development version 1.3.1-SNAPSHOT

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e8284b29d -> 2ab0ba04f


Preparing development version 1.3.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ab0ba04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ab0ba04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ab0ba04

Branch: refs/heads/branch-1.3
Commit: 2ab0ba04f66683be25cbe0e83cecf2bdcb0f13ba
Parents: f97b0d4
Author: Patrick Wendell 
Authored: Wed Feb 18 01:52:06 2015 +
Committer: Patrick Wendell 
Committed: Wed Feb 18 01:52:06 2015 +

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka-assembly/pom.xml   | 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/java8-tests/pom.xml| 2 +-
 extras/kinesis-asl/pom.xml| 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml| 2 +-
 network/shuffle/pom.xml   | 2 +-
 network/yarn/pom.xml  | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 28 files changed, 28 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 523e2e8..7752b41 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 4f73cf7..4a13c58 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 5612149..aca0f58 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index f7d6030..c424592 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 45aa775..ef960a8 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 455304f..f01d6e8 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index bcae38d..bcd2ca2 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/kafka/pom.xml
--

spark git commit: [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 7320605ad -> e8284b29d


[SPARK-5875][SQL]logical.Project should not be resolved if it contains 
aggregates or generators

https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the 
bug and explain the root cause.
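
In short, the patch makes a logical.Project report itself as unresolved whenever its
project list still contains an aggregate or a generator, so the analyzer gets a chance
to rewrite those expressions first. A minimal sketch of that check, pulled out of the
diff below into a standalone helper for illustration (the helper name is made up; in
the patch the logic lives in Project.resolved):

    import org.apache.spark.sql.catalyst.expressions.{AggregateExpression, Generator, NamedExpression}

    // true if any project-list expression still contains an aggregate or generator
    def containsAggregatesOrGenerators(projectList: Seq[NamedExpression]): Boolean =
      projectList.exists(_.collect {
        case agg: AggregateExpression => agg
        case gen: Generator => gen
      }.nonEmpty)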

Author: Yin Huai 

Closes #4663 from yhuai/projectResolved and squashes the following commits:

472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or 
Generator, its resolved field should be false.

(cherry picked from commit d5f12bfe8f0a98d6fee114bb24376668ebe2898e)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8284b29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8284b29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8284b29

Branch: refs/heads/branch-1.3
Commit: e8284b29df0445bdf92f5ea2c74402a111718792
Parents: 7320605
Author: Yin Huai 
Authored: Tue Feb 17 17:50:39 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 17:50:51 2015 -0800

--
 .../catalyst/plans/logical/basicOperators.scala | 10 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 13 +++-
 .../sql/hive/execution/SQLQuerySuite.scala  | 32 +++-
 3 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93..89544ad 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
   def output = projectList.map(_.toAttribute)
+
+  override lazy val resolved: Boolean = {
+val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+case agg: AggregateExpression => agg
+case generator: Generator => generator
+  }.nonEmpty
+)
+
+!expressions.exists(!_.resolved) && childrenResolved && 
!containsAggregatesOrGenerators
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651..aec7847 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
 assert(caseInsensitiveAnalyze(plan).resolved)
   }
 
+  test("check project's resolved") {
+assert(Project(testRelation.output, testRelation).resolved)
+
+assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved)
+
+val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = 
true)())
+assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved)
+
+assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), 
testRelation).resolved)
+  }
+
   test("analyze project") {
 assert(
   caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), 
testRelation)) ===

http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec..ae03bc5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/h

spark git commit: [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master a51fc7ef9 -> d5f12bfe8


[SPARK-5875][SQL]logical.Project should not be resolved if it contains 
aggregates or generators

https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the 
bug and explain the root cause.

Author: Yin Huai 

Closes #4663 from yhuai/projectResolved and squashes the following commits:

472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or 
Generator, its resolved field should be false.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5f12bfe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5f12bfe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5f12bfe

Branch: refs/heads/master
Commit: d5f12bfe8f0a98d6fee114bb24376668ebe2898e
Parents: a51fc7e
Author: Yin Huai 
Authored: Tue Feb 17 17:50:39 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 17:50:39 2015 -0800

--
 .../catalyst/plans/logical/basicOperators.scala | 10 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 13 +++-
 .../sql/hive/execution/SQLQuerySuite.scala  | 32 +++-
 3 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 9628e93..89544ad 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,6 +23,16 @@ import org.apache.spark.sql.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
   def output = projectList.map(_.toAttribute)
+
+  override lazy val resolved: Boolean = {
+val containsAggregatesOrGenerators = projectList.exists ( _.collect {
+case agg: AggregateExpression => agg
+case generator: Generator => generator
+  }.nonEmpty
+)
+
+!expressions.exists(!_.resolved) && childrenResolved && 
!containsAggregatesOrGenerators
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e70c651..aec7847 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
 
@@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
 assert(caseInsensitiveAnalyze(plan).resolved)
   }
 
+  test("check project's resolved") {
+assert(Project(testRelation.output, testRelation).resolved)
+
+assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved)
+
+val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = 
true)())
+assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved)
+
+assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), 
testRelation).resolved)
+  }
+
   test("analyze project") {
 assert(
   caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), 
testRelation)) ===

http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e8d9eec..ae03bc5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import or

[1/2] spark git commit: Revert "Preparing development version 1.3.1-SNAPSHOT"

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 7e5e4d82b -> 7320605ad


Revert "Preparing development version 1.3.1-SNAPSHOT"

This reverts commit e57c81b8c1a6581c2588973eaf30d3c7ae90ed0c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/932ae4d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/932ae4d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/932ae4d4

Branch: refs/heads/branch-1.3
Commit: 932ae4d4a05b6587d1564f801f8670b63a10c93b
Parents: 7e5e4d8
Author: Patrick Wendell 
Authored: Tue Feb 17 17:48:43 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:48:43 2015 -0800

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka-assembly/pom.xml   | 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/java8-tests/pom.xml| 2 +-
 extras/kinesis-asl/pom.xml| 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml| 2 +-
 network/shuffle/pom.xml   | 2 +-
 network/yarn/pom.xml  | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 28 files changed, 28 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7752b41..523e2e8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 4a13c58..4f73cf7 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index aca0f58..5612149 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index c424592..f7d6030 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index ef960a8..45aa775 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index f01d6e8..455304f 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index bcd2ca2..bcae38d 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.1-SNAPSHOT
+1.3.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/kafka/pom.xm

[2/2] spark git commit: Revert "Preparing Spark release v1.3.0-snapshot1"

2015-02-17 Thread pwendell
Revert "Preparing Spark release v1.3.0-snapshot1"

This reverts commit d97bfc6f28ec4b7acfb36410c7c167d8d3c145ec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7320605a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7320605a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7320605a

Branch: refs/heads/branch-1.3
Commit: 7320605ad2e6ad6a94db96fe3fbcbfee349449ef
Parents: 932ae4d
Author: Patrick Wendell 
Authored: Tue Feb 17 17:48:47 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:48:47 2015 -0800

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka-assembly/pom.xml   | 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/java8-tests/pom.xml| 2 +-
 extras/kinesis-asl/pom.xml| 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 network/common/pom.xml| 2 +-
 network/shuffle/pom.xml   | 2 +-
 network/yarn/pom.xml  | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 28 files changed, 28 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 523e2e8..4122e24 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 4f73cf7..510e926 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 5612149..c993781 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index f7d6030..8caad2b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 45aa775..0706f1e 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 455304f..1f26813 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/kafka-assembly/pom.xml
--
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index bcae38d..503fc12 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent
-1.3.0
+1.3.0-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/kafka/pom.xml
--
diff --git a/ext

spark git commit: [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 07a401a7b -> 7e5e4d82b


[SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()

This method is performance-sensitive and this change wasn't necessary.
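
As a rough illustration (not the Spark code itself), the idiom the revert keeps
looks like the sketch below. The explicit contains() check avoids passing a
by-name fallback into getOrElseUpdate() on every call, which is presumably the
concern for a method invoked O(num tasks) times. The names CacheLocsSketch and
computeLocations are hypothetical.

    import scala.collection.mutable

    object CacheLocsSketch {
      private val cache = mutable.HashMap.empty[Int, Seq[String]]

      // Hypothetical stand-in for the block-manager lookup done in DAGScheduler.
      private def computeLocations(id: Int): Seq[String] = Seq(s"host-$id")

      def lookup(id: Int): Seq[String] = cache.synchronized {
        if (!cache.contains(id)) {
          cache(id) = computeLocations(id)
        }
        cache(id)
      }
    }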


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e5e4d82
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e5e4d82
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e5e4d82

Branch: refs/heads/branch-1.3
Commit: 7e5e4d82be7509edb64c71ca6189add556589613
Parents: 07a401a
Author: Josh Rosen 
Authored: Tue Feb 17 17:45:16 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 17:47:43 2015 -0800

--
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e5e4d82/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9c355d7..8b62d24 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -190,13 +190,15 @@ class DAGScheduler(
   }
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = 
cacheLocs.synchronized {
-cacheLocs.getOrElseUpdate(rdd.id, {
+// Note: this doesn't use `getOrElse()` because this method is called 
O(num tasks) times
+if (!cacheLocs.contains(rdd.id)) {
   val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, 
index)).toArray[BlockId]
   val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, 
blockManagerMaster)
-  blockIds.map { id =>
+  cacheLocs(rdd.id) = blockIds.map { id =>
 locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
   }
-})
+}
+cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master d46d6246d -> a51fc7ef9


[SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()

This method is performance-sensitive and this change wasn't necessary.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a51fc7ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a51fc7ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a51fc7ef

Branch: refs/heads/master
Commit: a51fc7ef9adb6a41c4857918217f800858fced2c
Parents: d46d624
Author: Josh Rosen 
Authored: Tue Feb 17 17:45:16 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 17:45:16 2015 -0800

--
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +---
 1 file changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a51fc7ef/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9c355d7..8b62d24 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -190,13 +190,15 @@ class DAGScheduler(
   }
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = 
cacheLocs.synchronized {
-cacheLocs.getOrElseUpdate(rdd.id, {
+// Note: this doesn't use `getOrElse()` because this method is called 
O(num tasks) times
+if (!cacheLocs.contains(rdd.id)) {
   val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, 
index)).toArray[BlockId]
   val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, 
blockManagerMaster)
-  blockIds.map { id =>
+  cacheLocs(rdd.id) = blockIds.map { id =>
 locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
   }
-})
+}
+cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cb905841b -> 07a401a7b


[SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

This patch addresses a race condition in DAGScheduler by properly synchronizing 
accesses to its `cacheLocs` map.

This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, 
which can be called by separate threads, since DAGScheduler's 
`getPreferredLocs()` method is called by SparkContext and indirectly calls 
`getCacheLocs()`.  If this map is cleared by the DAGScheduler event processing 
thread while a user thread is submitting a job and computing preferred 
locations, then this can cause the user thread to throw 
"NoSuchElementException: key not found" errors.

Most accesses to DAGScheduler's internal state do not need synchronization 
because that state is only accessed from the event processing loop's thread.  
An alternative approach to fixing this bug would be to refactor this code so 
that SparkContext sends the DAGScheduler a message in order to get the list of 
preferred locations.  However, this would involve more extensive changes to 
this code and would be significantly harder to backport to maintenance branches 
since some of the related code has undergone significant refactoring (e.g. the 
introduction of EventLoop).  Since `cacheLocs` is the only state that's 
accessed in this way, adding simple synchronization seems like a better 
short-term fix.

See #3345 for additional context.
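
A minimal, self-contained sketch of the guarded-access pattern (not the actual
DAGScheduler code): both the read path and clear() synchronize on the map, so a
clear() issued from the event loop can no longer interleave with a lookup from a
user thread. The object and helper names below are illustrative only.

    import scala.collection.mutable

    object GuardedCacheSketch {
      private val cacheLocs = mutable.HashMap.empty[Int, Array[Seq[String]]]

      // Called (conceptually) from user threads via SparkContext.getPreferredLocs.
      def getCacheLocs(rddId: Int, numPartitions: Int): Array[Seq[String]] =
        cacheLocs.synchronized {
          cacheLocs.getOrElseUpdate(rddId, Array.fill(numPartitions)(Nil))
        }

      // Called (conceptually) from the DAGScheduler event loop. Without the
      // synchronized blocks, a clear() landing between another thread's
      // contains() check and its subsequent apply() is what surfaced as
      // "key not found".
      def clearCacheLocs(): Unit = cacheLocs.synchronized {
        cacheLocs.clear()
      }
    }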

Author: Josh Rosen 

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs 
map.

(cherry picked from commit d46d6246d225ff3af09ebae1a09d4de2430c502d)
Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07a401a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07a401a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07a401a7

Branch: refs/heads/branch-1.3
Commit: 07a401a7beea864092ec8f8c451e05cba5a19bbb
Parents: cb90584
Author: Josh Rosen 
Authored: Tue Feb 17 17:39:58 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:40:04 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 34 ++--
 1 file changed, 24 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07a401a7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7903557..9c355d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on.  This 
map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value 
is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see 
SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, 
which is sent with
@@ -183,18 +189,17 @@ class DAGScheduler(
 eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
-if (!cacheLocs.contains(rdd.id)) {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = 
cacheLocs.synchronized {
+cacheLocs.getOrElseUpdate(rdd.id, {
   val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, 
index)).toArray[BlockId]
   val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, 
blockManagerMaster)
-  cacheLocs(rdd.id) = blockIds.map { id =>
+  blockIds.map { id =>
 locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
   }
-}
-cacheLocs(rdd.id)
+})
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
 cacheLocs.clear()
   }
 
@@ -1276,17 +1281,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular 
RDD.
+   *
+   * This method is thread-safe and is called from both DAGScheduler and 
SparkContext.
+   *
* @param rdd whose partitions are to 

spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master ae6cfb3ac -> d46d6246d


[SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

This patch addresses a race condition in DAGScheduler by properly synchronizing 
accesses to its `cacheLocs` map.

This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, 
which can be called by separate threads, since DAGScheduler's 
`getPreferredLocs()` method is called by SparkContext and indirectly calls 
`getCacheLocs()`.  If this map is cleared by the DAGScheduler event processing 
thread while a user thread is submitting a job and computing preferred 
locations, then this can cause the user thread to throw 
"NoSuchElementException: key not found" errors.

Most accesses to DAGScheduler's internal state do not need synchronization 
because that state is only accessed from the event processing loop's thread.  
An alternative approach to fixing this bug would be to refactor this code so 
that SparkContext sends the DAGScheduler a message in order to get the list of 
preferred locations.  However, this would involve more extensive changes to 
this code and would be significantly harder to backport to maintenance branches 
since some of the related code has undergone significant refactoring (e.g. the 
introduction of EventLoop).  Since `cacheLocs` is the only state that's 
accessed in this way, adding simple synchronization seems like a better 
short-term fix.

See #3345 for additional context.

Author: Josh Rosen 

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs 
map.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d46d6246
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d46d6246
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d46d6246

Branch: refs/heads/master
Commit: d46d6246d225ff3af09ebae1a09d4de2430c502d
Parents: ae6cfb3
Author: Josh Rosen 
Authored: Tue Feb 17 17:39:58 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:39:58 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 34 ++--
 1 file changed, 24 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d46d6246/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7903557..9c355d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on.  This 
map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value 
is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see 
SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, 
which is sent with
@@ -183,18 +189,17 @@ class DAGScheduler(
 eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
-if (!cacheLocs.contains(rdd.id)) {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = 
cacheLocs.synchronized {
+cacheLocs.getOrElseUpdate(rdd.id, {
   val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, 
index)).toArray[BlockId]
   val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, 
blockManagerMaster)
-  cacheLocs(rdd.id) = blockIds.map { id =>
+  blockIds.map { id =>
 locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
   }
-}
-cacheLocs(rdd.id)
+})
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
 cacheLocs.clear()
   }
 
@@ -1276,17 +1281,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular 
RDD.
+   *
+   * This method is thread-safe and is called from both DAGScheduler and 
SparkContext.
+   *
* @param rdd whose partitions are to be looked at
* @param partition to lookup locality information for
* @return list of machines that are

spark git commit: [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master c3d2b90bd -> ae6cfb3ac


[SPARK-5811] Added documentation for maven coordinates and added Spark Packages 
support

Adds documentation for Maven coordinates and Spark Packages support, and adds
PySpark tests for `--packages`.
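
For context, a rough sketch of how a comma-delimited --packages string in the
documented groupId:artifactId:version or groupId/artifactId:version form can be
split into coordinates. The names here are hypothetical; the real parser lives
in SparkSubmitUtils and is only partially visible in the diff below.

    object MavenCoordinateSketch {
      case class MavenCoordinate(groupId: String, artifactId: String, version: String)

      def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
        coordinates.split(",").map { coordinate =>
          // Accept both separators described in the updated scaladoc.
          val parts = coordinate.trim.replace("/", ":").split(":")
          require(parts.length == 3,
            s"Expected groupId:artifactId:version, got: $coordinate")
          MavenCoordinate(parts(0), parts(1), parts(2))
        }.toSeq
      }
    }

On the command line this corresponds to something like
spark-submit --packages com.example:mylib:0.1 (a hypothetical coordinate); the
resolved jars are then appended to --jars and, for Python applications, to
--py-files.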

Author: Burak Yavuz 
Author: Davies Liu 

Closes #4662 from brkyvz/SPARK-5811 and squashes the following commits:

56d [Burak Yavuz] fixed broken test
64cb8ee [Burak Yavuz] passed pep8 on local
c07b81e [Burak Yavuz] fixed pep8
a8bd6b7 [Burak Yavuz] submit PR
4ef4046 [Burak Yavuz] ready for PR
8fb02e5 [Burak Yavuz] merged master
25c9b9f [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into 
python-jar
560d13b [Burak Yavuz] before PR
17d3f76 [Davies Liu] support .jar as python package
a3eb717 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into 
SPARK-5811
c60156d [Burak Yavuz] [SPARK-5811] Added documentation for maven coordinates


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae6cfb3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae6cfb3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae6cfb3a

Branch: refs/heads/master
Commit: ae6cfb3acdbc2721d25793698a4a440f0519dbec
Parents: c3d2b90
Author: Burak Yavuz 
Authored: Tue Feb 17 17:15:43 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:23:22 2015 -0800

--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 52 ++-
 .../spark/deploy/SparkSubmitUtilsSuite.scala| 13 ++--
 docs/programming-guide.md   | 19 +-
 docs/submitting-applications.md |  5 ++
 python/pyspark/tests.py | 69 ++--
 5 files changed, 131 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ae6cfb3a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 012a89a..4c41108 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -252,6 +252,26 @@ object SparkSubmit {
 
 val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
 
+// Resolve maven dependencies if there are any and add classpath to jars. 
Add them to py-files
+// too for packages that include Python code
+val resolvedMavenCoordinates =
+  SparkSubmitUtils.resolveMavenCoordinates(
+args.packages, Option(args.repositories), Option(args.ivyRepoPath))
+if (!resolvedMavenCoordinates.trim.isEmpty) {
+  if (args.jars == null || args.jars.trim.isEmpty) {
+args.jars = resolvedMavenCoordinates
+  } else {
+args.jars += s",$resolvedMavenCoordinates"
+  }
+  if (args.isPython) {
+if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
+  args.pyFiles = resolvedMavenCoordinates
+} else {
+  args.pyFiles += s",$resolvedMavenCoordinates"
+}
+  }
+}
+
 // Require all python files to be local, so we can add them to the 
PYTHONPATH
 // In YARN cluster mode, python files are distributed as regular files, 
which can be non-local
 if (args.isPython && !isYarnCluster) {
@@ -307,18 +327,6 @@ object SparkSubmit {
 // Special flag to avoid deprecation warnings at the client
 sysProps("SPARK_SUBMIT") = "true"
 
-// Resolve maven dependencies if there are any and add classpath to jars
-val resolvedMavenCoordinates =
-  SparkSubmitUtils.resolveMavenCoordinates(
-args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-if (!resolvedMavenCoordinates.trim.isEmpty) {
-  if (args.jars == null || args.jars.trim.isEmpty) {
-args.jars = resolvedMavenCoordinates
-  } else {
-args.jars += s",$resolvedMavenCoordinates"
-  }
-}
-
 // A list of rules to map each argument to system properties or 
command-line options in
 // each deploy mode; we iterate through these below
 val options = List[OptionAssigner](
@@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils {
   private[spark] case class MavenCoordinate(groupId: String, artifactId: 
String, version: String)
 
 /**
- * Extracts maven coordinates from a comma-delimited string
+ * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. 
The latter provides
+ * simplicity for Spark Package users.
  * @param coordinates Comma-delimited string of maven coordinates
  * @return Sequence of Maven coordinates
  */
   private[

spark git commit: [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 81202350a -> cb905841b


[SPARK-5811] Added documentation for maven coordinates and added Spark Packages 
support

Documentation for maven coordinates + Spark Package support. Added pyspark 
tests for `--packages`

Author: Burak Yavuz 
Author: Davies Liu 

Closes #4662 from brkyvz/SPARK-5811 and squashes the following commits:

56d [Burak Yavuz] fixed broken test
64cb8ee [Burak Yavuz] passed pep8 on local
c07b81e [Burak Yavuz] fixed pep8
a8bd6b7 [Burak Yavuz] submit PR
4ef4046 [Burak Yavuz] ready for PR
8fb02e5 [Burak Yavuz] merged master
25c9b9f [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into 
python-jar
560d13b [Burak Yavuz] before PR
17d3f76 [Davies Liu] support .jar as python package
a3eb717 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into 
SPARK-5811
c60156d [Burak Yavuz] [SPARK-5811] Added documentation for maven coordinates

(cherry picked from commit ae6cfb3acdbc2721d25793698a4a440f0519dbec)
Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb905841
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb905841
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb905841

Branch: refs/heads/branch-1.3
Commit: cb905841b2eaa19e28a1327cab0e5d51f805d233
Parents: 8120235
Author: Burak Yavuz 
Authored: Tue Feb 17 17:15:43 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 17:23:30 2015 -0800

--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 52 ++-
 .../spark/deploy/SparkSubmitUtilsSuite.scala| 13 ++--
 docs/programming-guide.md   | 19 +-
 docs/submitting-applications.md |  5 ++
 python/pyspark/tests.py | 69 ++--
 5 files changed, 131 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb905841/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 012a89a..4c41108 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -252,6 +252,26 @@ object SparkSubmit {
 
 val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
 
+// Resolve maven dependencies if there are any and add classpath to jars. 
Add them to py-files
+// too for packages that include Python code
+val resolvedMavenCoordinates =
+  SparkSubmitUtils.resolveMavenCoordinates(
+args.packages, Option(args.repositories), Option(args.ivyRepoPath))
+if (!resolvedMavenCoordinates.trim.isEmpty) {
+  if (args.jars == null || args.jars.trim.isEmpty) {
+args.jars = resolvedMavenCoordinates
+  } else {
+args.jars += s",$resolvedMavenCoordinates"
+  }
+  if (args.isPython) {
+if (args.pyFiles == null || args.pyFiles.trim.isEmpty) {
+  args.pyFiles = resolvedMavenCoordinates
+} else {
+  args.pyFiles += s",$resolvedMavenCoordinates"
+}
+  }
+}
+
 // Require all python files to be local, so we can add them to the 
PYTHONPATH
 // In YARN cluster mode, python files are distributed as regular files, 
which can be non-local
 if (args.isPython && !isYarnCluster) {
@@ -307,18 +327,6 @@ object SparkSubmit {
 // Special flag to avoid deprecation warnings at the client
 sysProps("SPARK_SUBMIT") = "true"
 
-// Resolve maven dependencies if there are any and add classpath to jars
-val resolvedMavenCoordinates =
-  SparkSubmitUtils.resolveMavenCoordinates(
-args.packages, Option(args.repositories), Option(args.ivyRepoPath))
-if (!resolvedMavenCoordinates.trim.isEmpty) {
-  if (args.jars == null || args.jars.trim.isEmpty) {
-args.jars = resolvedMavenCoordinates
-  } else {
-args.jars += s",$resolvedMavenCoordinates"
-  }
-}
-
 // A list of rules to map each argument to system properties or 
command-line options in
 // each deploy mode; we iterate through these below
 val options = List[OptionAssigner](
@@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils {
   private[spark] case class MavenCoordinate(groupId: String, artifactId: 
String, version: String)
 
 /**
- * Extracts maven coordinates from a comma-delimited string
+ * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. 
The latter provides
+ * simplicity for Spark Package users.
  * @param coor

svn commit: r1660555 - in /spark: community.md site/community.html

2015-02-17 Thread matei
Author: matei
Date: Wed Feb 18 01:08:19 2015
New Revision: 1660555

URL: http://svn.apache.org/r1660555
Log:
Berlin meetup and cleaning up meetup page

Modified:
spark/community.md
spark/site/community.html

Modified: spark/community.md
URL: 
http://svn.apache.org/viewvc/spark/community.md?rev=1660555&r1=1660554&r2=1660555&view=diff
==
--- spark/community.md (original)
+++ spark/community.md Wed Feb 18 01:08:19 2015
@@ -52,21 +52,20 @@ Spark Meetups are grass-roots events org
 
   
 http://www.meetup.com/spark-users/";>Bay Area Spark Meetup.
-This group has been running monthly events since January 2012 in the San 
Francisco area.
-The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past 
meetups, including videos and http://www.meetup.com/spark-users/files/";>slides on most of the 
recent pages.
-You can also find links to videos from past meetups on the Documentation Page.
+This group has been running since January 2012 in the San Francisco area.
+The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past 
meetups, including videos and http://www.meetup.com/spark-users/files/";>slides for most of the 
recent talks.
   
   
-http://www.meetup.com/Spark-London/";>London Spark Meetup
+http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup
   
   
-http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark 
Meetup
+http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark 
Meetup
   
   
-http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup
+http://www.meetup.com/Berlin-Apache-Spark-Meetup/";>Berlin Spark 
Meetup
   
   
-http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark 
Meetup
+http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark 
Meetup
   
   
 http://www.meetup.com/Boston-Apache-Spark-User-Group/";>Boston 
Spark Meetup
@@ -84,6 +83,9 @@ Spark Meetups are grass-roots events org
 http://www.meetup.com/Spark-User-Group-Hyderabad/";>Hyderabad 
Spark Meetup
   
   
+http://www.meetup.com/Spark-London/";>London Spark Meetup
+  
+  
 http://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark 
Meetup
   
   

Modified: spark/site/community.html
URL: 
http://svn.apache.org/viewvc/spark/site/community.html?rev=1660555&r1=1660554&r2=1660555&view=diff
==
--- spark/site/community.html (original)
+++ spark/site/community.html Wed Feb 18 01:08:19 2015
@@ -212,21 +212,20 @@
 
   
 http://www.meetup.com/spark-users/";>Bay Area Spark Meetup.
-This group has been running monthly events since January 2012 in the San 
Francisco area.
-The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past 
meetups, including videos and http://www.meetup.com/spark-users/files/";>slides on most of the 
recent pages.
-You can also find links to videos from past meetups on the Documentation Page.
+This group has been running since January 2012 in the San Francisco area.
+The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past 
meetups, including videos and http://www.meetup.com/spark-users/files/";>slides for most of the 
recent talks.
   
   
-http://www.meetup.com/Spark-London/";>London Spark Meetup
+http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup
   
   
-http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark 
Meetup
+http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark 
Meetup
   
   
-http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup
+http://www.meetup.com/Berlin-Apache-Spark-Meetup/";>Berlin Spark 
Meetup
   
   
-http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark 
Meetup
+http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark 
Meetup
   
   
 http://www.meetup.com/Boston-Apache-Spark-User-Group/";>Boston 
Spark Meetup
@@ -244,6 +243,9 @@
 http://www.meetup.com/Spark-User-Group-Hyderabad/";>Hyderabad 
Spark Meetup
   
   
+http://www.meetup.com/Spark-London/";>London Spark Meetup
+  
+  
 http://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark 
Meetup
   
   



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r1660554 - /spark/news/_posts/2015-02-09-spark-1-2-1-released.md

2015-02-17 Thread pwendell
Author: pwendell
Date: Wed Feb 18 01:07:50 2015
New Revision: 1660554

URL: http://svn.apache.org/r1660554
Log:
Adding missing news file for Spark 1.2.1 release

Added:
spark/news/_posts/2015-02-09-spark-1-2-1-released.md

Added: spark/news/_posts/2015-02-09-spark-1-2-1-released.md
URL: 
http://svn.apache.org/viewvc/spark/news/_posts/2015-02-09-spark-1-2-1-released.md?rev=1660554&view=auto
==
--- spark/news/_posts/2015-02-09-spark-1-2-1-released.md (added)
+++ spark/news/_posts/2015-02-09-spark-1-2-1-released.md Wed Feb 18 01:07:50 
2015
@@ -0,0 +1,16 @@
+---
+layout: post
+title: Spark 1.2.1 released
+categories:
+- News
+tags: []
+status: publish
+type: post
+published: true
+meta:
+  _edit_last: '4'
+  _wpas_done_all: '1'
+---
+We are happy to announce the availability of Spark 1.2.1! This is a maintenance release that includes 
contributions from 69 developers. Spark 1.2.1 includes fixes across several 
areas of Spark, including the core API, Streaming, PySpark, SQL, GraphX, and 
MLlib.
+
+Visit the release notes to read about this release or download the release today.



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 07d8ef9e7 -> 81202350a


[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark

Currently, PySpark does not support narrow dependencies during cogroup/join even 
when the two RDDs share the same partitioner, so an unnecessary extra shuffle 
stage is introduced.

The Python implementation of cogroup/join differs from the Scala one: it is built 
on union() and partitionBy(). This patch makes union() use 
PartitionerAwareUnionRDD() when all the input RDDs have the same partitioner. It 
also fixes the `preservesPartitioning` flag in the relevant map() and 
mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.
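
A small Scala illustration of the SparkContext.union() change at the heart of
this patch (assuming the patch is applied; the local master, sample data, and
assertion are ours, not from the commit): unioning co-partitioned RDDs now keeps
the shared partitioner, which is what lets PySpark's partitionBy() skip a
shuffle afterwards.

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object NarrowUnionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("narrow-union-sketch"))
        val p = new HashPartitioner(4)
        val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(p)
        val b = sc.parallelize(Seq(1 -> "x", 3 -> "y")).partitionBy(p)

        // With the same partitioner on every input, union() now builds a
        // PartitionerAwareUnionRDD and the partitioner is preserved.
        val unioned = sc.union(Seq(a, b))
        assert(unioned.partitioner == Some(p))
        sc.stop()
      }
    }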

Author: Davies Liu 

Closes #4629 from davies/narrow and squashes the following commits:

dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark

(cherry picked from commit c3d2b90bde2e11823909605d518167548df66bd8)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81202350
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81202350
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81202350

Branch: refs/heads/branch-1.3
Commit: 81202350a08c50685676300218270929c76f648a
Parents: 07d8ef9
Author: Davies Liu 
Authored: Tue Feb 17 16:54:57 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 16:55:20 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   | 11 -
 .../org/apache/spark/api/python/PythonRDD.scala | 10 
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  8 +++-
 python/pyspark/join.py  |  8 ++--
 python/pyspark/rdd.py   | 49 +---
 python/pyspark/streaming/dstream.py |  2 +-
 python/pyspark/tests.py | 38 ++-
 7 files changed, 101 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81202350/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fd8fac6..d59b466 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
+val partitioners = rdds.flatMap(_.partitioner).toSet
+if (partitioners.size == 1) {
+  new PartitionerAwareUnionRDD(this, rdds)
+} else {
+  new UnionRDD(this, rdds)
+}
+  }
 
   /** Build the union of a list of RDDs passed as variable-length arguments. */
   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
-new UnionRDD(this, Seq(first) ++ rest)
+union(Seq(first) ++ rest)
 
   /** Get an RDD that has no partitions or elements. */
   def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)

http://git-wip-us.apache.org/repos/asf/spark/blob/81202350/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2527211..dcb6e63 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -303,6 +303,7 @@ private class PythonException(msg: String, cause: 
Exception) extends RuntimeExce
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
+  override val partitioner = prev.partitioner
   override def compute(split: Partition, context: TaskContext) =
 prev.iterator(split, context).grouped(2).map {
   case Seq(a, b) => (Utils.deserializeLongValue(a), b)
@@ -330,6 +331,15 @@ private[spark] object PythonRDD extends Logging {
   }
 
   /**
+   * Return an RDD of values from an RDD of (Long, Array[Byte]), with 
preservePartitions=true
+   *
+   * This is useful for PySpark to have the partitioner after partitionBy()
+   */
+  def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] 
= {
+pa

spark git commit: [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 117121a4e -> c3d2b90bd


[SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark

Currently, PySpark does not support narrow dependencies during cogroup/join even 
when the two RDDs share the same partitioner, so an unnecessary extra shuffle 
stage is introduced.

The Python implementation of cogroup/join differs from the Scala one: it is built 
on union() and partitionBy(). This patch makes union() use 
PartitionerAwareUnionRDD() when all the input RDDs have the same partitioner. It 
also fixes the `preservesPartitioning` flag in the relevant map() and 
mapPartitions() calls, so that partitionBy() can skip the unnecessary shuffle stage.

Author: Davies Liu 

Closes #4629 from davies/narrow and squashes the following commits:

dffe34e [Davies Liu] improve test, check number of stages for join/cogroup
1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
narrow
4d29932 [Davies Liu] address comment
cc28d97 [Davies Liu] add unit tests
940245e [Davies Liu] address comments
ff5a0a6 [Davies Liu] skip the partitionBy() on Python side
eb26c62 [Davies Liu] narrow dependency in PySpark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3d2b90b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3d2b90b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3d2b90b

Branch: refs/heads/master
Commit: c3d2b90bde2e11823909605d518167548df66bd8
Parents: 117121a
Author: Davies Liu 
Authored: Tue Feb 17 16:54:57 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 16:54:57 2015 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   | 11 -
 .../org/apache/spark/api/python/PythonRDD.scala | 10 
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  8 +++-
 python/pyspark/join.py  |  8 ++--
 python/pyspark/rdd.py   | 49 +---
 python/pyspark/streaming/dstream.py |  2 +-
 python/pyspark/tests.py | 38 ++-
 7 files changed, 101 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c3d2b90b/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fd8fac6..d59b466 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   }
 
   /** Build the union of a list of RDDs. */
-  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
+  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
+val partitioners = rdds.flatMap(_.partitioner).toSet
+if (partitioners.size == 1) {
+  new PartitionerAwareUnionRDD(this, rdds)
+} else {
+  new UnionRDD(this, rdds)
+}
+  }
 
   /** Build the union of a list of RDDs passed as variable-length arguments. */
   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
-new UnionRDD(this, Seq(first) ++ rest)
+union(Seq(first) ++ rest)
 
   /** Get an RDD that has no partitions or elements. */
   def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d2b90b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2527211..dcb6e63 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -303,6 +303,7 @@ private class PythonException(msg: String, cause: 
Exception) extends RuntimeExce
 private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   RDD[(Long, Array[Byte])](prev) {
   override def getPartitions = prev.partitions
+  override val partitioner = prev.partitioner
   override def compute(split: Partition, context: TaskContext) =
 prev.iterator(split, context).grouped(2).map {
   case Seq(a, b) => (Utils.deserializeLongValue(a), b)
@@ -330,6 +331,15 @@ private[spark] object PythonRDD extends Logging {
   }
 
   /**
+   * Return an RDD of values from an RDD of (Long, Array[Byte]), with 
preservePartitions=true
+   *
+   * This is useful for PySpark to have the partitioner after partitionBy()
+   */
+  def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] 
= {
+pair.rdd.mapPartitions(it => it.map(_._2), true)
+  }
+
+  /**
* Adapter for calling SparkContext#runJo

spark git commit: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0dba382ee -> 07d8ef9e7


[SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table 
to a data source parquet table.

The problem is that after we create an empty Hive metastore Parquet table (e.g. 
`CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory 
for it, which causes our data source `ParquetRelation2` to fail to get the 
table's schema. See the JIRA for steps to reproduce the bug and the resulting 
exception.

This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852
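
The shape of the fix, reduced to a standalone sketch (plain Option values stand
in for StructType and the real readSchema; this is not the Spark code): try the
explicitly supplied schema first, then the schema read from any existing Parquet
files, then the metastore schema, and only error out if all three are missing.

    object SchemaFallbackSketch {
      def resolveSchema(
          maybeSchema: Option[String],
          readSchema: () => Option[String],
          maybeMetastoreSchema: Option[String]): String = {
        maybeSchema
          .orElse(readSchema())
          .orElse(maybeMetastoreSchema)
          .getOrElse(sys.error("Failed to get the schema."))
      }

      def main(args: Array[String]): Unit = {
        // Newly created empty table: no user schema, no Parquet files on disk,
        // so the metastore schema is the one that gets used.
        val schema = resolveSchema(None, () => None, Some("a INT"))
        println(schema)  // prints: a INT
      }
    }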

Author: Yin Huai 
Author: Cheng Hao 

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback
a04930b [Cheng Hao] fix bug of scan an empty parquet based table
442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext

(cherry picked from commit 117121a4ecaadda156a82255333670775e7727db)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07d8ef9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07d8ef9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07d8ef9e

Branch: refs/heads/branch-1.3
Commit: 07d8ef9e730251b66f962fa7acdaf7d7eacc62a1
Parents: 0dba382
Author: Yin Huai 
Authored: Tue Feb 17 15:47:59 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 15:48:23 2015 -0800

--
 .../apache/spark/sql/parquet/newParquet.scala   |  18 ++-
 .../sql/hive/MetastoreDataSourcesSuite.scala|  38 +++
 .../spark/sql/parquet/parquetSuites.scala   | 114 ++-
 3 files changed, 164 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07d8ef9e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 95bea92..16b7713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
 }
   }
 
-  parquetSchema = maybeSchema.getOrElse(readSchema())
+  // To get the schema. We first try to get the schema defined in 
maybeSchema.
+  // If maybeSchema is not defined, we will try to get the schema from 
existing parquet data
+  // (through readSchema). If data does not exist, we will try to get the 
schema defined in
+  // maybeMetastoreSchema (defined in the options of the data source).
+  // Finally, if we still could not get the schema. We throw an error.
+  parquetSchema =
+maybeSchema
+  .orElse(readSchema())
+  .orElse(maybeMetastoreSchema)
+  .getOrElse(sys.error("Failed to get the schema."))
 
   partitionKeysIncludedInParquetSchema =
 isPartitioned &&
@@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private def readSchema(): StructType = {
+private def readSchema(): Option[StructType] = {
   // Sees which file(s) we need to touch in order to figure out the schema.
   val filesToTouch =
   // Always tries the summary files first if users don't require a merged 
schema.  In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: 
SQLContext): StructType = {
+  private[parquet] def readSchema(
+  footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
 footers.map { footer =>
   val metadata = footer.getParquetMetadata.getFileMetaData
   val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 {
 sqlContext.conf.isParquetBinaryAsString,
 sqlContext.conf.isParquetINT96AsTimestamp))
   }
-}.reduce { (left, right) =>
+}.reduceOption { (left, right) =>
   try left.merge(right) catch { case e: Throwable =>
 throw new SparkException(s"Failed to merge incompatible schemas $left 
and $right", e)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/07d8ef9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive

spark git commit: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 4d4cc760f -> 117121a4e


[SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table 
to a data source parquet table.

The problem is that after we create an empty Hive metastore Parquet table (e.g. 
`CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory 
for it, which causes our data source `ParquetRelation2` to fail to get the 
table's schema. See the JIRA for steps to reproduce the bug and the resulting 
exception.

This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852

Author: Yin Huai 
Author: Cheng Hao 

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback
a04930b [Cheng Hao] fix bug of scan an empty parquet based table
442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/117121a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/117121a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/117121a4

Branch: refs/heads/master
Commit: 117121a4ecaadda156a82255333670775e7727db
Parents: 4d4cc76
Author: Yin Huai 
Authored: Tue Feb 17 15:47:59 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 15:47:59 2015 -0800

--
 .../apache/spark/sql/parquet/newParquet.scala   |  18 ++-
 .../sql/hive/MetastoreDataSourcesSuite.scala|  38 +++
 .../spark/sql/parquet/parquetSuites.scala   | 114 ++-
 3 files changed, 164 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 95bea92..16b7713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
 }
   }
 
-  parquetSchema = maybeSchema.getOrElse(readSchema())
+  // To get the schema. We first try to get the schema defined in 
maybeSchema.
+  // If maybeSchema is not defined, we will try to get the schema from 
existing parquet data
+  // (through readSchema). If data does not exist, we will try to get the 
schema defined in
+  // maybeMetastoreSchema (defined in the options of the data source).
+  // Finally, if we still could not get the schema. We throw an error.
+  parquetSchema =
+maybeSchema
+  .orElse(readSchema())
+  .orElse(maybeMetastoreSchema)
+  .getOrElse(sys.error("Failed to get the schema."))
 
   partitionKeysIncludedInParquetSchema =
 isPartitioned &&
@@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private def readSchema(): StructType = {
+private def readSchema(): Option[StructType] = {
   // Sees which file(s) we need to touch in order to figure out the schema.
   val filesToTouch =
   // Always tries the summary files first if users don't require a merged 
schema.  In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
   // internally.
   private[sql] val METASTORE_SCHEMA = "metastoreSchema"
 
-  private[parquet] def readSchema(footers: Seq[Footer], sqlContext: 
SQLContext): StructType = {
+  private[parquet] def readSchema(
+  footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
 footers.map { footer =>
   val metadata = footer.getParquetMetadata.getFileMetaData
   val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 {
 sqlContext.conf.isParquetBinaryAsString,
 sqlContext.conf.isParquetINT96AsTimestamp))
   }
-}.reduce { (left, right) =>
+}.reduceOption { (left, right) =>
   try left.merge(right) catch { case e: Throwable =>
 throw new SparkException(s"Failed to merge incompatible schemas $left 
and $right", e)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuit

spark git commit: [SPARK-5872] [SQL] create a sqlCtx in pyspark shell

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cb061603c -> 0dba382ee


[SPARK-5872] [SQL] create a sqlCtx in pyspark shell

The sqlCtx will be a HiveContext if Hive is built into the assembly jar, or a 
SQLContext otherwise.

It also skips the Hive tests in pyspark.sql.tests if Hive is not available.
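
A rough Scala analog of the detection the shell performs through py4j
(illustrative only; the reflective lookup of HiveContext is our own device so
the sketch compiles without a spark-hive dependency): probe for HiveConf and
fall back to a plain SQLContext when Hive is not on the classpath.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    object SqlCtxSketch {
      def createSqlCtx(sc: SparkContext): SQLContext = {
        try {
          // Succeeds only if the Hive classes are bundled in the assembly.
          Class.forName("org.apache.hadoop.hive.conf.HiveConf")
          Class.forName("org.apache.spark.sql.hive.HiveContext")
            .getConstructor(classOf[SparkContext])
            .newInstance(sc)
            .asInstanceOf[SQLContext]
        } catch {
          case _: ClassNotFoundException => new SQLContext(sc)
        }
      }
    }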

Author: Davies Liu 

Closes #4659 from davies/sqlctx and squashes the following commits:

0e6629a [Davies Liu] sqlCtx in pyspark

(cherry picked from commit 4d4cc760fa9687ce563320094557ef9144488676)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dba382e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dba382e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dba382e

Branch: refs/heads/branch-1.3
Commit: 0dba382ee65694969704384c4968e3a656b3c833
Parents: cb06160
Author: Davies Liu 
Authored: Tue Feb 17 15:44:37 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 15:44:45 2015 -0800

--
 python/pyspark/shell.py | 13 -
 python/pyspark/sql/tests.py | 12 ++--
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0dba382e/python/pyspark/shell.py
--
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 4cf4b89..1a02fec 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -31,8 +31,12 @@ if sys.version_info[0] != 2:
 import atexit
 import os
 import platform
+
+import py4j
+
 import pyspark
 from pyspark.context import SparkContext
+from pyspark.sql import SQLContext, HiveContext
 from pyspark.storagelevel import StorageLevel
 
 # this is the deprecated equivalent of ADD_JARS
@@ -46,6 +50,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"):
 sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 atexit.register(lambda: sc.stop())
 
+try:
+# Try to access HiveConf, it will raise exception if Hive is not added
+sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
+sqlCtx = HiveContext(sc)
+except py4j.protocol.Py4JError:
+sqlCtx = SQLContext(sc)
+
 print("""Welcome to
     __
  / __/__  ___ _/ /__
@@ -57,7 +68,7 @@ print("Using Python version %s (%s, %s)" % (
 platform.python_version(),
 platform.python_build()[0],
 platform.python_build()[1]))
-print("SparkContext available as sc.")
+print("SparkContext available as sc, %s available as sqlCtx." % 
sqlCtx.__class__.__name__)
 
 if add_files is not None:
 print("Warning: ADD_FILES environment variable is deprecated, use 
--py-files argument instead")

http://git-wip-us.apache.org/repos/asf/spark/blob/0dba382e/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index aa80bca..52f7e65 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -25,6 +25,8 @@ import pydoc
 import shutil
 import tempfile
 
+import py4j
+
 if sys.version_info[:2] <= (2, 6):
 try:
 import unittest2 as unittest
@@ -329,9 +331,12 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 def setUpClass(cls):
 ReusedPySparkTestCase.setUpClass()
 cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
+try:
+cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
+except py4j.protocol.Py4JError:
+cls.sqlCtx = None
+return
 os.unlink(cls.tempdir.name)
-print "type", type(cls.sc)
-print "type", type(cls.sc._jsc)
 _scala_HiveContext =\
 
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
 cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
@@ -344,6 +349,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 shutil.rmtree(cls.tempdir.name, ignore_errors=True)
 
 def test_save_and_load_table(self):
+if self.sqlCtx is None:
+return  # no hive available, skipped
+
 df = self.df
 tmpPath = tempfile.mkdtemp()
 shutil.rmtree(tmpPath)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5872] [SQL] create a sqlCtx in pyspark shell

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 3df85dccb -> 4d4cc760f


[SPARK-5872] [SQL] create a sqlCtx in pyspark shell

The sqlCtx will be a HiveContext if Hive is built into the assembly jar, or a SQLContext otherwise.

It also skips the Hive tests in pyspark.sql.tests if Hive is not available.
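
A minimal usage sketch of the pre-created sqlCtx in the PySpark shell after this change; the sample data, column names, and table name below are made up for illustration:

    # sc and sqlCtx are created for you by shell.py; sqlCtx is a HiveContext
    # when the Hive classes are on the classpath, otherwise a plain SQLContext.
    # The DataFrame API below works the same way with either one.
    rows = sc.parallelize([(1, "alice"), (2, "bob")])
    df = sqlCtx.createDataFrame(rows, ["id", "name"])
    df.registerTempTable("people")
    sqlCtx.sql("SELECT name FROM people WHERE id = 2").show()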

Author: Davies Liu 

Closes #4659 from davies/sqlctx and squashes the following commits:

0e6629a [Davies Liu] sqlCtx in pyspark


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d4cc760
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d4cc760
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d4cc760

Branch: refs/heads/master
Commit: 4d4cc760fa9687ce563320094557ef9144488676
Parents: 3df85dc
Author: Davies Liu 
Authored: Tue Feb 17 15:44:37 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 15:44:37 2015 -0800

--
 python/pyspark/shell.py | 13 -
 python/pyspark/sql/tests.py | 12 ++--
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d4cc760/python/pyspark/shell.py
--
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 4cf4b89..1a02fec 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -31,8 +31,12 @@ if sys.version_info[0] != 2:
 import atexit
 import os
 import platform
+
+import py4j
+
 import pyspark
 from pyspark.context import SparkContext
+from pyspark.sql import SQLContext, HiveContext
 from pyspark.storagelevel import StorageLevel
 
 # this is the deprecated equivalent of ADD_JARS
@@ -46,6 +50,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"):
 sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 atexit.register(lambda: sc.stop())
 
+try:
+# Try to access HiveConf, it will raise exception if Hive is not added
+sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
+sqlCtx = HiveContext(sc)
+except py4j.protocol.Py4JError:
+sqlCtx = SQLContext(sc)
+
 print("""Welcome to
     __
  / __/__  ___ _/ /__
@@ -57,7 +68,7 @@ print("Using Python version %s (%s, %s)" % (
 platform.python_version(),
 platform.python_build()[0],
 platform.python_build()[1]))
-print("SparkContext available as sc.")
+print("SparkContext available as sc, %s available as sqlCtx." % 
sqlCtx.__class__.__name__)
 
 if add_files is not None:
 print("Warning: ADD_FILES environment variable is deprecated, use 
--py-files argument instead")

http://git-wip-us.apache.org/repos/asf/spark/blob/4d4cc760/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index aa80bca..52f7e65 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -25,6 +25,8 @@ import pydoc
 import shutil
 import tempfile
 
+import py4j
+
 if sys.version_info[:2] <= (2, 6):
 try:
 import unittest2 as unittest
@@ -329,9 +331,12 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 def setUpClass(cls):
 ReusedPySparkTestCase.setUpClass()
 cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
+try:
+cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
+except py4j.protocol.Py4JError:
+cls.sqlCtx = None
+return
 os.unlink(cls.tempdir.name)
-print "type", type(cls.sc)
-print "type", type(cls.sc._jsc)
 _scala_HiveContext =\
 
cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
 cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
@@ -344,6 +349,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 shutil.rmtree(cls.tempdir.name, ignore_errors=True)
 
 def test_save_and_load_table(self):
+if self.sqlCtx is None:
+return  # no hive available, skipped
+
 df = self.df
 tmpPath = tempfile.mkdtemp()
 shutil.rmtree(tmpPath)





spark git commit: [SPARK-5871] output explain in Python

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 35e23ff14 -> cb061603c


[SPARK-5871] output explain in Python

Author: Davies Liu 

Closes #4658 from davies/explain and squashes the following commits:

db87ea2 [Davies Liu] output explain in Python

(cherry picked from commit 3df85dccbc8fd1ba19bbcdb8d359c073b1494d98)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb061603
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb061603
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb061603

Branch: refs/heads/branch-1.3
Commit: cb061603c3ca4cd5162a36fc32de15779614e854
Parents: 35e23ff
Author: Davies Liu 
Authored: Tue Feb 17 13:48:38 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 13:51:00 2015 -0800

--
 python/pyspark/sql/dataframe.py | 23 ---
 1 file changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb061603/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8417240..388033d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -244,8 +244,25 @@ class DataFrame(object):
 debugging purpose.
 
 If extended is False, only prints the physical plan.
-"""
-self._jdf.explain(extended)
+
+>>> df.explain()
+PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
+
+>>> df.explain(True)
+== Parsed Logical Plan ==
+...
+== Analyzed Logical Plan ==
+...
+== Optimized Logical Plan ==
+...
+== Physical Plan ==
+...
+== RDD ==
+"""
+if extended:
+print self._jdf.queryExecution().toString()
+else:
+print self._jdf.queryExecution().executedPlan().toString()
 
 def isLocal(self):
 """
@@ -1034,7 +1051,7 @@ def _test():
   Row(name='Bob', age=5, height=85)]).toDF()
 (failure_count, test_count) = doctest.testmod(
 pyspark.sql.dataframe, globs=globs,
-optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
 globs['sc'].stop()
 if failure_count:
 exit(-1)





spark git commit: [SPARK-5871] output explain in Python

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 445a755b8 -> 3df85dccb


[SPARK-5871] output explain in Python

Author: Davies Liu 

Closes #4658 from davies/explain and squashes the following commits:

db87ea2 [Davies Liu] output explain in Python
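
For reference, a short usage sketch of the Python-side explain() this commit adds; the DataFrame below is hypothetical and the printed plans will vary:

    df = sqlCtx.createDataFrame(sc.parallelize([(1, "a"), (2, "b")]), ["id", "val"])
    df.filter(df.id > 1).explain()      # prints only the physical plan
    df.filter(df.id > 1).explain(True)  # also prints the parsed/analyzed/optimized plans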


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3df85dcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3df85dcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3df85dcc

Branch: refs/heads/master
Commit: 3df85dccbc8fd1ba19bbcdb8d359c073b1494d98
Parents: 445a755
Author: Davies Liu 
Authored: Tue Feb 17 13:48:38 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 13:48:38 2015 -0800

--
 python/pyspark/sql/dataframe.py | 23 ---
 1 file changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3df85dcc/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8417240..388033d 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -244,8 +244,25 @@ class DataFrame(object):
 debugging purpose.
 
 If extended is False, only prints the physical plan.
-"""
-self._jdf.explain(extended)
+
+>>> df.explain()
+PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:...
+
+>>> df.explain(True)
+== Parsed Logical Plan ==
+...
+== Analyzed Logical Plan ==
+...
+== Optimized Logical Plan ==
+...
+== Physical Plan ==
+...
+== RDD ==
+"""
+if extended:
+print self._jdf.queryExecution().toString()
+else:
+print self._jdf.queryExecution().executedPlan().toString()
 
 def isLocal(self):
 """
@@ -1034,7 +1051,7 @@ def _test():
   Row(name='Bob', age=5, height=85)]).toDF()
 (failure_count, test_count) = doctest.testmod(
 pyspark.sql.dataframe, globs=globs,
-optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
 globs['sc'].stop()
 if failure_count:
 exit(-1)





spark git commit: [SPARK-4172] [PySpark] Progress API in Python

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master de4836f8f -> 445a755b8


[SPARK-4172] [PySpark] Progress API in Python

This patch brings the pull-based progress API into Python, along with an example in Python.
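
A hedged sketch of polling the new status API from Python. It assumes the PySpark wrapper exposes sc.statusTracker() with getActiveJobsIds()/getJobInfo()/getStageInfo(), mirroring the Java status API and the added status_api_demo.py example; field names such as numCompletedTasks are assumptions based on that API:

    import threading
    import time

    def run_job():
        # a throwaway job so there is something to monitor (illustrative only)
        sc.parallelize(range(100000), 20).map(lambda x: x * x).count()

    t = threading.Thread(target=run_job)
    t.start()

    tracker = sc.statusTracker()
    while t.is_alive():
        for job_id in tracker.getActiveJobsIds():
            job = tracker.getJobInfo(job_id)
            for stage_id in job.stageIds:
                stage = tracker.getStageInfo(stage_id)
                if stage:
                    print("stage %d: %d/%d tasks complete" %
                          (stage_id, stage.numCompletedTasks, stage.numTasks))
        time.sleep(1)
    t.join()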

Author: Davies Liu 

Closes #3027 from davies/progress_api and squashes the following commits:

b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/445a755b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/445a755b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/445a755b

Branch: refs/heads/master
Commit: 445a755b884885b88c1778fd56a3151045b0b0ed
Parents: de4836f
Author: Davies Liu 
Authored: Tue Feb 17 13:36:43 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 13:36:43 2015 -0800

--
 .../spark/scheduler/TaskResultGetter.scala  | 40 
 examples/src/main/python/status_api_demo.py | 67 ++
 python/pyspark/__init__.py  | 15 +--
 python/pyspark/context.py   |  7 ++
 python/pyspark/status.py| 96 
 python/pyspark/tests.py | 31 +++
 6 files changed, 232 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/445a755b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 774f3d8..3938580 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
+import java.util.concurrent.RejectedExecutionException
 
 import scala.language.existentials
 import scala.util.control.NonFatal
@@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: 
TaskState,
 serializedData: ByteBuffer) {
 var reason : TaskEndReason = UnknownReason
-getTaskResultExecutor.execute(new Runnable {
-  override def run(): Unit = Utils.logUncaughtExceptions {
-try {
-  if (serializedData != null && serializedData.limit() > 0) {
-reason = serializer.get().deserialize[TaskEndReason](
-  serializedData, Utils.getSparkClassLoader)
+try {
+  getTaskResultExecutor.execute(new Runnable {
+override def run(): Unit = Utils.logUncaughtExceptions {
+  try {
+if (serializedData != null && serializedData.limit() > 0) {
+  reason = serializer.get().deserialize[TaskEndReason](
+serializedData, Utils.getSparkClassLoader)
+}
+  } catch {
+case cnd: ClassNotFoundException =>
+  // Log an error but keep going here -- the task failed, so not 
catastrophic
+  // if we can't deserialize the reason.
+  val loader = Utils.getContextOrSparkClassLoader
+  logError(
+"Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
+case ex: Exception => {}
   }
-} catch {
-  case cnd: ClassNotFoundException =>
-// Log an error but keep going here -- the task failed, so not 
catastrophic if we can't
-// deserialize the reason.
-val loader = Utils.getContextOrSparkClassLoader
-logError(
-  "Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
-  case ex: Exception => {}
+  scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
 }
-scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
-  }
-})
+  })
+} catch {
+  case e: RejectedExecutionException if sparkEnv.isStopped =>
+// ignore it
+}
   }
 
   def stop() {

http://git-wip-us.apache.org/repos/asf/spark/blob/445a755b/examples/src/main/python/status_api_demo.py
--

spark git commit: [SPARK-4172] [PySpark] Progress API in Python

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e65dc1fd5 -> 35e23ff14


[SPARK-4172] [PySpark] Progress API in Python

This patch brings the pull-based progress API into Python, along with an example in Python.

Author: Davies Liu 

Closes #3027 from davies/progress_api and squashes the following commits:

b1ba984 [Davies Liu] fix style
d3b9253 [Davies Liu] add tests, mute the exception after stop
4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
25590c9 [Davies Liu] update with Java API
360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api
023afb3 [Davies Liu] add Python API and example for progress API

(cherry picked from commit 445a755b884885b88c1778fd56a3151045b0b0ed)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35e23ff1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35e23ff1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35e23ff1

Branch: refs/heads/branch-1.3
Commit: 35e23ff140cd65a4121e769ee0a4e22a3490be37
Parents: e65dc1f
Author: Davies Liu 
Authored: Tue Feb 17 13:36:43 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 13:36:54 2015 -0800

--
 .../spark/scheduler/TaskResultGetter.scala  | 40 
 examples/src/main/python/status_api_demo.py | 67 ++
 python/pyspark/__init__.py  | 15 +--
 python/pyspark/context.py   |  7 ++
 python/pyspark/status.py| 96 
 python/pyspark/tests.py | 31 +++
 6 files changed, 232 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/35e23ff1/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 774f3d8..3938580 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
+import java.util.concurrent.RejectedExecutionException
 
 import scala.language.existentials
 import scala.util.control.NonFatal
@@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedul
   def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: 
TaskState,
 serializedData: ByteBuffer) {
 var reason : TaskEndReason = UnknownReason
-getTaskResultExecutor.execute(new Runnable {
-  override def run(): Unit = Utils.logUncaughtExceptions {
-try {
-  if (serializedData != null && serializedData.limit() > 0) {
-reason = serializer.get().deserialize[TaskEndReason](
-  serializedData, Utils.getSparkClassLoader)
+try {
+  getTaskResultExecutor.execute(new Runnable {
+override def run(): Unit = Utils.logUncaughtExceptions {
+  try {
+if (serializedData != null && serializedData.limit() > 0) {
+  reason = serializer.get().deserialize[TaskEndReason](
+serializedData, Utils.getSparkClassLoader)
+}
+  } catch {
+case cnd: ClassNotFoundException =>
+  // Log an error but keep going here -- the task failed, so not 
catastrophic
+  // if we can't deserialize the reason.
+  val loader = Utils.getContextOrSparkClassLoader
+  logError(
+"Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
+case ex: Exception => {}
   }
-} catch {
-  case cnd: ClassNotFoundException =>
-// Log an error but keep going here -- the task failed, so not 
catastrophic if we can't
-// deserialize the reason.
-val loader = Utils.getContextOrSparkClassLoader
-logError(
-  "Could not deserialize TaskEndReason: ClassNotFound with 
classloader " + loader)
-  case ex: Exception => {}
+  scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
 }
-scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
-  }
-})
+  })
+} catch {
+  case e: RejectedExecutionException if sparkEnv.isStopped =>
+// ignore it
+}
   }
 
   def stop() {

http://git-wip-us.apache.org/repos/asf/spark

spark git commit: [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 9d281fa56 -> de4836f8f


[SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext

Author: Michael Armbrust 

Closes #4657 from marmbrus/pythonUdfs and squashes the following commits:

a7823a8 [Michael Armbrust] [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext
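
For context, a minimal sketch of the kind of Python UDF usage on a HiveContext that this fix targets; the UDF name is made up, and it assumes a table named src already exists:

    from pyspark.sql.types import IntegerType

    # Register a Python UDF and call it from SQL on the Hive-backed context.
    sqlCtx.registerFunction("str_len", lambda s: len(s), IntegerType())
    sqlCtx.sql("SELECT str_len(value) FROM src LIMIT 3").show()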


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de4836f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de4836f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de4836f8

Branch: refs/heads/master
Commit: de4836f8f12c36c1b350cef288a75b5e59155735
Parents: 9d281fa
Author: Michael Armbrust 
Authored: Tue Feb 17 13:23:45 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 13:23:45 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 ++-
 .../main/scala/org/apache/spark/sql/execution/pythonUdfs.scala| 3 +++
 2 files changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de4836f8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 31afa0e..709b350 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -113,6 +113,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val analyzer: Analyzer =
 new Analyzer(catalog, functionRegistry, caseSensitive = true) {
   override val extendedResolutionRules =
+ExtractPythonUdfs ::
 sources.PreWriteCheck(catalog) ::
 sources.PreInsertCastAndRename ::
 Nil
@@ -1059,7 +1060,7 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
   @DeveloperApi
   protected[sql] class QueryExecution(val logical: LogicalPlan) {
 
-lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical))
+lazy val analyzed: LogicalPlan = analyzer(logical)
 lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
 lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de4836f8/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3a2f8d7..69de4d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -205,6 +205,9 @@ case class EvaluatePython(
   extends logical.UnaryNode {
 
   def output = child.output :+ resultAttribute
+
+  // References should not include the produced attribute.
+  override def references = udf.references
 }
 
 /**





spark git commit: [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 01356514e -> e65dc1fd5


[SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext

Author: Michael Armbrust 

Closes #4657 from marmbrus/pythonUdfs and squashes the following commits:

a7823a8 [Michael Armbrust] [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext

(cherry picked from commit de4836f8f12c36c1b350cef288a75b5e59155735)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e65dc1fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e65dc1fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e65dc1fd

Branch: refs/heads/branch-1.3
Commit: e65dc1fd58e4f9445247c9fd4e94b34550e992fb
Parents: 0135651
Author: Michael Armbrust 
Authored: Tue Feb 17 13:23:45 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 13:23:56 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 ++-
 .../main/scala/org/apache/spark/sql/execution/pythonUdfs.scala| 3 +++
 2 files changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e65dc1fd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 31afa0e..709b350 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -113,6 +113,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val analyzer: Analyzer =
 new Analyzer(catalog, functionRegistry, caseSensitive = true) {
   override val extendedResolutionRules =
+ExtractPythonUdfs ::
 sources.PreWriteCheck(catalog) ::
 sources.PreInsertCastAndRename ::
 Nil
@@ -1059,7 +1060,7 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
   @DeveloperApi
   protected[sql] class QueryExecution(val logical: LogicalPlan) {
 
-lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical))
+lazy val analyzed: LogicalPlan = analyzer(logical)
 lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed)
 lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e65dc1fd/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 3a2f8d7..69de4d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -205,6 +205,9 @@ case class EvaluatePython(
   extends logical.UnaryNode {
 
   def output = child.output :+ resultAttribute
+
+  // References should not include the produced attribute.
+  override def references = udf.references
 }
 
 /**





spark git commit: [Minor][SQL] Use same function to check path parameter in JSONRelation

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 4611de1ce -> ac506b7c2


[Minor][SQL] Use same function to check path parameter in JSONRelation

Author: Liang-Chi Hsieh 

Closes #4649 from viirya/use_checkpath and squashes the following commits:

0f9a1a1 [Liang-Chi Hsieh] Use same function to check path parameter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac506b7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac506b7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac506b7c

Branch: refs/heads/master
Commit: ac506b7c2846f656e03839bbd0e93827c7cc613e
Parents: 4611de1
Author: Liang-Chi Hsieh 
Authored: Tue Feb 17 12:24:13 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:24:13 2015 -0800

--
 .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala  | 4 ++--
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala| 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ac506b7c/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 2484863..3b68b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -37,7 +37,7 @@ private[sql] class DefaultSource
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val path = parameters.getOrElse("path", sys.error("Option 'path' not 
specified"))
+val path = checkPath(parameters)
 val samplingRatio = 
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
 JSONRelation(path, samplingRatio, None)(sqlContext)
@@ -48,7 +48,7 @@ private[sql] class DefaultSource
   sqlContext: SQLContext,
   parameters: Map[String, String],
   schema: StructType): BaseRelation = {
-val path = parameters.getOrElse("path", sys.error("Option 'path' not 
specified"))
+val path = checkPath(parameters)
 val samplingRatio = 
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
 JSONRelation(path, samplingRatio, Some(schema))(sqlContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/ac506b7c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0263e3b..485d5c9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -547,7 +547,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
BeforeAndAfterEach {
 Map.empty[String, String])
 }.getMessage
 assert(
-  message.contains("Option 'path' not specified"),
+  message.contains("'path' must be specified for json data."),
   "We should complain that path is not specified.")
 
 sql("DROP TABLE savedJsonTable")





spark git commit: [SQL] [Minor] Update the HiveContext Unittest

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master ac506b7c2 -> 9d281fa56


[SQL] [Minor] Update the HiveContext Unittest

In the unit tests, the table src(key INT, value STRING) is not the same as Hive's src(key STRING, value STRING):
https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql

In reflect.q, the test fails for the expression `reflect("java.lang.Integer", "valueOf", key, 16)`, which expects the argument `key` to be a STRING rather than an INT (Integer.valueOf has a (String, int radix) overload but no (int, int) one).

This PR does not aim to change the `src` schema; we can do that after 1.3 is released, though we would probably need to re-generate all the golden files.

Author: Cheng Hao 

Closes #4584 from chenghao-intel/reflect and squashes the following commits:

e5bdc3a [Cheng Hao] Move the test case reflect into blacklist
184abfd [Cheng Hao] revert the change to table src1
d9bcf92 [Cheng Hao] Update the HiveContext Unittest


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d281fa5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d281fa5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d281fa5

Branch: refs/heads/master
Commit: 9d281fa56022800dc008a3de233fec44379a2bd7
Parents: ac506b7
Author: Cheng Hao 
Authored: Tue Feb 17 12:25:35 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:25:35 2015 -0800

--
 .../spark/sql/hive/execution/HiveCompatibilitySuite.scala  | 6 ++
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala   | 1 +
 .../golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada | 1 +
 .../golden/udf_reflect2-1-7bec330c7bc6f71cbaf9bf1883d1b184 | 1 +
 .../golden/udf_reflect2-2-c5a05379f482215a5a484bed0299bf19 | 3 +++
 .../golden/udf_reflect2-3-effc057c78c00b0af26a4ac0f5f116ca | 0
 .../golden/udf_reflect2-4-73d466e70e96e9e5f0cd373b37d4e1f4 | 5 +
 7 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
--
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 133f2d3..c6ead45 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -225,6 +225,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest 
with BeforeAndAfter {
 // Needs constant object inspectors
 "udf_round",
 
+// the table src(key INT, value STRING) is not the same as HIVE unittest. 
In Hive
+// is src(key STRING, value STRING), and in the reflect.q, it failed in
+// Integer.valueOf, which expect the first argument passed as STRING type 
not INT.
+"udf_reflect",
+
 // Sort with Limit clause causes failure.
 "ctas",
 "ctas_hadoop20",
@@ -886,6 +891,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
 "udf_power",
 "udf_radians",
 "udf_rand",
+"udf_reflect2",
 "udf_regexp",
 "udf_regexp_extract",
 "udf_regexp_replace",

http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 840fbc1..a2d99f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -196,6 +196,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
 
   // The test tables that are defined in the Hive QTestUtil.
   // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+  // 
https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
   val hiveQTestUtilTables = Seq(
 TestTable("src",
   "CREATE TABLE src (key INT, value STRING)".cmd,

http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
--
diff --git 
a/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
 
b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
new file mode 100644
index 000..573541a
--- /dev/null
+++ 
b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a

spark git commit: [SQL] [Minor] Update the HiveContext Unittest

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 d74d5e86a -> 01356514e


[SQL] [Minor] Update the HiveContext Unittest

In the unit tests, the table src(key INT, value STRING) is not the same as Hive's src(key STRING, value STRING):
https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql

In reflect.q, the test fails for the expression `reflect("java.lang.Integer", "valueOf", key, 16)`, which expects the argument `key` to be a STRING rather than an INT (Integer.valueOf has a (String, int radix) overload but no (int, int) one).

This PR does not aim to change the `src` schema; we can do that after 1.3 is released, though we would probably need to re-generate all the golden files.

Author: Cheng Hao 

Closes #4584 from chenghao-intel/reflect and squashes the following commits:

e5bdc3a [Cheng Hao] Move the test case reflect into blacklist
184abfd [Cheng Hao] revert the change to table src1
d9bcf92 [Cheng Hao] Update the HiveContext Unittest

(cherry picked from commit 9d281fa56022800dc008a3de233fec44379a2bd7)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01356514
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01356514
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01356514

Branch: refs/heads/branch-1.3
Commit: 01356514ef42321e09e9a67ba08366bb2bd5af4b
Parents: d74d5e8
Author: Cheng Hao 
Authored: Tue Feb 17 12:25:35 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:25:43 2015 -0800

--
 .../spark/sql/hive/execution/HiveCompatibilitySuite.scala  | 6 ++
 .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala   | 1 +
 .../golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada | 1 +
 .../golden/udf_reflect2-1-7bec330c7bc6f71cbaf9bf1883d1b184 | 1 +
 .../golden/udf_reflect2-2-c5a05379f482215a5a484bed0299bf19 | 3 +++
 .../golden/udf_reflect2-3-effc057c78c00b0af26a4ac0f5f116ca | 0
 .../golden/udf_reflect2-4-73d466e70e96e9e5f0cd373b37d4e1f4 | 5 +
 7 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
--
diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 133f2d3..c6ead45 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -225,6 +225,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest 
with BeforeAndAfter {
 // Needs constant object inspectors
 "udf_round",
 
+// the table src(key INT, value STRING) is not the same as HIVE unittest. 
In Hive
+// is src(key STRING, value STRING), and in the reflect.q, it failed in
+// Integer.valueOf, which expect the first argument passed as STRING type 
not INT.
+"udf_reflect",
+
 // Sort with Limit clause causes failure.
 "ctas",
 "ctas_hadoop20",
@@ -886,6 +891,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
 "udf_power",
 "udf_radians",
 "udf_rand",
+"udf_reflect2",
 "udf_regexp",
 "udf_regexp_extract",
 "udf_regexp_replace",

http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 840fbc1..a2d99f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -196,6 +196,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
 
   // The test tables that are defined in the Hive QTestUtil.
   // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+  // 
https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
   val hiveQTestUtilTables = Seq(
 TestTable("src",
   "CREATE TABLE src (key INT, value STRING)".cmd,

http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
--
diff --git 
a/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
 
b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada
new file mode 100

spark git commit: [Minor][SQL] Use same function to check path parameter in JSONRelation

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 62063b7a3 -> d74d5e86a


[Minor][SQL] Use same function to check path parameter in JSONRelation

Author: Liang-Chi Hsieh 

Closes #4649 from viirya/use_checkpath and squashes the following commits:

0f9a1a1 [Liang-Chi Hsieh] Use same function to check path parameter.

(cherry picked from commit ac506b7c2846f656e03839bbd0e93827c7cc613e)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d74d5e86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d74d5e86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d74d5e86

Branch: refs/heads/branch-1.3
Commit: d74d5e86abcb2ee4f142bb76f641d66cb4ffeb42
Parents: 62063b7
Author: Liang-Chi Hsieh 
Authored: Tue Feb 17 12:24:13 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:24:21 2015 -0800

--
 .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala  | 4 ++--
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala| 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d74d5e86/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 2484863..3b68b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -37,7 +37,7 @@ private[sql] class DefaultSource
   override def createRelation(
   sqlContext: SQLContext,
   parameters: Map[String, String]): BaseRelation = {
-val path = parameters.getOrElse("path", sys.error("Option 'path' not 
specified"))
+val path = checkPath(parameters)
 val samplingRatio = 
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
 JSONRelation(path, samplingRatio, None)(sqlContext)
@@ -48,7 +48,7 @@ private[sql] class DefaultSource
   sqlContext: SQLContext,
   parameters: Map[String, String],
   schema: StructType): BaseRelation = {
-val path = parameters.getOrElse("path", sys.error("Option 'path' not 
specified"))
+val path = checkPath(parameters)
 val samplingRatio = 
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
 JSONRelation(path, samplingRatio, Some(schema))(sqlContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/d74d5e86/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0263e3b..485d5c9 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -547,7 +547,7 @@ class MetastoreDataSourcesSuite extends QueryTest with 
BeforeAndAfterEach {
 Map.empty[String, String])
 }.getMessage
 assert(
-  message.contains("Option 'path' not specified"),
+  message.contains("'path' must be specified for json data."),
   "We should complain that path is not specified.")
 
 sql("DROP TABLE savedJsonTable")





spark git commit: [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5636c4a58 -> 62063b7a3


[SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

The current `ParquetConversions` rule in `HiveMetastoreCatalog` will transformUp the given plan multiple times if there are many Metastore Parquet tables. Since the transformUp operation is recursive, it is better to perform it only once: collect all relations to replace first, then rewrite the plan in a single pass.
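
The pattern, sketched in Python on a generic tree; this is an illustration of the idea only, not Catalyst code, and the Node class and table names are made up:

    class Node(object):
        """A minimal tree node standing in for a logical plan node."""
        def __init__(self, name, children=()):
            self.name = name
            self.children = list(children)

        def with_children(self, children):
            return Node(self.name, children)

        def __repr__(self):
            return "%s(%s)" % (self.name, ", ".join(map(repr, self.children)))

    def transform_up(node, rule):
        """Apply `rule` bottom-up: rewrite the children first, then the node itself."""
        node = node.with_children([transform_up(c, rule) for c in node.children])
        return rule(node)

    # Before: one full transform_up pass per relation being replaced.
    # After: collect every replacement up front, then rewrite in a single pass.
    def replace_relations(plan, replacements):
        def rule(node):
            return replacements.get(node.name, node)
        return transform_up(plan, rule)

    plan = Node("Project", [Node("Join", [Node("metastore_t1"), Node("metastore_t2")])])
    replacements = {"metastore_t1": Node("parquet_t1"),
                    "metastore_t2": Node("parquet_t2")}
    print(replace_relations(plan, replacements))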

Author: Liang-Chi Hsieh 

Closes #4651 from viirya/parquet_atonce and squashes the following commits:

c1ed29d [Liang-Chi Hsieh] Fix bug.
e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once.

(cherry picked from commit 4611de1cef7363bc71ec608560dfd866ae477747)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62063b7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62063b7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62063b7a

Branch: refs/heads/branch-1.3
Commit: 62063b7a3e2db9fc7320739d3b900a7840c2dee7
Parents: 5636c4a
Author: Liang-Chi Hsieh 
Authored: Tue Feb 17 12:23:18 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:23:26 2015 -0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 37 +++-
 1 file changed, 20 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62063b7a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa..cfd6f27 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
   hive.convertMetastoreParquet &&
   hive.conf.parquetUseDataSourceApi &&
   
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-  relation
+  val parquetRelation = convertToParquetRelation(relation)
+  val attributedRewrites = relation.output.zip(parquetRelation.output)
+  (relation, parquetRelation, attributedRewrites)
 
 // Read path
 case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
 if hive.convertMetastoreParquet &&
   hive.conf.parquetUseDataSourceApi &&
   
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-  relation
+  val parquetRelation = convertToParquetRelation(relation)
+  val attributedRewrites = relation.output.zip(parquetRelation.output)
+  (relation, parquetRelation, attributedRewrites)
   }
 
+  val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
+  val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
+
   // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
   // attribute IDs referenced in other nodes.
-  toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
-val parquetRelation = convertToParquetRelation(relation)
-val attributedRewrites = 
AttributeMap(relation.output.zip(parquetRelation.output))
-
-lastPlan.transformUp {
-  case r: MetastoreRelation if r == relation => {
-val withAlias =
-  r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-Subquery(r.tableName, parquetRelation))
-
-withAlias
-  }
-  case other => other.transformExpressions {
-case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, 
a)
-  }
+  plan.transformUp {
+case r: MetastoreRelation if relationMap.contains(r) => {
+  val parquetRelation = relationMap(r)
+  val withAlias =
+r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+  Subquery(r.tableName, parquetRelation))
+
+  withAlias
+}
+case other => other.transformExpressions {
+  case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
 }
   }
 }





spark git commit: [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 31efb39c1 -> 4611de1ce


[SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

The current `ParquetConversions` rule in `HiveMetastoreCatalog` will transformUp the given plan multiple times if there are many Metastore Parquet tables. Since the transformUp operation is recursive, it is better to perform it only once: collect all relations to replace first, then rewrite the plan in a single pass.

Author: Liang-Chi Hsieh 

Closes #4651 from viirya/parquet_atonce and squashes the following commits:

c1ed29d [Liang-Chi Hsieh] Fix bug.
e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4611de1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4611de1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4611de1c

Branch: refs/heads/master
Commit: 4611de1cef7363bc71ec608560dfd866ae477747
Parents: 31efb39
Author: Liang-Chi Hsieh 
Authored: Tue Feb 17 12:23:18 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:23:18 2015 -0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 37 +++-
 1 file changed, 20 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4611de1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0e43faa..cfd6f27 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
   hive.convertMetastoreParquet &&
   hive.conf.parquetUseDataSourceApi &&
   
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-  relation
+  val parquetRelation = convertToParquetRelation(relation)
+  val attributedRewrites = relation.output.zip(parquetRelation.output)
+  (relation, parquetRelation, attributedRewrites)
 
 // Read path
 case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
 if hive.convertMetastoreParquet &&
   hive.conf.parquetUseDataSourceApi &&
   
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
-  relation
+  val parquetRelation = convertToParquetRelation(relation)
+  val attributedRewrites = relation.output.zip(parquetRelation.output)
+  (relation, parquetRelation, attributedRewrites)
   }
 
+  val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
+  val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
+
   // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
   // attribute IDs referenced in other nodes.
-  toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) =>
-val parquetRelation = convertToParquetRelation(relation)
-val attributedRewrites = 
AttributeMap(relation.output.zip(parquetRelation.output))
-
-lastPlan.transformUp {
-  case r: MetastoreRelation if r == relation => {
-val withAlias =
-  r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
-Subquery(r.tableName, parquetRelation))
-
-withAlias
-  }
-  case other => other.transformExpressions {
-case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, 
a)
-  }
+  plan.transformUp {
+case r: MetastoreRelation if relationMap.contains(r) => {
+  val parquetRelation = relationMap(r)
+  val withAlias =
+r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+  Subquery(r.tableName, parquetRelation))
+
+  withAlias
+}
+case other => other.transformExpressions {
+  case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
 }
   }
 }





spark git commit: [Minor] fix typo in SQL document

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master fc4eb9505 -> 31efb39c1


[Minor] fix typo in SQL document

Author: CodingCat 

Closes #4656 from CodingCat/fix_typo and squashes the following commits:

b41d15c [CodingCat] recover
689fe46 [CodingCat] fix typo


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31efb39c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31efb39c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31efb39c

Branch: refs/heads/master
Commit: 31efb39c1deb253032b38e8fbafde4b2b1dde1f6
Parents: fc4eb95
Author: CodingCat 
Authored: Tue Feb 17 12:16:52 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:16:52 2015 -0800

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31efb39c/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 8022c5e..0146a4e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -159,7 +159,7 @@ you to construct DataFrames when the columns and their 
types are not known until
 
 
 
-The Scala interaface for Spark SQL supports automatically converting an RDD containing case classes
+The Scala interface for Spark SQL supports automatically converting an RDD containing case classes
 to a DataFrame.  The case class
 defines the schema of the table.  The names of the arguments to the case class 
are read using
 reflection and become the names of the columns. Case classes can also be 
nested or contain complex





spark git commit: [Minor] fix typo in SQL document

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 71cf6e295 -> 5636c4a58


[Minor] fix typo in SQL document

Author: CodingCat 

Closes #4656 from CodingCat/fix_typo and squashes the following commits:

b41d15c [CodingCat] recover
689fe46 [CodingCat] fix typo

(cherry picked from commit 31efb39c1deb253032b38e8fbafde4b2b1dde1f6)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5636c4a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5636c4a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5636c4a5

Branch: refs/heads/branch-1.3
Commit: 5636c4a583bd28a5b54e14930d72fd9265c301de
Parents: 71cf6e2
Author: CodingCat 
Authored: Tue Feb 17 12:16:52 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 12:17:05 2015 -0800

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5636c4a5/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 8022c5e..0146a4e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -159,7 +159,7 @@ you to construct DataFrames when the columns and their 
types are not known until
 
 
 
-The Scala interaface for Spark SQL supports automatically converting an RDD containing case classes
+The Scala interface for Spark SQL supports automatically converting an RDD containing case classes
 to a DataFrame.  The case class
 defines the schema of the table.  The names of the arguments to the case class 
are read using
 reflection and become the names of the columns. Case classes can also be 
nested or contain complex





spark git commit: [SPARK-5864] [PySpark] support .jar as python package

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e64afcd84 -> 71cf6e295


[SPARK-5864] [PySpark] support .jar as python package

A jar file that contains Python sources can be used as a Python package, just like a zip file.

spark-submit already puts the jar file on the PYTHONPATH; this patch also inserts it into sys.path, so it can be used in the Python worker.
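
A hedged usage sketch; the jar name deps.jar and the package mypkg bundled inside it are made up for illustration:

    # Ship a jar that bundles Python sources, e.g.
    #   spark-submit --py-files deps.jar my_app.py
    # or add it from the driver at runtime:
    sc.addPyFile("deps.jar")

    # With this change the jar is also inserted into sys.path on the workers,
    # so imports from it work inside RDD functions:
    def use_package(x):
        from mypkg import helper          # hypothetical module inside deps.jar
        return helper.transform(x)

    print(sc.parallelize([1, 2, 3]).map(use_package).collect())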

Author: Davies Liu 

Closes #4652 from davies/jar and squashes the following commits:

17d3f76 [Davies Liu] support .jar as python package

(cherry picked from commit fc4eb9505adda192eb38cb4454d532027690bfa3)
Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71cf6e29
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71cf6e29
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71cf6e29

Branch: refs/heads/branch-1.3
Commit: 71cf6e295111e2522ef161b41cf9ee55db29fc6c
Parents: e64afcd
Author: Davies Liu 
Authored: Tue Feb 17 12:05:06 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 12:05:19 2015 -0800

--
 python/pyspark/context.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71cf6e29/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bf1f61c..40b3152 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -64,6 +64,8 @@ class SparkContext(object):
 _lock = Lock()
 _python_includes = None  # zip and egg files that need to be added to 
PYTHONPATH
 
+PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+
 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
  environment=None, batchSize=0, serializer=PickleSerializer(), 
conf=None,
  gateway=None, jsc=None, profiler_cls=BasicProfiler):
@@ -185,7 +187,7 @@ class SparkContext(object):
 for path in self._conf.get("spark.submit.pyFiles", "").split(","):
 if path != "":
 (dirname, filename) = os.path.split(path)
-if filename.lower().endswith("zip") or 
filename.lower().endswith("egg"):
+if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
 self._python_includes.append(filename)
 sys.path.insert(1, 
os.path.join(SparkFiles.getRootDirectory(), filename))
 
@@ -705,7 +707,7 @@ class SparkContext(object):
 self.addFile(path)
 (dirname, filename) = os.path.split(path)  # dirname may be directory 
or HDFS/S3 prefix
 
-if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
+if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
 self._python_includes.append(filename)
 # for tests in local mode
 sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), 
filename))





spark git commit: [SPARK-5864] [PySpark] support .jar as python package

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 49c19fdba -> fc4eb9505


[SPARK-5864] [PySpark] support .jar as python package

A jar file that contains Python sources can be used as a Python package, just like a zip file.

spark-submit already puts the jar file on the PYTHONPATH; this patch also inserts it into sys.path, so it can be used in the Python worker.

Author: Davies Liu 

Closes #4652 from davies/jar and squashes the following commits:

17d3f76 [Davies Liu] support .jar as python package


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc4eb950
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc4eb950
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc4eb950

Branch: refs/heads/master
Commit: fc4eb9505adda192eb38cb4454d532027690bfa3
Parents: 49c19fd
Author: Davies Liu 
Authored: Tue Feb 17 12:05:06 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 12:05:06 2015 -0800

--
 python/pyspark/context.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc4eb950/python/pyspark/context.py
--
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index bf1f61c..40b3152 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -64,6 +64,8 @@ class SparkContext(object):
 _lock = Lock()
 _python_includes = None  # zip and egg files that need to be added to 
PYTHONPATH
 
+PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+
 def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
  environment=None, batchSize=0, serializer=PickleSerializer(), 
conf=None,
  gateway=None, jsc=None, profiler_cls=BasicProfiler):
@@ -185,7 +187,7 @@ class SparkContext(object):
 for path in self._conf.get("spark.submit.pyFiles", "").split(","):
 if path != "":
 (dirname, filename) = os.path.split(path)
-if filename.lower().endswith("zip") or 
filename.lower().endswith("egg"):
+if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
 self._python_includes.append(filename)
 sys.path.insert(1, 
os.path.join(SparkFiles.getRootDirectory(), filename))
 
@@ -705,7 +707,7 @@ class SparkContext(object):
 self.addFile(path)
 (dirname, filename) = os.path.split(path)  # dirname may be directory 
or HDFS/S3 prefix
 
-if filename.endswith('.zip') or filename.endswith('.ZIP') or 
filename.endswith('.egg'):
+if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
 self._python_includes.append(filename)
 # for tests in local mode
 sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), 
filename))





spark git commit: SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 24f358b9d -> 49c19fdba


SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager

Avoid calling removeShutdownHook from within the shutdown hook itself
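
On the JVM, Runtime.removeShutdownHook throws IllegalStateException once shutdown
has begun, so the cleanup body is split out of stop(): the hook runs the cleanup
directly, while stop() first unregisters the hook and then runs the same cleanup.
A rough Python analogue of that pattern (illustrative only, not the Scala code):

    import atexit

    class LocalDirManager(object):
        def __init__(self):
            atexit.register(self._do_stop)     # the hook calls the cleanup body directly

        def stop(self):
            atexit.unregister(self._do_stop)   # safe: we are not inside the hook here
            self._do_stop()

        def _do_stop(self):
            print("deleting local dirs")       # stand-in for the real cleanup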

CC pwendell JoshRosen MattWhelan

Author: Sean Owen 

Closes #4648 from srowen/SPARK-5841.2 and squashes the following commits:

51548db [Sean Owen] Avoid call to remove shutdown hook being called from 
shutdown hook


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49c19fdb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49c19fdb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49c19fdb

Branch: refs/heads/master
Commit: 49c19fdbad57f0609bbcc9278f9eaa8115a73604
Parents: 24f358b
Author: Sean Owen 
Authored: Tue Feb 17 19:40:06 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 19:40:06 2015 +

--
 .../main/scala/org/apache/spark/storage/DiskBlockManager.scala  | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/49c19fdb/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ae9df8c..b297f3f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,7 +138,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
 val shutdownHook = new Thread("delete Spark local dirs") {
   override def run(): Unit = Utils.logUncaughtExceptions {
 logDebug("Shutdown hook called")
-DiskBlockManager.this.stop()
+DiskBlockManager.this.doStop()
   }
 }
 Runtime.getRuntime.addShutdownHook(shutdownHook)
@@ -149,7 +149,10 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   private[spark] def stop() {
 // Remove the shutdown hook.  It causes memory leaks if we leave it around.
 Runtime.getRuntime.removeShutdownHook(shutdownHook)
+doStop()
+  }
 
+  private def doStop(): Unit = {
 // Only perform cleanup if an external service is not serving our shuffle 
files.
 if (!blockManager.externalShuffleServiceEnabled || 
blockManager.blockManagerId.isDriver) {
   localDirs.foreach { localDir =>





spark git commit: SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 420bc9b3a -> e64afcd84


SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager

Avoid calling removeShutdownHook from within the shutdown hook itself

CC pwendell JoshRosen MattWhelan

Author: Sean Owen 

Closes #4648 from srowen/SPARK-5841.2 and squashes the following commits:

51548db [Sean Owen] Avoid call to remove shutdown hook being called from 
shutdown hook

(cherry picked from commit 49c19fdbad57f0609bbcc9278f9eaa8115a73604)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e64afcd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e64afcd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e64afcd8

Branch: refs/heads/branch-1.3
Commit: e64afcd8449c161ea134125ca469f5b9c09dbc60
Parents: 420bc9b
Author: Sean Owen 
Authored: Tue Feb 17 19:40:06 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 19:40:15 2015 +

--
 .../main/scala/org/apache/spark/storage/DiskBlockManager.scala  | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e64afcd8/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ae9df8c..b297f3f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,7 +138,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
 val shutdownHook = new Thread("delete Spark local dirs") {
   override def run(): Unit = Utils.logUncaughtExceptions {
 logDebug("Shutdown hook called")
-DiskBlockManager.this.stop()
+DiskBlockManager.this.doStop()
   }
 }
 Runtime.getRuntime.addShutdownHook(shutdownHook)
@@ -149,7 +149,10 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   private[spark] def stop() {
 // Remove the shutdown hook.  It causes memory leaks if we leave it around.
 Runtime.getRuntime.removeShutdownHook(shutdownHook)
+doStop()
+  }
 
+  private def doStop(): Unit = {
 // Only perform cleanup if an external service is not serving our shuffle 
files.
 if (!blockManager.externalShuffleServiceEnabled || 
blockManager.blockManagerId.isDriver) {
   localDirs.foreach { localDir =>





spark git commit: MAINTENANCE: Automated closing of pull requests.

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 9b746f380 -> 24f358b9d


MAINTENANCE: Automated closing of pull requests.

This commit exists to close the following pull requests on Github:

Closes #3297 (close requested by 'andrewor14')
Closes #3345 (close requested by 'pwendell')
Closes #2729 (close requested by 'srowen')
Closes #2320 (close requested by 'pwendell')
Closes #4529 (close requested by 'andrewor14')
Closes #2098 (close requested by 'srowen')
Closes #4120 (close requested by 'andrewor14')


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24f358b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24f358b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24f358b9

Branch: refs/heads/master
Commit: 24f358b9d6bc7a72a4fb493b7f845a40ed941a5d
Parents: 9b746f3
Author: Patrick Wendell 
Authored: Tue Feb 17 11:35:26 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 11:35:26 2015 -0800

--

--






spark git commit: [SPARK-5661] function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether it contains the file

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 2bf2b56ef -> 420bc9b3a


[SPARK-5661] function hasShutdownDeleteTachyonDir should use
shutdownDeleteTachyonPaths to determine whether it contains the file

hasShutdownDeleteTachyonDir(file: TachyonFile) should use
shutdownDeleteTachyonPaths (not shutdownDeletePaths) to determine whether it
contains the file. To solve it, delete two unused functions.
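
In other words, there is one registry per storage kind, and each membership check
has to consult the matching registry; the bug was checking the local-disk set for
a Tachyon path. A tiny illustrative sketch in Python (not the Utils.scala code):

    shutdown_delete_paths = set()           # local directories marked for deletion
    shutdown_delete_tachyon_paths = set()   # Tachyon directories marked for deletion

    def has_shutdown_delete_tachyon_dir(path):
        # Consult the Tachyon registry, not the local-disk one.
        return path in shutdown_delete_tachyon_paths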

Author: xukun 00228947 
Author: viper-kun 

Closes #4418 from viper-kun/deleteunusedfun and squashes the following commits:

87340eb [viper-kun] fix style
3d6c69e [xukun 00228947] fix bug
2bc397e [xukun 00228947] deleteunusedfun

(cherry picked from commit b271c265b742fa6947522eda4592e9e6a7fd1f3a)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/420bc9b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/420bc9b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/420bc9b3

Branch: refs/heads/branch-1.3
Commit: 420bc9b3ae49883f17962b1cff18a3b8df3bd05c
Parents: 2bf2b56
Author: xukun 00228947 
Authored: Tue Feb 17 18:59:41 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 18:59:51 2015 +

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/420bc9b3/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c06bd6f..df21ed3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -213,8 +213,8 @@ private[spark] object Utils extends Logging {
   // Is the path already registered to be deleted via a shutdown hook ?
   def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
 val absolutePath = file.getPath()
-shutdownDeletePaths.synchronized {
-  shutdownDeletePaths.contains(absolutePath)
+shutdownDeleteTachyonPaths.synchronized {
+  shutdownDeleteTachyonPaths.contains(absolutePath)
 }
   }
 





spark git commit: [SPARK-3381] [MLlib] Eliminate bins for unordered features in DecisionTrees

2015-02-17 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master b271c265b -> 9b746f380


[SPARK-3381] [MLlib] Eliminate bins for unordered features in DecisionTrees

For unordered features, it is sufficient to use splits, since the threshold of
the split corresponds to the threshold of the bin's HighSplit and the LowSplit is
never used.
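
Concretely, for an unordered categorical feature each candidate split is just a
subset of category values, and a point contributes to a split's left statistics
exactly when its feature value is in that subset, so no separate bin structure is
needed. A simplified Python sketch (illustrative only, not the MLlib code):

    def update_unordered_stats(splits, feature_value, label, left_stats, right_stats):
        # splits: one set of category values per candidate split
        for split_index, categories in enumerate(splits):
            if feature_value in categories:
                left_stats[split_index].append(label)
            else:
                right_stats[split_index].append(label)

    # Example: 3 categories {0, 1, 2} give 2^(3 - 1) - 1 = 3 candidate splits.
    splits = [{0.0}, {1.0}, {0.0, 1.0}]
    left = [[] for _ in splits]
    right = [[] for _ in splits]
    update_unordered_stats(splits, 1.0, label=1.0, left_stats=left, right_stats=right)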

Author: MechCoder 

Closes #4231 from MechCoder/spark-3381 and squashes the following commits:

58c19a5 [MechCoder] COSMIT
c274b74 [MechCoder] Remove unordered feature calculation in 
labeledPointToTreePoint
b2b9b89 [MechCoder] COSMIT
d3ee042 [MechCoder] [SPARK-3381] [MLlib] Eliminate bins for unordered features


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b746f38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b746f38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b746f38

Branch: refs/heads/master
Commit: 9b746f380869b54d673e3758ca5e4475f76c864a
Parents: b271c26
Author: MechCoder 
Authored: Tue Feb 17 11:19:23 2015 -0800
Committer: Joseph K. Bradley 
Committed: Tue Feb 17 11:19:23 2015 -0800

--
 .../apache/spark/mllib/tree/DecisionTree.scala  | 37 ++--
 .../spark/mllib/tree/impl/TreePoint.scala   | 14 +++-
 .../spark/mllib/tree/DecisionTreeSuite.scala| 37 +---
 3 files changed, 15 insertions(+), 73 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b746f38/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index f1f8599..b9d0c56 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -327,14 +327,14 @@ object DecisionTree extends Serializable with Logging {
* @param agg  Array storing aggregate calculation, with a set of sufficient 
statistics for
* each (feature, bin).
* @param treePoint  Data point being aggregated.
-   * @param bins possible bins for all features, indexed (numFeatures)(numBins)
+   * @param splits possible splits indexed (numFeatures)(numSplits)
* @param unorderedFeatures  Set of indices of unordered features.
* @param instanceWeight  Weight (importance) of instance in dataset.
*/
   private def mixedBinSeqOp(
   agg: DTStatsAggregator,
   treePoint: TreePoint,
-  bins: Array[Array[Bin]],
+  splits: Array[Array[Split]],
   unorderedFeatures: Set[Int],
   instanceWeight: Double,
   featuresForNode: Option[Array[Int]]): Unit = {
@@ -362,7 +362,7 @@ object DecisionTree extends Serializable with Logging {
 val numSplits = agg.metadata.numSplits(featureIndex)
 var splitIndex = 0
 while (splitIndex < numSplits) {
-  if 
(bins(featureIndex)(splitIndex).highSplit.categories.contains(featureValue)) {
+  if 
(splits(featureIndex)(splitIndex).categories.contains(featureValue)) {
 agg.featureUpdate(leftNodeFeatureOffset, splitIndex, 
treePoint.label,
   instanceWeight)
   } else {
@@ -506,8 +506,8 @@ object DecisionTree extends Serializable with Logging {
 if (metadata.unorderedFeatures.isEmpty) {
   orderedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, 
instanceWeight, featuresForNode)
 } else {
-  mixedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, bins, 
metadata.unorderedFeatures,
-instanceWeight, featuresForNode)
+  mixedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, splits,
+metadata.unorderedFeatures, instanceWeight, featuresForNode)
 }
   }
 }
@@ -1024,35 +1024,15 @@ object DecisionTree extends Serializable with Logging {
 // Categorical feature
 val featureArity = metadata.featureArity(featureIndex)
 if (metadata.isUnordered(featureIndex)) {
-  // TODO: The second half of the bins are unused.  Actually, we 
could just use
-  //   splits and not build bins for unordered features.  That 
should be part of
-  //   a later PR since it will require changing other code 
(using splits instead
-  //   of bins in a few places).
   // Unordered features
-  //   2^(maxFeatureValue - 1) - 1 combinations
+  // 2^(maxFeatureValue - 1) - 1 combinations
   splits(featureIndex) = new Array[Split](numSplits)
-  bins(featureIndex) = new Array[Bin](numBins)
   var splitIndex = 0
   while (splitIndex < numSplits) {
 val categories: List[Double] 

spark git commit: [SPARK-5778] throw if nonexistent metrics config file provided

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 4a581aa3f -> 2bf2b56ef


[SPARK-5778] throw if nonexistent metrics config file provided

The previous behavior was to log an error. That is fine in the general
case where no `spark.metrics.conf` parameter was specified: a default
`metrics.properties` is looked for, and the exception is logged and
suppressed if it doesn't exist.

If the user has purposefully specified a metrics.conf file, however,
it makes more sense to show them an error when that file doesn't
exist.
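
The policy is easiest to see side by side: an explicitly configured file must
exist, so any error opening it should propagate, while the implicit classpath
default may be absent and is then skipped. A hedged Python sketch of that policy
(not the MetricsConfig code; the .properties parsing here is deliberately naive):

    import os

    def load_metrics_config(config_file=None, default_path="metrics.properties"):
        props = {}
        if config_file is None and not os.path.exists(default_path):
            return props                 # the implicit default is allowed to be missing
        path = config_file if config_file is not None else default_path
        with open(path) as f:            # an explicitly named file that is missing raises
            for line in f:
                line = line.strip()
                if line and not line.startswith("#") and "=" in line:
                    key, _, value = line.partition("=")
                    props[key.strip()] = value.strip()
        return props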

Author: Ryan Williams 

Closes #4571 from ryan-williams/metrics and squashes the following commits:

5bccb14 [Ryan Williams] private-ize some MetricsConfig members
08ff998 [Ryan Williams] rename METRICS_CONF: DEFAULT_METRICS_CONF_FILENAME
f4d7fab [Ryan Williams] fix tests
ad24b0e [Ryan Williams] add "metrics.properties" to .rat-excludes
94e810b [Ryan Williams] throw if nonexistent Sink class is specified
31d2c30 [Ryan Williams] metrics code review feedback
56287db [Ryan Williams] throw if nonexistent metrics config file provided

(cherry picked from commit d8f69cf78862d13a48392a0b94388b8d403523da)
Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bf2b56e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bf2b56e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bf2b56e

Branch: refs/heads/branch-1.3
Commit: 2bf2b56ef0c0fbb4cead030e44910453b2cbb6fd
Parents: 4a581aa
Author: Ryan Williams 
Authored: Tue Feb 17 10:57:16 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 10:57:47 2015 -0800

--
 .rat-excludes   |  1 +
 .../apache/spark/metrics/MetricsConfig.scala| 32 +++-
 .../apache/spark/metrics/MetricsSystem.scala|  5 ++-
 .../resources/test_metrics_system.properties|  2 --
 .../spark/metrics/MetricsConfigSuite.scala  |  2 +-
 5 files changed, 23 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2bf2b56e/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 769defb..e3c0331 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -18,6 +18,7 @@ fairscheduler.xml.template
 spark-defaults.conf.template
 log4j.properties
 log4j.properties.template
+metrics.properties
 metrics.properties.template
 slaves
 slaves.template

http://git-wip-us.apache.org/repos/asf/spark/blob/2bf2b56e/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
--
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 1b7a5d1..8edf493 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -28,12 +28,12 @@ import org.apache.spark.util.Utils
 
 private[spark] class MetricsConfig(val configFile: Option[String]) extends 
Logging {
 
-  val DEFAULT_PREFIX = "*"
-  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
-  val METRICS_CONF = "metrics.properties"
+  private val DEFAULT_PREFIX = "*"
+  private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
 
-  val properties = new Properties()
-  var propertyCategories: mutable.HashMap[String, Properties] = null
+  private[metrics] val properties = new Properties()
+  private[metrics] var propertyCategories: mutable.HashMap[String, Properties] 
= null
 
   private def setDefaultProperties(prop: Properties) {
 prop.setProperty("*.sink.servlet.class", 
"org.apache.spark.metrics.sink.MetricsServlet")
@@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: 
Option[String]) extends Loggi
 setDefaultProperties(properties)
 
 // If spark.metrics.conf is not set, try to get file in class path
-var is: InputStream = null
-try {
-  is = configFile match {
-case Some(f) => new FileInputStream(f)
-case None => 
Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
+val isOpt: Option[InputStream] = configFile.map(new 
FileInputStream(_)).orElse {
+  try {
+
Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
+  } catch {
+case e: Exception =>
+  logError("Error loading default configuration file", e)
+  None
   }
+}
 
-  if (is != null) {
+isOpt.foreach { is =>
+  try {
 properties.load(is)
+  } finally {
+is.close()
   }
-} catch {
-  case e: Exception => logError("Error loading configure file", e)
-} finally {
-  if (is != null

spark git commit: [SPARK-5661] function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether it contains the file

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master d8f69cf78 -> b271c265b


[SPARK-5661] function hasShutdownDeleteTachyonDir should use
shutdownDeleteTachyonPaths to determine whether it contains the file

hasShutdownDeleteTachyonDir(file: TachyonFile) should use
shutdownDeleteTachyonPaths (not shutdownDeletePaths) to determine whether it
contains the file. To solve it, delete two unused functions.

Author: xukun 00228947 
Author: viper-kun 

Closes #4418 from viper-kun/deleteunusedfun and squashes the following commits:

87340eb [viper-kun] fix style
3d6c69e [xukun 00228947] fix bug
2bc397e [xukun 00228947] deleteunusedfun


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b271c265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b271c265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b271c265

Branch: refs/heads/master
Commit: b271c265b742fa6947522eda4592e9e6a7fd1f3a
Parents: d8f69cf
Author: xukun 00228947 
Authored: Tue Feb 17 18:59:41 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 18:59:41 2015 +

--
 core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b271c265/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c06bd6f..df21ed3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -213,8 +213,8 @@ private[spark] object Utils extends Logging {
   // Is the path already registered to be deleted via a shutdown hook ?
   def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
 val absolutePath = file.getPath()
-shutdownDeletePaths.synchronized {
-  shutdownDeletePaths.contains(absolutePath)
+shutdownDeleteTachyonPaths.synchronized {
+  shutdownDeleteTachyonPaths.contains(absolutePath)
 }
   }
 





spark git commit: [SPARK-5778] throw if nonexistent metrics config file provided

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master d8adefefc -> d8f69cf78


[SPARK-5778] throw if nonexistent metrics config file provided

The previous behavior was to log an error. That is fine in the general
case where no `spark.metrics.conf` parameter was specified: a default
`metrics.properties` is looked for, and the exception is logged and
suppressed if it doesn't exist.

If the user has purposefully specified a metrics.conf file, however,
it makes more sense to show them an error when that file doesn't
exist.

Author: Ryan Williams 

Closes #4571 from ryan-williams/metrics and squashes the following commits:

5bccb14 [Ryan Williams] private-ize some MetricsConfig members
08ff998 [Ryan Williams] rename METRICS_CONF: DEFAULT_METRICS_CONF_FILENAME
f4d7fab [Ryan Williams] fix tests
ad24b0e [Ryan Williams] add "metrics.properties" to .rat-excludes
94e810b [Ryan Williams] throw if nonexistent Sink class is specified
31d2c30 [Ryan Williams] metrics code review feedback
56287db [Ryan Williams] throw if nonexistent metrics config file provided


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f69cf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f69cf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f69cf7

Branch: refs/heads/master
Commit: d8f69cf78862d13a48392a0b94388b8d403523da
Parents: d8adefe
Author: Ryan Williams 
Authored: Tue Feb 17 10:57:16 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 10:57:16 2015 -0800

--
 .rat-excludes   |  1 +
 .../apache/spark/metrics/MetricsConfig.scala| 32 +++-
 .../apache/spark/metrics/MetricsSystem.scala|  5 ++-
 .../resources/test_metrics_system.properties|  2 --
 .../spark/metrics/MetricsConfigSuite.scala  |  2 +-
 5 files changed, 23 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d8f69cf7/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index a788e82..8c61e67 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -19,6 +19,7 @@ fairscheduler.xml.template
 spark-defaults.conf.template
 log4j.properties
 log4j.properties.template
+metrics.properties
 metrics.properties.template
 slaves
 slaves.template

http://git-wip-us.apache.org/repos/asf/spark/blob/d8f69cf7/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
--
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala 
b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 1b7a5d1..8edf493 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -28,12 +28,12 @@ import org.apache.spark.util.Utils
 
 private[spark] class MetricsConfig(val configFile: Option[String]) extends 
Logging {
 
-  val DEFAULT_PREFIX = "*"
-  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
-  val METRICS_CONF = "metrics.properties"
+  private val DEFAULT_PREFIX = "*"
+  private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
 
-  val properties = new Properties()
-  var propertyCategories: mutable.HashMap[String, Properties] = null
+  private[metrics] val properties = new Properties()
+  private[metrics] var propertyCategories: mutable.HashMap[String, Properties] 
= null
 
   private def setDefaultProperties(prop: Properties) {
 prop.setProperty("*.sink.servlet.class", 
"org.apache.spark.metrics.sink.MetricsServlet")
@@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: 
Option[String]) extends Loggi
 setDefaultProperties(properties)
 
 // If spark.metrics.conf is not set, try to get file in class path
-var is: InputStream = null
-try {
-  is = configFile match {
-case Some(f) => new FileInputStream(f)
-case None => 
Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF)
+val isOpt: Option[InputStream] = configFile.map(new 
FileInputStream(_)).orElse {
+  try {
+
Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
+  } catch {
+case e: Exception =>
+  logError("Error loading default configuration file", e)
+  None
   }
+}
 
-  if (is != null) {
+isOpt.foreach { is =>
+  try {
 properties.load(is)
+  } finally {
+is.close()
   }
-} catch {
-  case e: Exception => logError("Error loading configure file", e)
-} finally {
-  if (is != null) is.close()
 }
 
 propertyCategories = subProperties(properties, INSTANCE_REGEX)

http://git-wip-us.a

spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cd3d41587 -> 4a581aa3f


[SPARK-5859] [PySpark] [SQL] fix DataFrame Python API

1. added explain()
2. add isLocal()
3. do not call show() in __repr__
4. add foreach() and foreachPartition()
5. add distinct()
6. fix functions.col()/column()/lit()
7. fix unit tests in sql/functions.py
8. fix unicode in showString()
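
For readers of the thread, a quick hypothetical usage sketch of the methods added
above (it assumes a local Spark 1.3 installation; the app name and data are made
up and not taken from the patch):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="df-api-demo")          # hypothetical app name
    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame([("Alice", 2), ("Bob", 5)], ["name", "age"])

    df.explain(True)           # print logical and physical plans
    print(df.isLocal())        # True if collect()/take() need no executors
    df.distinct().show()       # new distinct() shorthand

    def dump(person):
        print(person.name)

    df.foreach(dump)                                   # shorthand for df.rdd.foreach(dump)
    df.foreachPartition(lambda people: [dump(p) for p in people])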

Author: Davies Liu 

Closes #4645 from davies/df6 and squashes the following commits:

6b46a2c [Davies Liu] fix DataFrame Python API

(cherry picked from commit d8adefefcc2a4af32295440ed1d4917a6968f017)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a581aa3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a581aa3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a581aa3

Branch: refs/heads/branch-1.3
Commit: 4a581aa3f9144732cc0f6dab6e46f72035e072f4
Parents: cd3d415
Author: Davies Liu 
Authored: Tue Feb 17 10:22:48 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 10:22:56 2015 -0800

--
 python/pyspark/sql/dataframe.py | 65 ++--
 python/pyspark/sql/functions.py | 12 +++
 2 files changed, 59 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a581aa3/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 28a59e7..8417240 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -238,6 +238,22 @@ class DataFrame(object):
 """
 print (self._jdf.schema().treeString())
 
+def explain(self, extended=False):
+"""
+Prints the plans (logical and physical) to the console for
+debugging purpose.
+
+If extended is False, only prints the physical plan.
+"""
+self._jdf.explain(extended)
+
+def isLocal(self):
+"""
+Returns True if the `collect` and `take` methods can be run locally
+(without any Spark executors).
+"""
+return self._jdf.isLocal()
+
 def show(self):
 """
 Print the first 20 rows.
@@ -247,14 +263,12 @@ class DataFrame(object):
 2   Alice
 5   Bob
 >>> df
-age name
-2   Alice
-5   Bob
+DataFrame[age: int, name: string]
 """
-print (self)
+print self._jdf.showString().encode('utf8', 'ignore')
 
 def __repr__(self):
-return self._jdf.showString()
+return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
 def count(self):
 """Return the number of elements in this RDD.
@@ -336,6 +350,8 @@ class DataFrame(object):
 """
 Return a new RDD by applying a function to each partition.
 
+It's a shorthand for df.rdd.mapPartitions()
+
 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
 >>> def f(iterator): yield 1
 >>> rdd.mapPartitions(f).sum()
@@ -343,6 +359,31 @@ class DataFrame(object):
 """
 return self.rdd.mapPartitions(f, preservesPartitioning)
 
+def foreach(self, f):
+"""
+Applies a function to all rows of this DataFrame.
+
+It's a shorthand for df.rdd.foreach()
+
+>>> def f(person):
+... print person.name
+>>> df.foreach(f)
+"""
+return self.rdd.foreach(f)
+
+def foreachPartition(self, f):
+"""
+Applies a function to each partition of this DataFrame.
+
+It's a shorthand for df.rdd.foreachPartition()
+
+>>> def f(people):
+... for person in people:
+... print person.name
+>>> df.foreachPartition(f)
+"""
+return self.rdd.foreachPartition(f)
+
 def cache(self):
 """ Persist with the default storage level (C{MEMORY_ONLY_SER}).
 """
@@ -377,8 +418,13 @@ class DataFrame(object):
 """ Return a new :class:`DataFrame` that has exactly `numPartitions`
 partitions.
 """
-rdd = self._jdf.repartition(numPartitions, None)
-return DataFrame(rdd, self.sql_ctx)
+return DataFrame(self._jdf.repartition(numPartitions, None), 
self.sql_ctx)
+
+def distinct(self):
+"""
+Return a new :class:`DataFrame` containing the distinct rows in this 
DataFrame.
+"""
+return DataFrame(self._jdf.distinct(), self.sql_ctx)
 
 def sample(self, withReplacement, fraction, seed=None):
 """
@@ -957,10 +1003,7 @@ class Column(DataFrame):
 return Column(jc, self.sql_ctx)
 
 def __repr__(self):
-if self._jdf.isComputable():
-return self._jdf.samples()
-else:
-ret

[1/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 97cb568a2 -> cd3d41587


http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
index 89920f2..4f38110 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
@@ -143,7 +143,7 @@ class MySQLDatabase {
   }
 
   test("Basic test") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
 val rows = rdd.collect
 assert(rows.length == 2)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -153,7 +153,7 @@ class MySQLDatabase {
   }
 
   test("Numeric types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -181,7 +181,7 @@ class MySQLDatabase {
   }
 
   test("Date types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -199,7 +199,7 @@ class MySQLDatabase {
   }
 
   test("String types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -225,9 +225,9 @@ class MySQLDatabase {
   }
 
   test("Basic write test") {
-val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
-val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
-val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
+val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
+val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
 rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
 rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false)
 rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false)

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
index c174d7a..7b47fee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
@@ -113,7 +113,7 @@ class PostgresDatabase {
   }
 
   test("Type mapping for various types") {
-val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -142,7 +142,7 @@ class PostgresDatabase {
   }
 
   test("Basic write test") {
-val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
 rdd.createJDBCTable(url(db.ip), "public.barcopy", false)
 // Test only that it doesn't bomb out.
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index bfacc51..07b5a84 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
 import org.apache.spark.sql.SQLContext
 
 /**
- * Implementation for "describe [extended] table".
- *
  * :: DeveloperApi ::
+ *
+ * Implementation for "describe [extended] table".
  */
 @DeveloperApi
 case class DescribeHiveTableCommand(

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/

[1/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master c76da36c2 -> c74b07fa9


http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
index 89920f2..4f38110 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala
@@ -143,7 +143,7 @@ class MySQLDatabase {
   }
 
   test("Basic test") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
 val rows = rdd.collect
 assert(rows.length == 2)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -153,7 +153,7 @@ class MySQLDatabase {
   }
 
   test("Numeric types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -181,7 +181,7 @@ class MySQLDatabase {
   }
 
   test("Date types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -199,7 +199,7 @@ class MySQLDatabase {
   }
 
   test("String types") {
-val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -225,9 +225,9 @@ class MySQLDatabase {
   }
 
   test("Basic write test") {
-val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
-val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
-val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
+val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
+val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
+val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
 rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
 rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false)
 rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false)

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
index c174d7a..7b47fee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala
@@ -113,7 +113,7 @@ class PostgresDatabase {
   }
 
   test("Type mapping for various types") {
-val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
 val rows = rdd.collect
 assert(rows.length == 1)
 val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -142,7 +142,7 @@ class PostgresDatabase {
   }
 
   test("Basic write test") {
-val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
+val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
 rdd.createJDBCTable(url(db.ip), "public.barcopy", false)
 // Test only that it doesn't bomb out.
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
index bfacc51..07b5a84 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
 import org.apache.spark.sql.SQLContext
 
 /**
- * Implementation for "describe [extended] table".
- *
  * :: DeveloperApi ::
+ *
+ * Implementation for "describe [extended] table".
  */
 @DeveloperApi
 case class DescribeHiveTableCommand(

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/

[2/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

2015-02-17 Thread marmbrus
[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

Author: Michael Armbrust 

Closes #4642 from marmbrus/docs and squashes the following commits:

d291c34 [Michael Armbrust] python tests
9be66e3 [Michael Armbrust] comments
d56afc2 [Michael Armbrust] fix style
f004747 [Michael Armbrust] fix build
c4a907b [Michael Armbrust] fix tests
42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up.

(cherry picked from commit c74b07fa94a8da50437d952ae05cf6ac70fbb93e)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd3d4158
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd3d4158
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd3d4158

Branch: refs/heads/branch-1.3
Commit: cd3d4158721b5c3cc35df47675f4f4d9540be6f1
Parents: 97cb568
Author: Michael Armbrust 
Authored: Tue Feb 17 10:21:17 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 10:21:33 2015 -0800

--
 project/SparkBuild.scala|  12 +-
 python/pyspark/sql/context.py   |  28 +--
 .../org/apache/spark/sql/jdbc/JDBCUtils.java|  59 --
 .../scala/org/apache/spark/sql/DataFrame.scala  | 153 +-
 .../org/apache/spark/sql/DataFrameImpl.scala|  33 ++-
 .../apache/spark/sql/ExperimentalMethods.scala  |   5 +
 .../apache/spark/sql/IncomputableColumn.scala   |   4 +
 .../scala/org/apache/spark/sql/SQLContext.scala | 200 +++
 .../apache/spark/sql/UserDefinedFunction.scala  |   3 +-
 .../org/apache/spark/sql/api/package.scala  |  23 +++
 .../apache/spark/sql/execution/commands.scala   |   2 +-
 .../spark/sql/execution/debug/package.scala |  10 +-
 .../spark/sql/jdbc/JavaJDBCTrampoline.scala |  30 ---
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  74 ++-
 .../sql/parquet/ParquetTableOperations.scala|   4 +-
 .../apache/spark/sql/parquet/ParquetTest.scala  |   4 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   6 +-
 .../spark/sql/parquet/timestamp/NanoTime.scala  |   2 +-
 .../org/apache/spark/sql/sources/ddl.scala  |   4 +-
 .../org/apache/spark/sql/jdbc/JavaJDBCTest.java | 102 --
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   7 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  20 +-
 .../spark/sql/jdbc/MySQLIntegration.scala   |  14 +-
 .../spark/sql/jdbc/PostgresIntegration.scala|   4 +-
 .../execution/DescribeHiveTableCommand.scala|   4 +-
 .../spark/sql/hive/execution/commands.scala |   8 +
 .../spark/sql/hive/execution/package.scala  |  25 +++
 .../org/apache/spark/sql/hive/package.scala |  10 +
 .../sql/hive/parquet/FakeParquetSerDe.scala |  56 --
 .../org/apache/spark/sql/hive/Shim12.scala  |   9 +-
 .../org/apache/spark/sql/hive/Shim13.scala  |   9 +-
 31 files changed, 501 insertions(+), 423 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8fb1239..e4b1b96 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,9 +361,16 @@ object Unidoc {
 publish := {},
 
 unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-  inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
catalyst, streamingFlumeSink, yarn),
+  inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
streamingFlumeSink, yarn),
 unidocProjectFilter in(JavaUnidoc, unidoc) :=
-  inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, 
tools, catalyst, streamingFlumeSink, yarn),
+  inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, 
tools, streamingFlumeSink, yarn),
+
+// Skip actual catalyst, but include the subproject.
+// Catalyst is not public API and contains quasiquotes which break 
scaladoc.
+unidocAllSources in (ScalaUnidoc, unidoc) := {
+  (unidocAllSources in (ScalaUnidoc, unidoc)).value
+.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+},
 
 // Skip class names containing $ and some internal packages in Javadocs
 unidocAllSources in (JavaUnidoc, unidoc) := {
@@ -376,6 +383,7 @@ object Unidoc {
 .map(_.filterNot(_.getCanonicalPath.contains("executor")))
 .map(_.filterNot(_.getCanonicalPath.contains("python")))
 .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
 },
 
 // Javadoc options: create a window title, and group key packages on index 
page

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/python/pyspark/sql/context.py
---

spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API

2015-02-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master c74b07fa9 -> d8adefefc


[SPARK-5859] [PySpark] [SQL] fix DataFrame Python API

1. added explain()
2. add isLocal()
3. do not call show() in __repr__
4. add foreach() and foreachPartition()
5. add distinct()
6. fix functions.col()/column()/lit()
7. fix unit tests in sql/functions.py
8. fix unicode in showString()

Author: Davies Liu 

Closes #4645 from davies/df6 and squashes the following commits:

6b46a2c [Davies Liu] fix DataFrame Python API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8adefef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8adefef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8adefef

Branch: refs/heads/master
Commit: d8adefefcc2a4af32295440ed1d4917a6968f017
Parents: c74b07f
Author: Davies Liu 
Authored: Tue Feb 17 10:22:48 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 10:22:48 2015 -0800

--
 python/pyspark/sql/dataframe.py | 65 ++--
 python/pyspark/sql/functions.py | 12 +++
 2 files changed, 59 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d8adefef/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 28a59e7..8417240 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -238,6 +238,22 @@ class DataFrame(object):
 """
 print (self._jdf.schema().treeString())
 
+def explain(self, extended=False):
+"""
+Prints the plans (logical and physical) to the console for
+debugging purpose.
+
+If extended is False, only prints the physical plan.
+"""
+self._jdf.explain(extended)
+
+def isLocal(self):
+"""
+Returns True if the `collect` and `take` methods can be run locally
+(without any Spark executors).
+"""
+return self._jdf.isLocal()
+
 def show(self):
 """
 Print the first 20 rows.
@@ -247,14 +263,12 @@ class DataFrame(object):
 2   Alice
 5   Bob
 >>> df
-age name
-2   Alice
-5   Bob
+DataFrame[age: int, name: string]
 """
-print (self)
+print self._jdf.showString().encode('utf8', 'ignore')
 
 def __repr__(self):
-return self._jdf.showString()
+return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
 def count(self):
 """Return the number of elements in this RDD.
@@ -336,6 +350,8 @@ class DataFrame(object):
 """
 Return a new RDD by applying a function to each partition.
 
+It's a shorthand for df.rdd.mapPartitions()
+
 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
 >>> def f(iterator): yield 1
 >>> rdd.mapPartitions(f).sum()
@@ -343,6 +359,31 @@ class DataFrame(object):
 """
 return self.rdd.mapPartitions(f, preservesPartitioning)
 
+def foreach(self, f):
+"""
+Applies a function to all rows of this DataFrame.
+
+It's a shorthand for df.rdd.foreach()
+
+>>> def f(person):
+... print person.name
+>>> df.foreach(f)
+"""
+return self.rdd.foreach(f)
+
+def foreachPartition(self, f):
+"""
+Applies a function to each partition of this DataFrame.
+
+It's a shorthand for df.rdd.foreachPartition()
+
+>>> def f(people):
+... for person in people:
+... print person.name
+>>> df.foreachPartition(f)
+"""
+return self.rdd.foreachPartition(f)
+
 def cache(self):
 """ Persist with the default storage level (C{MEMORY_ONLY_SER}).
 """
@@ -377,8 +418,13 @@ class DataFrame(object):
 """ Return a new :class:`DataFrame` that has exactly `numPartitions`
 partitions.
 """
-rdd = self._jdf.repartition(numPartitions, None)
-return DataFrame(rdd, self.sql_ctx)
+return DataFrame(self._jdf.repartition(numPartitions, None), 
self.sql_ctx)
+
+def distinct(self):
+"""
+Return a new :class:`DataFrame` containing the distinct rows in this 
DataFrame.
+"""
+return DataFrame(self._jdf.distinct(), self.sql_ctx)
 
 def sample(self, withReplacement, fraction, seed=None):
 """
@@ -957,10 +1003,7 @@ class Column(DataFrame):
 return Column(jc, self.sql_ctx)
 
 def __repr__(self):
-if self._jdf.isComputable():
-return self._jdf.samples()
-else:
-return 'Column<%s>' % self._jdf.toString()
+return 'Column<%s>' % self._jdf.toString().encode('utf8')
 
  

[2/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

2015-02-17 Thread marmbrus
[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

Author: Michael Armbrust 

Closes #4642 from marmbrus/docs and squashes the following commits:

d291c34 [Michael Armbrust] python tests
9be66e3 [Michael Armbrust] comments
d56afc2 [Michael Armbrust] fix style
f004747 [Michael Armbrust] fix build
c4a907b [Michael Armbrust] fix tests
42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c74b07fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c74b07fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c74b07fa

Branch: refs/heads/master
Commit: c74b07fa94a8da50437d952ae05cf6ac70fbb93e
Parents: c76da36
Author: Michael Armbrust 
Authored: Tue Feb 17 10:21:17 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Feb 17 10:21:17 2015 -0800

--
 project/SparkBuild.scala|  12 +-
 python/pyspark/sql/context.py   |  28 +--
 .../org/apache/spark/sql/jdbc/JDBCUtils.java|  59 --
 .../scala/org/apache/spark/sql/DataFrame.scala  | 153 +-
 .../org/apache/spark/sql/DataFrameImpl.scala|  33 ++-
 .../apache/spark/sql/ExperimentalMethods.scala  |   5 +
 .../apache/spark/sql/IncomputableColumn.scala   |   4 +
 .../scala/org/apache/spark/sql/SQLContext.scala | 200 +++
 .../apache/spark/sql/UserDefinedFunction.scala  |   3 +-
 .../org/apache/spark/sql/api/package.scala  |  23 +++
 .../apache/spark/sql/execution/commands.scala   |   2 +-
 .../spark/sql/execution/debug/package.scala |  10 +-
 .../spark/sql/jdbc/JavaJDBCTrampoline.scala |  30 ---
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  74 ++-
 .../sql/parquet/ParquetTableOperations.scala|   4 +-
 .../apache/spark/sql/parquet/ParquetTest.scala  |   4 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   6 +-
 .../spark/sql/parquet/timestamp/NanoTime.scala  |   2 +-
 .../org/apache/spark/sql/sources/ddl.scala  |   4 +-
 .../org/apache/spark/sql/jdbc/JavaJDBCTest.java | 102 --
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   7 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  20 +-
 .../spark/sql/jdbc/MySQLIntegration.scala   |  14 +-
 .../spark/sql/jdbc/PostgresIntegration.scala|   4 +-
 .../execution/DescribeHiveTableCommand.scala|   4 +-
 .../spark/sql/hive/execution/commands.scala |   8 +
 .../spark/sql/hive/execution/package.scala  |  25 +++
 .../org/apache/spark/sql/hive/package.scala |  10 +
 .../sql/hive/parquet/FakeParquetSerDe.scala |  56 --
 .../org/apache/spark/sql/hive/Shim12.scala  |   9 +-
 .../org/apache/spark/sql/hive/Shim13.scala  |   9 +-
 31 files changed, 501 insertions(+), 423 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8fb1239..e4b1b96 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -361,9 +361,16 @@ object Unidoc {
 publish := {},
 
 unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-  inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
catalyst, streamingFlumeSink, yarn),
+  inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, 
streamingFlumeSink, yarn),
 unidocProjectFilter in(JavaUnidoc, unidoc) :=
-  inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, 
tools, catalyst, streamingFlumeSink, yarn),
+  inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, 
tools, streamingFlumeSink, yarn),
+
+// Skip actual catalyst, but include the subproject.
+// Catalyst is not public API and contains quasiquotes which break 
scaladoc.
+unidocAllSources in (ScalaUnidoc, unidoc) := {
+  (unidocAllSources in (ScalaUnidoc, unidoc)).value
+.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
+},
 
 // Skip class names containing $ and some internal packages in Javadocs
 unidocAllSources in (JavaUnidoc, unidoc) := {
@@ -376,6 +383,7 @@ object Unidoc {
 .map(_.filterNot(_.getCanonicalPath.contains("executor")))
 .map(_.filterNot(_.getCanonicalPath.contains("python")))
 .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
 },
 
 // Javadoc options: create a window title, and group key packages on index 
page

http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index dd2cd5e..2e230

spark git commit: [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM

2015-02-17 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 824062912 -> 97cb568a2


[SPARK-5858][MLLIB] Remove unnecessary first() call in GLM

`numFeatures` is only used by multinomial logistic regression. Calling
`.first()` for every GLM causes a performance regression, especially in Python.
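
The pattern is simply to cache the feature dimension after it is first computed,
using a sentinel value to mean "unknown", so training no longer forces an extra
pass over the data every time. An illustrative Python sketch (not the MLlib Scala
code; the class and data are made up):

    from collections import namedtuple

    LabeledPoint = namedtuple("LabeledPoint", ["label", "features"])

    class GLMTrainer(object):
        def __init__(self):
            self.num_features = -1                 # -1 means "not computed yet"

        def run(self, points):
            if self.num_features < 0:              # touch the data at most once
                self.num_features = len(points[0].features)
            # ... training would proceed here using self.num_features ...
            return self.num_features

    trainer = GLMTrainer()
    data = [LabeledPoint(1.0, [0.5, 1.5]), LabeledPoint(0.0, [2.0, 3.0])]
    trainer.run(data)   # computes num_features
    trainer.run(data)   # reuses the cached value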

Author: Xiangrui Meng 

Closes #4647 from mengxr/SPARK-5858 and squashes the following commits:

036dc7f [Xiangrui Meng] remove unnecessary first() call
12c5548 [Xiangrui Meng] check numFeatures only once

(cherry picked from commit c76da36c2163276b5c34e59fbb139eeb34ed0faa)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97cb568a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97cb568a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97cb568a

Branch: refs/heads/branch-1.3
Commit: 97cb568a219dfc6bcae1d8813cc552b11b7ba414
Parents: 8240629
Author: Xiangrui Meng 
Authored: Tue Feb 17 10:17:45 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Feb 17 10:17:50 2015 -0800

--
 .../spark/mllib/classification/LogisticRegression.scala   | 6 +-
 .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala   | 7 ---
 2 files changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/97cb568a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 420d6e2..b787667 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -355,6 +355,10 @@ class LogisticRegressionWithLBFGS
   }
 
   override protected def createModel(weights: Vector, intercept: Double) = {
-new LogisticRegressionModel(weights, intercept, numFeatures, 
numOfLinearPredictor + 1)
+if (numOfLinearPredictor == 1) {
+  new LogisticRegressionModel(weights, intercept)
+} else {
+  new LogisticRegressionModel(weights, intercept, numFeatures, 
numOfLinearPredictor + 1)
+}
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/97cb568a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 2b71453..7c66e8c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   /**
    * The dimension of training features.
    */
-  protected var numFeatures: Int = 0
+  protected var numFeatures: Int = -1
 
   /**
    * Set if the algorithm should use feature scaling to improve the convergence during optimization.
@@ -163,7 +163,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * RDD of LabeledPoint entries.
    */
   def run(input: RDD[LabeledPoint]): M = {
-    numFeatures = input.first().features.size
+    if (numFeatures < 0) {
+      numFeatures = input.map(_.features.size).first()
+    }
 
     /**
      * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights,
@@ -193,7 +195,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * of LabeledPoint entries starting from the initial weights provided.
    */
   def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
-    numFeatures = input.first().features.size
 
     if (input.getStorageLevel == StorageLevel.NONE) {
       logWarning("The input data is not directly cached, which may hurt performance if its"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM

2015-02-17 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master 3ce46e94f -> c76da36c2


[SPARK-5858][MLLIB] Remove unnecessary first() call in GLM

`numFeatures` is only used by multinomial logistic regression. Calling 
`.first()` for every GLM causes performance regression, especially in Python.
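
The idea, as a rough standalone sketch (illustrative names, not the MLlib source): treat a
negative value as "feature dimension not yet known", and when the dimension is needed,
ship only the Int size to the driver instead of pulling the whole first LabeledPoint.

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    object NumFeaturesSketch {
      // -1 means "not yet computed"; mirrors the sentinel used in the patch below.
      private var numFeatures: Int = -1

      def ensureNumFeatures(input: RDD[LabeledPoint]): Int = {
        if (numFeatures < 0) {
          // map before first(): only an Int crosses the wire, not a possibly huge vector.
          numFeatures = input.map(_.features.size).first()
        }
        numFeatures
      }
    }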

Author: Xiangrui Meng 

Closes #4647 from mengxr/SPARK-5858 and squashes the following commits:

036dc7f [Xiangrui Meng] remove unnecessary first() call
12c5548 [Xiangrui Meng] check numFeatures only once


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c76da36c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c76da36c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c76da36c

Branch: refs/heads/master
Commit: c76da36c2163276b5c34e59fbb139eeb34ed0faa
Parents: 3ce46e9
Author: Xiangrui Meng 
Authored: Tue Feb 17 10:17:45 2015 -0800
Committer: Xiangrui Meng 
Committed: Tue Feb 17 10:17:45 2015 -0800

--
 .../spark/mllib/classification/LogisticRegression.scala   | 6 +-
 .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala   | 7 ---
 2 files changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c76da36c/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 420d6e2..b787667 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -355,6 +355,10 @@ class LogisticRegressionWithLBFGS
   }
 
   override protected def createModel(weights: Vector, intercept: Double) = {
-    new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
+    if (numOfLinearPredictor == 1) {
+      new LogisticRegressionModel(weights, intercept)
+    } else {
+      new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c76da36c/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 2b71453..7c66e8c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   /**
    * The dimension of training features.
    */
-  protected var numFeatures: Int = 0
+  protected var numFeatures: Int = -1
 
   /**
    * Set if the algorithm should use feature scaling to improve the convergence during optimization.
@@ -163,7 +163,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * RDD of LabeledPoint entries.
    */
   def run(input: RDD[LabeledPoint]): M = {
-    numFeatures = input.first().features.size
+    if (numFeatures < 0) {
+      numFeatures = input.map(_.features.size).first()
+    }
 
     /**
      * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights,
@@ -193,7 +195,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * of LabeledPoint entries starting from the initial weights provided.
    */
   def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
-    numFeatures = input.first().features.size
 
     if (input.getStorageLevel == StorageLevel.NONE) {
       logWarning("The input data is not directly cached, which may hurt performance if its"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-5856: In Maven build script, launch Zinc with more memory

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 aeb85cdee -> 824062912


SPARK-5856: In Maven build script, launch Zinc with more memory

I've seen out of memory exceptions when trying
to run many parallel builds against the same Zinc
server during packaging. We should use the same
increased memory settings we use for Maven itself.

I tested this and confirmed that the Nailgun JVM
launched with higher memory.

Author: Patrick Wendell 

Closes #4643 from pwendell/zinc-memory and squashes the following commits:

717cfb0 [Patrick Wendell] SPARK-5856: Launch Zinc with larger memory options.

(cherry picked from commit 3ce46e94fe77d15f18e916b76b37fa96356ace93)
Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82406291
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82406291
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82406291

Branch: refs/heads/branch-1.3
Commit: 82406291290b5e1a6506a01bceba396aca78363e
Parents: aeb85cd
Author: Patrick Wendell 
Authored: Tue Feb 17 10:10:01 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 10:10:08 2015 -0800

--
 build/mvn | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82406291/build/mvn
--
diff --git a/build/mvn b/build/mvn
index 53babf5..3561110 100755
--- a/build/mvn
+++ b/build/mvn
@@ -21,6 +21,8 @@
 _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
+# Options used during compilation
+_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
 
 # Installs any application tarball given a URL, the expected tarball name,
 # and, optionally, a checkable binary path to determine if the binary has
@@ -136,6 +138,7 @@ cd "${_CALLING_DIR}"
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
 if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
+  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   ${ZINC_BIN} -shutdown
   ${ZINC_BIN} -start -port ${ZINC_PORT} \
 -scala-compiler "${SCALA_COMPILER}" \
@@ -143,7 +146,7 @@ if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
 fi
 
 # Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"}
+export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
 # Last, call the `mvn` command as usual
 ${MVN_BIN} "$@"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: SPARK-5856: In Maven build script, launch Zinc with more memory

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master ee6e3eff0 -> 3ce46e94f


SPARK-5856: In Maven build script, launch Zinc with more memory

I've seen out of memory exceptions when trying
to run many parallel builds against the same Zinc
server during packaging. We should use the same
increased memory settings we use for Maven itself.

I tested this and confirmed that the Nailgun JVM
launched with higher memory.

Author: Patrick Wendell 

Closes #4643 from pwendell/zinc-memory and squashes the following commits:

717cfb0 [Patrick Wendell] SPARK-5856: Launch Zinc with larger memory options.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ce46e94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ce46e94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ce46e94

Branch: refs/heads/master
Commit: 3ce46e94fe77d15f18e916b76b37fa96356ace93
Parents: ee6e3ef
Author: Patrick Wendell 
Authored: Tue Feb 17 10:10:01 2015 -0800
Committer: Patrick Wendell 
Committed: Tue Feb 17 10:10:01 2015 -0800

--
 build/mvn | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ce46e94/build/mvn
--
diff --git a/build/mvn b/build/mvn
index 53babf5..3561110 100755
--- a/build/mvn
+++ b/build/mvn
@@ -21,6 +21,8 @@
 _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # Preserve the calling directory
 _CALLING_DIR="$(pwd)"
+# Options used during compilation
+_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
 
 # Installs any application tarball given a URL, the expected tarball name,
 # and, optionally, a checkable binary path to determine if the binary has
@@ -136,6 +138,7 @@ cd "${_CALLING_DIR}"
 # Now that zinc is ensured to be installed, check its status and, if its
 # not running or just installed, start it
 if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
+  export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   ${ZINC_BIN} -shutdown
   ${ZINC_BIN} -start -port ${ZINC_PORT} \
 -scala-compiler "${SCALA_COMPILER}" \
@@ -143,7 +146,7 @@ if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then
 fi
 
 # Set any `mvn` options if not already present
-export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"}
+export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
 # Last, call the `mvn` command as usual
 ${MVN_BIN} "$@"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 b8da5c390 -> aeb85cdee


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and 
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.
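
For context on the diff below: the reverted change peeked at the number of buffered bytes
so the task would not block on the worker's end-of-stream marker; the revert restores the
plain blocking read. A minimal sketch of the two variants, assuming a java.io.DataInputStream
and an Int marker like SpecialLengths.END_OF_STREAM (the marker value below is a
placeholder, not taken from the Spark source):

    import java.io.DataInputStream

    object EndMarkSketch {
      val END_OF_STREAM = -4  // placeholder marker value, for illustration only

      def endedCleanly(in: DataInputStream): Boolean = {
        // Reverted, non-blocking variant:
        //   if (in.available() >= 4) in.readInt() == END_OF_STREAM else false
        // Restored, blocking variant: always wait for the marker before deciding.
        in.readInt() == END_OF_STREAM
      }
    }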


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aeb85cde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aeb85cde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aeb85cde

Branch: refs/heads/branch-1.3
Commit: aeb85cdeea39ace5a41e5196663d5aa3ebf1517f
Parents: b8da5c3
Author: Josh Rosen 
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 07:51:05 2015 -0800

--
 .../org/apache/spark/api/python/PythonRDD.scala | 21 
 python/pyspark/worker.py|  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aeb85cde/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94c390..2527211 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
 stream.readFully(update)
 accumulator += Collections.singletonList(update)
   }
-
   // Check whether the worker is ready to be re-used.
-  if (reuse_worker) {
-// It has a high possibility that the ending mark is already available,
-// And current task should not be blocked by checking it
-
-if (stream.available() >= 4) {
-  val ending = stream.readInt()
-  if (ending == SpecialLengths.END_OF_STREAM) {
-env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-released = true
-logInfo(s"Communication with worker ended cleanly, re-use 
it: $worker")
-  } else {
-logInfo(s"Communication with worker did not end cleanly " +
-  s"(ending with $ending), close it: $worker")
-  }
-} else {
-  logInfo(s"The ending mark from worker is not available, 
close it: $worker")
+  if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+if (reuse_worker) {
+  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+  released = true
 }
   }
   null

http://git-wip-us.apache.org/repos/asf/spark/blob/aeb85cde/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 180bdbb..8a93c32 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,7 +121,6 @@ def main(infile, outfile):
 write_int(len(_accumulatorRegistry), outfile)
 for (aid, accum) in _accumulatorRegistry.items():
 pickleSer._write_with_length((aid, accum._value), outfile)
-outfile.flush()
 
 # check end of stream
 if read_int(infile) == SpecialLengths.END_OF_STREAM:


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 432ceca2a -> 6be36d5a8


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and 
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be36d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be36d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be36d5a

Branch: refs/heads/branch-1.2
Commit: 6be36d5a88c172b446cc69ebde6176e606cf09f1
Parents: 432ceca
Author: Josh Rosen 
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 07:51:37 2015 -0800

--
 .../org/apache/spark/api/python/PythonRDD.scala | 21 
 python/pyspark/worker.py|  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b513fb8..0d508d6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -145,24 +145,11 @@ private[spark] class PythonRDD(
 stream.readFully(update)
 accumulator += Collections.singletonList(update)
   }
-
   // Check whether the worker is ready to be re-used.
-  if (reuse_worker) {
-// It has a high possibility that the ending mark is already available,
-// And current task should not be blocked by checking it
-
-if (stream.available() >= 4) {
-  val ending = stream.readInt()
-  if (ending == SpecialLengths.END_OF_STREAM) {
-env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-released = true
-logInfo(s"Communication with worker ended cleanly, re-use 
it: $worker")
-  } else {
-logInfo(s"Communication with worker did not end cleanly " +
-  s"(ending with $ending), close it: $worker")
-  }
-} else {
-  logInfo(s"The ending mark from worker is not available, 
close it: $worker")
+  if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+if (reuse_worker) {
+  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+  released = true
 }
   }
   null

http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c2ddd4d..7e5343c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -127,7 +127,6 @@ def main(infile, outfile):
 write_int(len(_accumulatorRegistry), outfile)
 for (aid, accum) in _accumulatorRegistry.items():
 pickleSer._write_with_length((aid, accum._value), outfile)
-outfile.flush()
 
 # check end of stream
 if read_int(infile) == SpecialLengths.END_OF_STREAM:


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

2015-02-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master a65766bf0 -> ee6e3eff0


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and 
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6e3eff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6e3eff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6e3eff

Branch: refs/heads/master
Commit: ee6e3eff02e9e08b1113ba6faf3397d7e7775087
Parents: a65766b
Author: Josh Rosen 
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen 
Committed: Tue Feb 17 07:49:02 2015 -0800

--
 .../org/apache/spark/api/python/PythonRDD.scala | 21 
 python/pyspark/worker.py|  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e3eff/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94c390..2527211 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
 stream.readFully(update)
 accumulator += Collections.singletonList(update)
   }
-
   // Check whether the worker is ready to be re-used.
-  if (reuse_worker) {
-// It has a high possibility that the ending mark is already available,
-// And current task should not be blocked by checking it
-
-if (stream.available() >= 4) {
-  val ending = stream.readInt()
-  if (ending == SpecialLengths.END_OF_STREAM) {
-env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-released = true
-logInfo(s"Communication with worker ended cleanly, re-use 
it: $worker")
-  } else {
-logInfo(s"Communication with worker did not end cleanly " +
-  s"(ending with $ending), close it: $worker")
-  }
-} else {
-  logInfo(s"The ending mark from worker is not available, 
close it: $worker")
+  if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+if (reuse_worker) {
+  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+  released = true
 }
   }
   null

http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e3eff/python/pyspark/worker.py
--
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 180bdbb..8a93c32 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,7 +121,6 @@ def main(infile, outfile):
 write_int(len(_accumulatorRegistry), outfile)
 for (aid, accum) in _accumulatorRegistry.items():
 pickleSer._write_with_length((aid, accum._value), outfile)
-outfile.flush()
 
 # check end of stream
 if read_int(infile) == SpecialLengths.END_OF_STREAM:


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5826][Streaming] Fix Configuration not serializable problem

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e9241fa70 -> b8da5c390


[SPARK-5826][Streaming] Fix Configuration not serializable problem

Author: jerryshao 

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem
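
The pattern the patch applies, as a rough standalone sketch (illustrative names, not the
DStream code): Hadoop's Configuration is a Writable but not java.io.Serializable, so it is
wrapped in Spark's SerializableWritable before being captured in serialized state, and
unwrapped with .value wherever a plain Configuration is needed again.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SerializableWritable

    object SerializableConfSketch {
      // Wrap once, up front, so serializing the enclosing object does not fail.
      val hadoopConf = new Configuration()
      val portableConf = new SerializableWritable(hadoopConf)

      // Unwrap wherever a plain Configuration is needed (e.g. when building the Hadoop RDD).
      val restored: Configuration = portableConf.value
    }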

(cherry picked from commit a65766bf0244a41b793b9dc5fbdd2882664ad00e)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8da5c39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8da5c39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8da5c39

Branch: refs/heads/branch-1.3
Commit: b8da5c390b7272ded5476d4531dcd757c5431fab
Parents: e9241fa
Author: jerryshao 
Authored: Tue Feb 17 10:45:18 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 10:45:33 2015 +

--
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala  | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b8da5c39/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88..4f7db41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
 val fileRDDs = files.map(file =>{
-  val rdd = conf match {
+  val rdd = serializableConfOpt.map(_.value) match {
 case Some(config) => context.sparkContext.newAPIHadoopFile(
   file,
   fm.runtimeClass.asInstanceOf[Class[F]],


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5826][Streaming] Fix Configuration not serializable problem

2015-02-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master c06e42f2c -> a65766bf0


[SPARK-5826][Streaming] Fix Configuration not serializable problem

Author: jerryshao 

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a65766bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a65766bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a65766bf

Branch: refs/heads/master
Commit: a65766bf0244a41b793b9dc5fbdd2882664ad00e
Parents: c06e42f
Author: jerryshao 
Authored: Tue Feb 17 10:45:18 2015 +
Committer: Sean Owen 
Committed: Tue Feb 17 10:45:18 2015 +

--
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala  | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a65766bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88..4f7db41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
 val fileRDDs = files.map(file =>{
-  val rdd = conf match {
+  val rdd = serializableConfOpt.map(_.value) match {
 case Some(config) => context.sparkContext.newAPIHadoopFile(
   file,
   fm.runtimeClass.asInstanceOf[Class[F]],


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org