[1/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
Repository: spark Updated Branches: refs/heads/branch-1.3 f8f9a64eb -> 2bd33ce62 http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 245161d..cb405f5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest { | getStruct(1).f5 FROM src LIMIT 1 """.stripMargin).head() === Row(1, 2, 3, 4, 5)) } - + test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") { checkAnswer( sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"), @@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest { test("SPARK-2693 udaf aggregates test") { checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"), sql("SELECT max(key) FROM src").collect().toSeq) - + checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"), sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) } @@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest { test("Generic UDAF aggregates") { checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.9)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) - + checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"), sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq) } - + test("UDFIntegerToString") { val testData = TestHive.sparkContext.parallelize( - IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF + IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() testData.registerTempTable("integerTable") sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'") @@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: - ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF + ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() testData.registerTempTable("listListIntTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") @@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest { test("UDFListString") { val testData = TestHive.sparkContext.parallelize( ListStringCaseClass(Seq("a", "b", "c")) :: - ListStringCaseClass(Seq("d", "e")) :: Nil).toDF + ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() testData.registerTempTable("listStringTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") @@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest { test("UDFStringString") { val testData = TestHive.sparkContext.parallelize( - StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF + StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() testData.registerTempTable("stringTable") sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'") @@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest { ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: - Nil).toDF + Nil).toDF() testData.registerTempTable("TwoListTable") sql(s"CREATE TEMPORARY FUNCTION 
testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
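The `.toDF` to `.toDF()` changes above are purely stylistic: both forms invoke the same implicit conversion and produce the same DataFrame. Below is a minimal, self-contained sketch of the case-class-to-DataFrame pattern these tests rely on, written against the Spark 1.3 API with a plain SQLContext instead of TestHive; the object, app, and table names are illustrative, and the case class is redefined locally for the sketch.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Case classes used with toDF() are defined at the top level so that Scala
// reflection can derive the DataFrame schema from their fields.
case class IntegerCaseClass(i: Int)

object ToDfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("toDF-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._  // brings the RDD-to-DataFrame conversion into scope

    // Explicit empty parentheses, the style this commit standardizes on;
    // an overload also accepts column names, e.g. toDF("i").
    val testData = sc.parallelize(IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF()
    testData.registerTempTable("integerTable")

    sqlContext.sql("SELECT i FROM integerTable").collect().foreach(println)
    sc.stop()
  }
}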
[1/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
Repository: spark Updated Branches: refs/heads/master 3912d3324 -> 61ab08549 http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala index 245161d..cb405f5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala @@ -62,7 +62,7 @@ class HiveUdfSuite extends QueryTest { | getStruct(1).f5 FROM src LIMIT 1 """.stripMargin).head() === Row(1, 2, 3, 4, 5)) } - + test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") { checkAnswer( sql("SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1"), @@ -96,7 +96,7 @@ class HiveUdfSuite extends QueryTest { test("SPARK-2693 udaf aggregates test") { checkAnswer(sql("SELECT percentile(key, 1) FROM src LIMIT 1"), sql("SELECT max(key) FROM src").collect().toSeq) - + checkAnswer(sql("SELECT percentile(key, array(1, 1)) FROM src LIMIT 1"), sql("SELECT array(max(key), max(key)) FROM src").collect().toSeq) } @@ -104,14 +104,14 @@ class HiveUdfSuite extends QueryTest { test("Generic UDAF aggregates") { checkAnswer(sql("SELECT ceiling(percentile_approx(key, 0.9)) FROM src LIMIT 1"), sql("SELECT max(key) FROM src LIMIT 1").collect().toSeq) - + checkAnswer(sql("SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1"), sql("SELECT array(100, 100) FROM src LIMIT 1").collect().toSeq) } - + test("UDFIntegerToString") { val testData = TestHive.sparkContext.parallelize( - IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF + IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() testData.registerTempTable("integerTable") sql(s"CREATE TEMPORARY FUNCTION testUDFIntegerToString AS '${classOf[UDFIntegerToString].getName}'") @@ -127,7 +127,7 @@ class HiveUdfSuite extends QueryTest { val testData = TestHive.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: - ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF + ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() testData.registerTempTable("listListIntTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListListInt AS '${classOf[UDFListListInt].getName}'") @@ -142,7 +142,7 @@ class HiveUdfSuite extends QueryTest { test("UDFListString") { val testData = TestHive.sparkContext.parallelize( ListStringCaseClass(Seq("a", "b", "c")) :: - ListStringCaseClass(Seq("d", "e")) :: Nil).toDF + ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() testData.registerTempTable("listStringTable") sql(s"CREATE TEMPORARY FUNCTION testUDFListString AS '${classOf[UDFListString].getName}'") @@ -156,7 +156,7 @@ class HiveUdfSuite extends QueryTest { test("UDFStringString") { val testData = TestHive.sparkContext.parallelize( - StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF + StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() testData.registerTempTable("stringTable") sql(s"CREATE TEMPORARY FUNCTION testStringStringUdf AS '${classOf[UDFStringString].getName}'") @@ -173,7 +173,7 @@ class HiveUdfSuite extends QueryTest { ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: - Nil).toDF + Nil).toDF() testData.registerTempTable("TwoListTable") sql(s"CREATE TEMPORARY FUNCTION 
testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
[2/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame related style issues. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4670) Author: Cheng Lian Closes #4670 from liancheng/df-cleanup and squashes the following commits: 3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls (cherry picked from commit 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bd33ce6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bd33ce6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bd33ce6 Branch: refs/heads/branch-1.3 Commit: 2bd33ce62b4c43be4dbac6b46f979c36a0e9aba8 Parents: f8f9a64 Author: Cheng Lian Authored: Tue Feb 17 23:36:20 2015 -0800 Committer: Reynold Xin Committed: Tue Feb 17 23:36:30 2015 -0800 -- .../examples/ml/CrossValidatorExample.scala | 2 +- .../spark/examples/ml/DeveloperApiExample.scala | 4 +- .../apache/spark/examples/ml/MovieLensALS.scala | 6 +-- .../spark/examples/ml/SimpleParamsExample.scala | 6 +-- .../ml/SimpleTextClassificationPipeline.scala | 4 +- .../spark/examples/mllib/DatasetExample.scala | 2 +- .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../spark/mllib/classification/NaiveBayes.scala | 2 +- .../impl/GLMClassificationModel.scala | 2 +- .../regression/impl/GLMRegressionModel.scala| 2 +- .../mllib/tree/model/DecisionTreeModel.scala| 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../spark/ml/recommendation/ALSSuite.scala | 4 +- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 4 +- .../apache/spark/sql/parquet/ParquetTest.scala | 6 +-- .../org/apache/spark/sql/CachedTableSuite.scala | 14 +++--- .../org/apache/spark/sql/DataFrameSuite.scala | 26 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 8 +-- .../scala/org/apache/spark/sql/QueryTest.scala | 46 - .../org/apache/spark/sql/SQLQuerySuite.scala| 6 +-- .../sql/ScalaReflectionRelationSuite.scala | 10 ++-- .../scala/org/apache/spark/sql/TestData.scala | 48 ++ .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 46 - .../spark/sql/jdbc/MySQLIntegration.scala | 53 +--- .../spark/sql/jdbc/PostgresIntegration.scala| 30 ++- .../spark/sql/parquet/ParquetFilterSuite.scala | 40 +++ .../spark/sql/parquet/ParquetIOSuite.scala | 28 +-- .../spark/sql/parquet/ParquetQuerySuite.scala | 2 +- .../spark/sql/parquet/ParquetSchemaSuite.scala | 4 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 8 +-- .../apache/spark/sql/hive/StatisticsSuite.scala | 38 +++--- .../sql/hive/execution/HiveQuerySuite.scala | 20 .../hive/execution/HiveResolutionSuite.scala| 6 +-- .../spark/sql/hive/execution/HiveUdfSuite.scala | 18 +++ 37 files changed, 250 insertions(+), 259 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala index f024194..7ab892c 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala @@ -90,7 +90,7 @@ object CrossValidatorExample { crossval.setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. -val cvModel = crossval.fit(training.toDF) +val cvModel = crossval.fit(training.toDF()) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/2bd33ce6/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala index 54aadd2..df26798 100644 ---
[2/2] spark git commit: [Minor] [SQL] Cleans up DataFrame variable names and toDF() calls
[Minor] [SQL] Cleans up DataFrame variable names and toDF() calls Although we've migrated to the DataFrame API, lots of code still uses `rdd` or `srdd` as local variable names. This PR tries to address these naming inconsistencies and some other minor DataFrame related style issues. [https://reviewable.io/review_button.png"; height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4670) Author: Cheng Lian Closes #4670 from liancheng/df-cleanup and squashes the following commits: 3e14448 [Cheng Lian] Cleans up DataFrame variable names and toDF() calls Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61ab0854 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61ab0854 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61ab0854 Branch: refs/heads/master Commit: 61ab08549cb6fceb6de1b5c490c55a89d4bd28fa Parents: 3912d33 Author: Cheng Lian Authored: Tue Feb 17 23:36:20 2015 -0800 Committer: Reynold Xin Committed: Tue Feb 17 23:36:20 2015 -0800 -- .../examples/ml/CrossValidatorExample.scala | 2 +- .../spark/examples/ml/DeveloperApiExample.scala | 4 +- .../apache/spark/examples/ml/MovieLensALS.scala | 6 +-- .../spark/examples/ml/SimpleParamsExample.scala | 6 +-- .../ml/SimpleTextClassificationPipeline.scala | 4 +- .../spark/examples/mllib/DatasetExample.scala | 2 +- .../apache/spark/examples/sql/RDDRelation.scala | 2 +- .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +- .../spark/mllib/classification/NaiveBayes.scala | 2 +- .../impl/GLMClassificationModel.scala | 2 +- .../regression/impl/GLMRegressionModel.scala| 2 +- .../mllib/tree/model/DecisionTreeModel.scala| 2 +- .../mllib/tree/model/treeEnsembleModels.scala | 2 +- .../spark/ml/recommendation/ALSSuite.scala | 4 +- .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 4 +- .../apache/spark/sql/parquet/ParquetTest.scala | 6 +-- .../org/apache/spark/sql/CachedTableSuite.scala | 14 +++--- .../org/apache/spark/sql/DataFrameSuite.scala | 26 +- .../scala/org/apache/spark/sql/JoinSuite.scala | 8 +-- .../scala/org/apache/spark/sql/QueryTest.scala | 46 - .../org/apache/spark/sql/SQLQuerySuite.scala| 6 +-- .../sql/ScalaReflectionRelationSuite.scala | 10 ++-- .../scala/org/apache/spark/sql/TestData.scala | 48 ++ .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 46 - .../spark/sql/jdbc/MySQLIntegration.scala | 53 +--- .../spark/sql/jdbc/PostgresIntegration.scala| 30 ++- .../spark/sql/parquet/ParquetFilterSuite.scala | 40 +++ .../spark/sql/parquet/ParquetIOSuite.scala | 28 +-- .../spark/sql/parquet/ParquetQuerySuite.scala | 2 +- .../spark/sql/parquet/ParquetSchemaSuite.scala | 4 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala| 8 +-- .../apache/spark/sql/hive/StatisticsSuite.scala | 38 +++--- .../sql/hive/execution/HiveQuerySuite.scala | 20 .../hive/execution/HiveResolutionSuite.scala| 6 +-- .../spark/sql/hive/execution/HiveUdfSuite.scala | 18 +++ 37 files changed, 250 insertions(+), 259 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala index f024194..7ab892c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala +++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala @@ -90,7 +90,7 @@ object CrossValidatorExample { crossval.setNumFolds(2) // Use 3+ in practice // Run cross-validation, and choose the best set of parameters. -val cvModel = crossval.fit(training.toDF) +val cvModel = crossval.fit(training.toDF()) // Prepare test documents, which are unlabeled. val test = sc.parallelize(Seq( http://git-wip-us.apache.org/repos/asf/spark/blob/61ab0854/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala index 54aadd2..df26798 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala +++ b/examples/src/
spark git commit: [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite
Repository: spark Updated Branches: refs/heads/branch-1.3 6e82c46bf -> f8f9a64eb [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite The test was incorrect. Instead of counting the number of records, it counted the number of partitions of RDD generated by DStream. Which is not its intention. I will be testing this patch multiple times to understand its flakiness. PS: This was caused by my refactoring in https://github.com/apache/spark/pull/4384/ koeninger check it out. Author: Tathagata Das Closes #4597 from tdas/kafka-flaky-test and squashes the following commits: d236235 [Tathagata Das] Unignored last test. e9a1820 [Tathagata Das] fix test (cherry picked from commit 3912d332464dcd124c60b734724c34d9742466a4) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8f9a64e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8f9a64e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8f9a64e Branch: refs/heads/branch-1.3 Commit: f8f9a64ebdfe581d62773a6276f66c75d4ba43e1 Parents: 6e82c46 Author: Tathagata Das Authored: Tue Feb 17 22:44:16 2015 -0800 Committer: Tathagata Das Committed: Tue Feb 17 22:44:27 2015 -0800 -- .../kafka/DirectKafkaStreamSuite.scala | 28 +++- 1 file changed, 16 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f8f9a64e/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala -- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 9260944..17ca9d1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka import java.io.File import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata class DirectKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with BeforeAndAfterAll with Eventually { @@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } - ignore("basic stream receiving with multiple topics and smallest starting offset") { + test("basic stream receiving with multiple topics and smallest starting offset") { val topics = Set("basic1", "basic2", "basic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => createTopic(t) sendMessages(t, data) } +val totalSent = data.values.sum * topics.size val kafkaParams = Map( "metadata.broker.list" -> s"$brokerAddress", "auto.offset.reset" -> "smallest" @@ -84,7 
+86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics) } -var total = 0L + +val allReceived = new ArrayBuffer[(String, String)] stream.foreachRDD { rdd => // Get the offset ranges in the RDD @@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase collected.foreach { case (partSize, rangeSize) => assert(partSize === rangeSize, "offset ranges are wrong") } - total += collected.size // Add up all the collected items } +stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() eventually(timeout(2.milliseconds), interval(200.milliseconds)) { - assert(total === data.values.sum * topics.size, "didn't get all messages") + assert(allReceived.size === totalSent, +"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) } ssc.stop() } - ignore("receiving from largest starting offset") {
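The fix above replaces a counter of per-partition offset-range entries with a buffer of the records themselves, so the assertion now checks message counts rather than partition counts. A small self-contained sketch of that counting pattern, using a queue-backed DStream instead of Kafka so it runs without a broker; the class and variable names are illustrative, not from the patch.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object CountRecordsNotPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("count-records"))
    val ssc = new StreamingContext(sc, Milliseconds(200))

    // One RDD with 4 partitions and 100 records, fed through a queue stream.
    val rddQueue = mutable.Queue(sc.parallelize(1 to 100, 4))
    val stream = ssc.queueStream(rddQueue)

    val allReceived = new ArrayBuffer[Int]
    stream.foreachRDD { rdd =>
      // For the queued RDD, rdd.partitions.size is 4 while rdd.collect().length
      // is 100; accumulating the collected records counts messages, not partitions.
      allReceived ++= rdd.collect()
    }

    ssc.start()
    Thread.sleep(2000)  // let the single queued batch be processed
    assert(allReceived.size == 100, s"expected 100 records, got ${allReceived.size}")
    ssc.stop()
  }
}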
spark git commit: [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite
Repository: spark Updated Branches: refs/heads/master e50934f11 -> 3912d3324 [SPARK-5731][Streaming][Test] Fix incorrect test in DirectKafkaStreamSuite The test was incorrect. Instead of counting the number of records, it counted the number of partitions of RDD generated by DStream. Which is not its intention. I will be testing this patch multiple times to understand its flakiness. PS: This was caused by my refactoring in https://github.com/apache/spark/pull/4384/ koeninger check it out. Author: Tathagata Das Closes #4597 from tdas/kafka-flaky-test and squashes the following commits: d236235 [Tathagata Das] Unignored last test. e9a1820 [Tathagata Das] fix test Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3912d332 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3912d332 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3912d332 Branch: refs/heads/master Commit: 3912d332464dcd124c60b734724c34d9742466a4 Parents: e50934f Author: Tathagata Das Authored: Tue Feb 17 22:44:16 2015 -0800 Committer: Tathagata Das Committed: Tue Feb 17 22:44:16 2015 -0800 -- .../kafka/DirectKafkaStreamSuite.scala | 28 +++- 1 file changed, 16 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3912d332/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala -- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 9260944..17ca9d1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka import java.io.File import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps +import kafka.common.TopicAndPartition +import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.{DStream, InputDStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata class DirectKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with BeforeAndAfterAll with Eventually { @@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase } - ignore("basic stream receiving with multiple topics and smallest starting offset") { + test("basic stream receiving with multiple topics and smallest starting offset") { val topics = Set("basic1", "basic2", "basic3") val data = Map("a" -> 7, "b" -> 9) topics.foreach { t => createTopic(t) sendMessages(t, data) } +val totalSent = data.values.sum * topics.size val kafkaParams = Map( "metadata.broker.list" -> s"$brokerAddress", "auto.offset.reset" -> "smallest" @@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase KafkaUtils.createDirectStream[String, 
String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics) } -var total = 0L + +val allReceived = new ArrayBuffer[(String, String)] stream.foreachRDD { rdd => // Get the offset ranges in the RDD @@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase collected.foreach { case (partSize, rangeSize) => assert(partSize === rangeSize, "offset ranges are wrong") } - total += collected.size // Add up all the collected items } +stream.foreachRDD { rdd => allReceived ++= rdd.collect() } ssc.start() eventually(timeout(2.milliseconds), interval(200.milliseconds)) { - assert(total === data.values.sum * topics.size, "didn't get all messages") + assert(allReceived.size === totalSent, +"didn't get expected number of messages, messages:\n" + allReceived.mkString("\n")) } ssc.stop() } - ignore("receiving from largest starting offset") { + test("receiving from largest starting offset") { val topic = "largest" val topicPartition = Top
spark git commit: [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.
Repository: spark Updated Branches: refs/heads/master d5f12bfe8 -> e50934f11 [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements. JIRA: https://issues.apache.org/jira/browse/SPARK-5723 Author: Yin Huai This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits: a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat ad2b07d [Yin Huai] Update tests and error messages. 8af5b2a [Yin Huai] Update conf key and unit test. 5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e50934f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e50934f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e50934f1 Branch: refs/heads/master Commit: e50934f11e1e3ded21a631e5ab69db3c79467137 Parents: d5f12bf Author: Yin Huai Authored: Tue Feb 17 18:14:33 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 18:14:33 2015 -0800 -- .../org/apache/spark/sql/hive/HiveContext.scala | 15 .../spark/sql/hive/HiveMetastoreCatalog.scala | 75 .../spark/sql/hive/execution/commands.scala | 17 +++-- .../sql/hive/MetastoreDataSourcesSuite.scala| 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 70 ++ 5 files changed, 158 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 6c55bc6..d3365b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertMetastoreParquet: Boolean = getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true" + /** + * When true, a table created by a Hive CTAS statement (no USING clause) will be + * converted to a data source table, using the data source set by spark.sql.sources.default. + * The table in CTAS statement will be converted when it meets any of the following conditions: + * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or + * a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml + * is either TextFile or SequenceFile. + * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe + * is specified (no ROW FORMAT SERDE clause). + * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format + * and no SerDe is specified (no ROW FORMAT SERDE clause). 
+ */ + protected[sql] def convertCTAS: Boolean = +getConf("spark.sql.hive.convertCTAS", "false").toBoolean + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) http://git-wip-us.apache.org/repos/asf/spark/blob/e50934f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index cfd6f27..f7ad2ef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.util.ReflectionUtils import org.apache.spark.Logging -import org.apache.spark.sql.{AnalysisException, SQLContext} +import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec} -import org.apache.spark.sql.sources.{DDLParser, LogicalRelation, ResolvedData
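The scaladoc added above spells out exactly which CTAS statements the new flag converts. A hedged sketch of how `spark.sql.hive.convertCTAS` would be exercised from a Hive-enabled Spark 1.3 application, assuming the usual `src` test table exists; the table and object names are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ConvertCtasSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("convert-ctas"))
    val hiveContext = new HiveContext(sc)

    hiveContext.setConf("spark.sql.hive.convertCTAS", "true")

    // No SerDe, file format, or storage handler clause: with the flag on (and
    // hive.default.fileformat left at TextFile or SequenceFile), this table is
    // written through the data source path using spark.sql.sources.default,
    // i.e. parquet by default.
    hiveContext.sql("CREATE TABLE ctas_default AS SELECT key, value FROM src")

    // A format other than TextFile/SequenceFile still uses Hive's own CTAS
    // write path, regardless of the flag.
    hiveContext.sql("CREATE TABLE ctas_rcfile STORED AS RCFILE AS SELECT key, value FROM src")

    sc.stop()
  }
}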
spark git commit: [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements.
Repository: spark Updated Branches: refs/heads/branch-1.3 2ab0ba04f -> 6e82c46bf [SPARK-5723][SQL]Change the default file format to Parquet for CTAS statements. JIRA: https://issues.apache.org/jira/browse/SPARK-5723 Author: Yin Huai This patch had conflicts when merged, resolved by Committer: Michael Armbrust Closes #4639 from yhuai/defaultCTASFileFormat and squashes the following commits: a568137 [Yin Huai] Merge remote-tracking branch 'upstream/master' into defaultCTASFileFormat ad2b07d [Yin Huai] Update tests and error messages. 8af5b2a [Yin Huai] Update conf key and unit test. 5a67903 [Yin Huai] Use data source write path for Hive's CTAS statements when no storage format/handler is specified. (cherry picked from commit e50934f11e1e3ded21a631e5ab69db3c79467137) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e82c46b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e82c46b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e82c46b Branch: refs/heads/branch-1.3 Commit: 6e82c46bf6cdfd227b5680f114e029219204e641 Parents: 2ab0ba0 Author: Yin Huai Authored: Tue Feb 17 18:14:33 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 18:14:45 2015 -0800 -- .../org/apache/spark/sql/hive/HiveContext.scala | 15 .../spark/sql/hive/HiveMetastoreCatalog.scala | 75 .../spark/sql/hive/execution/commands.scala | 17 +++-- .../sql/hive/MetastoreDataSourcesSuite.scala| 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 70 ++ 5 files changed, 158 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e82c46b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 6c55bc6..d3365b1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -61,6 +61,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertMetastoreParquet: Boolean = getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true" + /** + * When true, a table created by a Hive CTAS statement (no USING clause) will be + * converted to a data source table, using the data source set by spark.sql.sources.default. + * The table in CTAS statement will be converted when it meets any of the following conditions: + * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or + * a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml + * is either TextFile or SequenceFile. + * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe + * is specified (no ROW FORMAT SERDE clause). + * - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format + * and no SerDe is specified (no ROW FORMAT SERDE clause). 
+ */ + protected[sql] def convertCTAS: Boolean = +getConf("spark.sql.hive.convertCTAS", "false").toBoolean + override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) http://git-wip-us.apache.org/repos/asf/spark/blob/6e82c46b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index cfd6f27..f7ad2ef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.util.ReflectionUtils import org.apache.spark.Logging -import org.apache.spark.sql.{AnalysisException, SQLContext} +import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => P
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.3.0-rc1 [created] f97b0d4a6
[2/2] spark git commit: Preparing Spark release v1.3.0-rc1
Preparing Spark release v1.3.0-rc1 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f97b0d4a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f97b0d4a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f97b0d4a Branch: refs/heads/branch-1.3 Commit: f97b0d4a6b26504916816d7aefcf3132cd1da6c2 Parents: e8284b2 Author: Patrick Wendell Authored: Wed Feb 18 01:52:06 2015 + Committer: Patrick Wendell Committed: Wed Feb 18 01:52:06 2015 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml| 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml| 2 +- extras/kinesis-asl/pom.xml| 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml| 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 4122e24..523e2e8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 510e926..4f73cf7 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index c993781..5612149 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 8caad2b..f7d6030 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 0706f1e..45aa775 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 1f26813..455304f 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index 503fc12..bcae38d 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/f97b0d4a/external/kafka/pom.xml 
-- diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index af96138..74ddc10 100644 ---
[1/2] spark git commit: Preparing development version 1.3.1-SNAPSHOT
Repository: spark Updated Branches: refs/heads/branch-1.3 e8284b29d -> 2ab0ba04f Preparing development version 1.3.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ab0ba04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ab0ba04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ab0ba04 Branch: refs/heads/branch-1.3 Commit: 2ab0ba04f66683be25cbe0e83cecf2bdcb0f13ba Parents: f97b0d4 Author: Patrick Wendell Authored: Wed Feb 18 01:52:06 2015 + Committer: Patrick Wendell Committed: Wed Feb 18 01:52:06 2015 + -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml| 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml| 2 +- extras/kinesis-asl/pom.xml| 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml| 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 523e2e8..7752b41 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 4f73cf7..4a13c58 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 5612149..aca0f58 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index f7d6030..c424592 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 45aa775..ef960a8 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 455304f..f01d6e8 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index bcae38d..bcd2ca2 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.1-SNAPSHOT 
../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2ab0ba04/external/kafka/pom.xml --
spark git commit: [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators
Repository: spark Updated Branches: refs/heads/branch-1.3 7320605ad -> e8284b29d [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the bug and explain the root cause. Author: Yin Huai Closes #4663 from yhuai/projectResolved and squashes the following commits: 472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, it's resolved field should be false. (cherry picked from commit d5f12bfe8f0a98d6fee114bb24376668ebe2898e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8284b29 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8284b29 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8284b29 Branch: refs/heads/branch-1.3 Commit: e8284b29df0445bdf92f5ea2c74402a111718792 Parents: 7320605 Author: Yin Huai Authored: Tue Feb 17 17:50:39 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 17:50:51 2015 -0800 -- .../catalyst/plans/logical/basicOperators.scala | 10 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 13 +++- .../sql/hive/execution/SQLQuerySuite.scala | 32 +++- 3 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 9628e93..89544ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -23,6 +23,16 @@ import org.apache.spark.sql.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) + + override lazy val resolved: Boolean = { +val containsAggregatesOrGenerators = projectList.exists ( _.collect { +case agg: AggregateExpression => agg +case generator: Generator => generator + }.nonEmpty +) + +!expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index e70c651..aec7847 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { assert(caseInsensitiveAnalyze(plan).resolved) } + test("check project's resolved") { +assert(Project(testRelation.output, testRelation).resolved) 
+ +assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved) + +val explode = Explode(Nil, AttributeReference("a", IntegerType, nullable = true)()) +assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved) + +assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation).resolved) + } + test("analyze project") { assert( caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) === http://git-wip-us.apache.org/repos/asf/spark/blob/e8284b29/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e8d9eec..ae03bc5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/h
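The `resolved` override shown in the diff is what the new "check project's resolved" test exercises. A hedged standalone sketch of the same check against the Catalyst API, mirroring the aggregate case from that test but using an empty LocalRelation in place of the suite's testRelation; the object name is illustrative.

import org.apache.spark.sql.catalyst.expressions.{Alias, Count, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}

object ProjectResolvedSketch extends App {
  // A Project whose project list contains an aggregate expression is not in
  // its final form yet: the analyzer still has to rewrite it into an Aggregate.
  val countProject = Project(Seq(Alias(Count(Literal(1)), "count")()), LocalRelation())

  // Before this patch, resolved evaluated to true here (each individual
  // expression is resolved), which let later analysis steps skip the plan.
  assert(!countProject.resolved)
}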
spark git commit: [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators
Repository: spark Updated Branches: refs/heads/master a51fc7ef9 -> d5f12bfe8 [SPARK-5875][SQL]logical.Project should not be resolved if it contains aggregates or generators https://issues.apache.org/jira/browse/SPARK-5875 has a case to reproduce the bug and explain the root cause. Author: Yin Huai Closes #4663 from yhuai/projectResolved and squashes the following commits: 472f7b6 [Yin Huai] If a logical.Project has any AggregateExpression or Generator, it's resolved field should be false. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5f12bfe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5f12bfe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5f12bfe Branch: refs/heads/master Commit: d5f12bfe8f0a98d6fee114bb24376668ebe2898e Parents: a51fc7e Author: Yin Huai Authored: Tue Feb 17 17:50:39 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 17:50:39 2015 -0800 -- .../catalyst/plans/logical/basicOperators.scala | 10 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 13 +++- .../sql/hive/execution/SQLQuerySuite.scala | 32 +++- 3 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 9628e93..89544ad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -23,6 +23,16 @@ import org.apache.spark.sql.types._ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { def output = projectList.map(_.toAttribute) + + override lazy val resolved: Boolean = { +val containsAggregatesOrGenerators = projectList.exists ( _.collect { +case agg: AggregateExpression => agg +case generator: Generator => generator + }.nonEmpty +) + +!expressions.exists(!_.resolved) && childrenResolved && !containsAggregatesOrGenerators + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index e70c651..aec7847 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -58,6 +58,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { assert(caseInsensitiveAnalyze(plan).resolved) } + test("check project's resolved") { +assert(Project(testRelation.output, testRelation).resolved) + +assert(!Project(Seq(UnresolvedAttribute("a")), testRelation).resolved) + +val explode = Explode(Nil, 
AttributeReference("a", IntegerType, nullable = true)()) +assert(!Project(Seq(Alias(explode, "explode")()), testRelation).resolved) + +assert(!Project(Seq(Alias(Count(Literal(1)), "count")()), testRelation).resolved) + } + test("analyze project") { assert( caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) === http://git-wip-us.apache.org/repos/asf/spark/blob/d5f12bfe/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e8d9eec..ae03bc5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import or
[1/2] spark git commit: Revert "Preparing development version 1.3.1-SNAPSHOT"
Repository: spark Updated Branches: refs/heads/branch-1.3 7e5e4d82b -> 7320605ad Revert "Preparing development version 1.3.1-SNAPSHOT" This reverts commit e57c81b8c1a6581c2588973eaf30d3c7ae90ed0c. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/932ae4d4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/932ae4d4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/932ae4d4 Branch: refs/heads/branch-1.3 Commit: 932ae4d4a05b6587d1564f801f8670b63a10c93b Parents: 7e5e4d8 Author: Patrick Wendell Authored: Tue Feb 17 17:48:43 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:48:43 2015 -0800 -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml| 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml| 2 +- extras/kinesis-asl/pom.xml| 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml| 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 7752b41..523e2e8 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 4a13c58..4f73cf7 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index aca0f58..5612149 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index c424592..f7d6030 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index ef960a8..45aa775 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index f01d6e8..455304f 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index bcd2ca2..bcae38d 100644 --- a/external/kafka-assembly/pom.xml +++ 
b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.1-SNAPSHOT +1.3.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/932ae4d4/external/kafka/pom.xm
[2/2] spark git commit: Revert "Preparing Spark release v1.3.0-snapshot1"
Revert "Preparing Spark release v1.3.0-snapshot1" This reverts commit d97bfc6f28ec4b7acfb36410c7c167d8d3c145ec. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7320605a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7320605a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7320605a Branch: refs/heads/branch-1.3 Commit: 7320605ad2e6ad6a94db96fe3fbcbfee349449ef Parents: 932ae4d Author: Patrick Wendell Authored: Tue Feb 17 17:48:47 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:48:47 2015 -0800 -- assembly/pom.xml | 2 +- bagel/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml| 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml | 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml| 2 +- extras/kinesis-asl/pom.xml| 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml| 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 28 files changed, 28 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 523e2e8..4122e24 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 4f73cf7..510e926 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 5612149..c993781 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index f7d6030..8caad2b 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 45aa775..0706f1e 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 455304f..1f26813 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/kafka-assembly/pom.xml -- diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index bcae38d..503fc12 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.3.0 +1.3.0-SNAPSHOT 
../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/7320605a/external/kafka/pom.xml -- diff --git a/ext
spark git commit: [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()
Repository: spark Updated Branches: refs/heads/branch-1.3 07a401a7b -> 7e5e4d82b [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs() This method is performance-sensitive and this change wasn't necessary. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e5e4d82 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e5e4d82 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e5e4d82 Branch: refs/heads/branch-1.3 Commit: 7e5e4d82be7509edb64c71ca6189add556589613 Parents: 07a401a Author: Josh Rosen Authored: Tue Feb 17 17:45:16 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 17:47:43 2015 -0800 -- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e5e4d82/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9c355d7..8b62d24 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -190,13 +190,15 @@ class DAGScheduler( } private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized { -cacheLocs.getOrElseUpdate(rdd.id, { +// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times +if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) - blockIds.map { id => + cacheLocs(rdd.id) = blockIds.map { id => locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) } -}) +} +cacheLocs(rdd.id) } private def clearCacheLocs(): Unit = cacheLocs.synchronized { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs()
Repository: spark Updated Branches: refs/heads/master d46d6246d -> a51fc7ef9 [SPARK-4454] Revert getOrElse() cleanup in DAGScheduler.getCacheLocs() This method is performance-sensitive and this change wasn't necessary. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a51fc7ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a51fc7ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a51fc7ef Branch: refs/heads/master Commit: a51fc7ef9adb6a41c4857918217f800858fced2c Parents: d46d624 Author: Josh Rosen Authored: Tue Feb 17 17:45:16 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 17:45:16 2015 -0800 -- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a51fc7ef/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9c355d7..8b62d24 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -190,13 +190,15 @@ class DAGScheduler( } private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized { -cacheLocs.getOrElseUpdate(rdd.id, { +// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times +if (!cacheLocs.contains(rdd.id)) { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) - blockIds.map { id => + cacheLocs(rdd.id) = blockIds.map { id => locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) } -}) +} +cacheLocs(rdd.id) } private def clearCacheLocs(): Unit = cacheLocs.synchronized { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
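The rationale for the revert is the inline comment in the diff: getCacheLocs() runs O(num tasks) times, so the explicit contains()/update shape is kept on the hot path. As a rough, purely illustrative Python sketch of why convenience lookups can cost more there (names are made up, this is not Spark code): in Scala, getOrElseUpdate takes its default by name, so nothing is recomputed on a hit but a closure is still allocated per call; Python's closest analogue, dict.setdefault(), is worse still, because its default argument is evaluated on every call.

    cache_locs = {}

    def compute_locations(rdd_id):
        # Stand-in for the expensive block-manager lookup.
        return []

    def get_cache_locs_convenient(rdd_id):
        # setdefault() evaluates compute_locations() on every call,
        # even when rdd_id is already cached.
        return cache_locs.setdefault(rdd_id, compute_locations(rdd_id))

    def get_cache_locs_hot_path(rdd_id):
        # Only computes on a miss -- the shape this commit reverts to.
        if rdd_id not in cache_locs:
            cache_locs[rdd_id] = compute_locations(rdd_id)
        return cache_locs[rdd_id]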
spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map
Repository: spark Updated Branches: refs/heads/branch-1.3 cb905841b -> 07a401a7b [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map This patch addresses a race condition in DAGScheduler by properly synchronizing accesses to its `cacheLocs` map. This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, which can be called by separate threads, since DAGScheduler's `getPreferredLocs()` method is called by SparkContext and indirectly calls `getCacheLocs()`. If this map is cleared by the DAGScheduler event processing thread while a user thread is submitting a job and computing preferred locations, then this can cause the user thread to throw "NoSuchElementException: key not found" errors. Most accesses to DAGScheduler's internal state do not need synchronization because that state is only accessed from the event processing loop's thread. An alternative approach to fixing this bug would be to refactor this code so that SparkContext sends the DAGScheduler a message in order to get the list of preferred locations. However, this would involve more extensive changes to this code and would be significantly harder to backport to maintenance branches since some of the related code has undergone significant refactoring (e.g. the introduction of EventLoop). Since `cacheLocs` is the only state that's accessed in this way, adding simple synchronization seems like a better short-term fix. See #3345 for additional context. Author: Josh Rosen Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits: 12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map. (cherry picked from commit d46d6246d225ff3af09ebae1a09d4de2430c502d) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07a401a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07a401a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07a401a7 Branch: refs/heads/branch-1.3 Commit: 07a401a7beea864092ec8f8c451e05cba5a19bbb Parents: cb90584 Author: Josh Rosen Authored: Tue Feb 17 17:39:58 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:40:04 2015 -0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 34 ++-- 1 file changed, 24 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07a401a7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7903557..9c355d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -98,7 +98,13 @@ class DAGScheduler( private[scheduler] val activeJobs = new HashSet[ActiveJob] - // Contains the locations that each RDD's partitions are cached on + /** + * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids + * and its values are arrays indexed by partition numbers. Each array value is the set of + * locations where that RDD partition is cached. + * + * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). 
+ */ private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with @@ -183,18 +189,17 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason)) } - private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { -if (!cacheLocs.contains(rdd.id)) { + private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized { +cacheLocs.getOrElseUpdate(rdd.id, { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) - cacheLocs(rdd.id) = blockIds.map { id => + blockIds.map { id => locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) } -} -cacheLocs(rdd.id) +}) } - private def clearCacheLocs() { + private def clearCacheLocs(): Unit = cacheLocs.synchronized { cacheLocs.clear() } @@ -1276,17 +1281,26 @@ class DAGScheduler( } /** - * Synchronized method that might be called from other threads. + * Gets the locality information associated with a partition of a particular RDD. + * + * This method is thread-safe and is called from both DAGScheduler and SparkContext. + * * @param rdd whose partitions are to
spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map
Repository: spark Updated Branches: refs/heads/master ae6cfb3ac -> d46d6246d [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map This patch addresses a race condition in DAGScheduler by properly synchronizing accesses to its `cacheLocs` map. This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, which can be called by separate threads, since DAGScheduler's `getPreferredLocs()` method is called by SparkContext and indirectly calls `getCacheLocs()`. If this map is cleared by the DAGScheduler event processing thread while a user thread is submitting a job and computing preferred locations, then this can cause the user thread to throw "NoSuchElementException: key not found" errors. Most accesses to DAGScheduler's internal state do not need synchronization because that state is only accessed from the event processing loop's thread. An alternative approach to fixing this bug would be to refactor this code so that SparkContext sends the DAGScheduler a message in order to get the list of preferred locations. However, this would involve more extensive changes to this code and would be significantly harder to backport to maintenance branches since some of the related code has undergone significant refactoring (e.g. the introduction of EventLoop). Since `cacheLocs` is the only state that's accessed in this way, adding simple synchronization seems like a better short-term fix. See #3345 for additional context. Author: Josh Rosen Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits: 12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d46d6246 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d46d6246 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d46d6246 Branch: refs/heads/master Commit: d46d6246d225ff3af09ebae1a09d4de2430c502d Parents: ae6cfb3 Author: Josh Rosen Authored: Tue Feb 17 17:39:58 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:39:58 2015 -0800 -- .../apache/spark/scheduler/DAGScheduler.scala | 34 ++-- 1 file changed, 24 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d46d6246/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7903557..9c355d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -98,7 +98,13 @@ class DAGScheduler( private[scheduler] val activeJobs = new HashSet[ActiveJob] - // Contains the locations that each RDD's partitions are cached on + /** + * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids + * and its values are arrays indexed by partition numbers. Each array value is the set of + * locations where that RDD partition is cached. + * + * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454). 
+ */ private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with @@ -183,18 +189,17 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason)) } - private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = { -if (!cacheLocs.contains(rdd.id)) { + private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized { +cacheLocs.getOrElseUpdate(rdd.id, { val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId] val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster) - cacheLocs(rdd.id) = blockIds.map { id => + blockIds.map { id => locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId)) } -} -cacheLocs(rdd.id) +}) } - private def clearCacheLocs() { + private def clearCacheLocs(): Unit = cacheLocs.synchronized { cacheLocs.clear() } @@ -1276,17 +1281,26 @@ class DAGScheduler( } /** - * Synchronized method that might be called from other threads. + * Gets the locality information associated with a partition of a particular RDD. + * + * This method is thread-safe and is called from both DAGScheduler and SparkContext. + * * @param rdd whose partitions are to be looked at * @param partition to lookup locality information for * @return list of machines that are
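The race described above is a classic check-then-act hazard: getCacheLocs() tests for a key and then reads it, while clearCacheLocs() may empty the map from another thread in between, producing the "key not found" failures. A minimal Python sketch of the same hazard and the lock-based fix, purely for illustration (the patch itself guards the Scala HashMap with cacheLocs.synchronized):

    import threading

    cache_locs = {}
    cache_locs_lock = threading.Lock()

    def compute_locations(rdd_id):
        # Stand-in for the block-manager lookup done by the real scheduler.
        return []

    def get_cache_locs(rdd_id):
        # Holding the lock makes the check-then-act atomic, so a concurrent
        # clear_cache_locs() can no longer remove the key between the
        # membership test and the read.
        with cache_locs_lock:
            if rdd_id not in cache_locs:
                cache_locs[rdd_id] = compute_locations(rdd_id)
            return cache_locs[rdd_id]

    def clear_cache_locs():
        with cache_locs_lock:
            cache_locs.clear()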
spark git commit: [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support
Repository: spark Updated Branches: refs/heads/master c3d2b90bd -> ae6cfb3ac [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support Documentation for maven coordinates + Spark Package support. Added pyspark tests for `--packages` Author: Burak Yavuz Author: Davies Liu Closes #4662 from brkyvz/SPARK-5811 and squashes the following commits: 56d [Burak Yavuz] fixed broken test 64cb8ee [Burak Yavuz] passed pep8 on local c07b81e [Burak Yavuz] fixed pep8 a8bd6b7 [Burak Yavuz] submit PR 4ef4046 [Burak Yavuz] ready for PR 8fb02e5 [Burak Yavuz] merged master 25c9b9f [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into python-jar 560d13b [Burak Yavuz] before PR 17d3f76 [Davies Liu] support .jar as python package a3eb717 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-5811 c60156d [Burak Yavuz] [SPARK-5811] Added documentation for maven coordinates Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae6cfb3a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae6cfb3a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae6cfb3a Branch: refs/heads/master Commit: ae6cfb3acdbc2721d25793698a4a440f0519dbec Parents: c3d2b90 Author: Burak Yavuz Authored: Tue Feb 17 17:15:43 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:23:22 2015 -0800 -- .../org/apache/spark/deploy/SparkSubmit.scala | 52 ++- .../spark/deploy/SparkSubmitUtilsSuite.scala| 13 ++-- docs/programming-guide.md | 19 +- docs/submitting-applications.md | 5 ++ python/pyspark/tests.py | 69 ++-- 5 files changed, 131 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ae6cfb3a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 012a89a..4c41108 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -252,6 +252,26 @@ object SparkSubmit { val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER +// Resolve maven dependencies if there are any and add classpath to jars. 
Add them to py-files +// too for packages that include Python code +val resolvedMavenCoordinates = + SparkSubmitUtils.resolveMavenCoordinates( +args.packages, Option(args.repositories), Option(args.ivyRepoPath)) +if (!resolvedMavenCoordinates.trim.isEmpty) { + if (args.jars == null || args.jars.trim.isEmpty) { +args.jars = resolvedMavenCoordinates + } else { +args.jars += s",$resolvedMavenCoordinates" + } + if (args.isPython) { +if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + args.pyFiles = resolvedMavenCoordinates +} else { + args.pyFiles += s",$resolvedMavenCoordinates" +} + } +} + // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local if (args.isPython && !isYarnCluster) { @@ -307,18 +327,6 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" -// Resolve maven dependencies if there are any and add classpath to jars -val resolvedMavenCoordinates = - SparkSubmitUtils.resolveMavenCoordinates( -args.packages, Option(args.repositories), Option(args.ivyRepoPath)) -if (!resolvedMavenCoordinates.trim.isEmpty) { - if (args.jars == null || args.jars.trim.isEmpty) { -args.jars = resolvedMavenCoordinates - } else { -args.jars += s",$resolvedMavenCoordinates" - } -} - // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( @@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils { private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** - * Extracts maven coordinates from a comma-delimited string + * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided + * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides + * simplicity for Spark Package users. * @param coordinates Comma-delimited string of maven coordinates * @return Sequence of Maven coordinates */ private[
spark git commit: [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support
Repository: spark Updated Branches: refs/heads/branch-1.3 81202350a -> cb905841b [SPARK-5811] Added documentation for maven coordinates and added Spark Packages support Documentation for maven coordinates + Spark Package support. Added pyspark tests for `--packages` Author: Burak Yavuz Author: Davies Liu Closes #4662 from brkyvz/SPARK-5811 and squashes the following commits: 56d [Burak Yavuz] fixed broken test 64cb8ee [Burak Yavuz] passed pep8 on local c07b81e [Burak Yavuz] fixed pep8 a8bd6b7 [Burak Yavuz] submit PR 4ef4046 [Burak Yavuz] ready for PR 8fb02e5 [Burak Yavuz] merged master 25c9b9f [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into python-jar 560d13b [Burak Yavuz] before PR 17d3f76 [Davies Liu] support .jar as python package a3eb717 [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into SPARK-5811 c60156d [Burak Yavuz] [SPARK-5811] Added documentation for maven coordinates (cherry picked from commit ae6cfb3acdbc2721d25793698a4a440f0519dbec) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb905841 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb905841 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb905841 Branch: refs/heads/branch-1.3 Commit: cb905841b2eaa19e28a1327cab0e5d51f805d233 Parents: 8120235 Author: Burak Yavuz Authored: Tue Feb 17 17:15:43 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 17:23:30 2015 -0800 -- .../org/apache/spark/deploy/SparkSubmit.scala | 52 ++- .../spark/deploy/SparkSubmitUtilsSuite.scala| 13 ++-- docs/programming-guide.md | 19 +- docs/submitting-applications.md | 5 ++ python/pyspark/tests.py | 69 ++-- 5 files changed, 131 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb905841/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 012a89a..4c41108 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -252,6 +252,26 @@ object SparkSubmit { val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER +// Resolve maven dependencies if there are any and add classpath to jars. 
Add them to py-files +// too for packages that include Python code +val resolvedMavenCoordinates = + SparkSubmitUtils.resolveMavenCoordinates( +args.packages, Option(args.repositories), Option(args.ivyRepoPath)) +if (!resolvedMavenCoordinates.trim.isEmpty) { + if (args.jars == null || args.jars.trim.isEmpty) { +args.jars = resolvedMavenCoordinates + } else { +args.jars += s",$resolvedMavenCoordinates" + } + if (args.isPython) { +if (args.pyFiles == null || args.pyFiles.trim.isEmpty) { + args.pyFiles = resolvedMavenCoordinates +} else { + args.pyFiles += s",$resolvedMavenCoordinates" +} + } +} + // Require all python files to be local, so we can add them to the PYTHONPATH // In YARN cluster mode, python files are distributed as regular files, which can be non-local if (args.isPython && !isYarnCluster) { @@ -307,18 +327,6 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" -// Resolve maven dependencies if there are any and add classpath to jars -val resolvedMavenCoordinates = - SparkSubmitUtils.resolveMavenCoordinates( -args.packages, Option(args.repositories), Option(args.ivyRepoPath)) -if (!resolvedMavenCoordinates.trim.isEmpty) { - if (args.jars == null || args.jars.trim.isEmpty) { -args.jars = resolvedMavenCoordinates - } else { -args.jars += s",$resolvedMavenCoordinates" - } -} - // A list of rules to map each argument to system properties or command-line options in // each deploy mode; we iterate through these below val options = List[OptionAssigner]( @@ -646,13 +654,15 @@ private[spark] object SparkSubmitUtils { private[spark] case class MavenCoordinate(groupId: String, artifactId: String, version: String) /** - * Extracts maven coordinates from a comma-delimited string + * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided + * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides + * simplicity for Spark Package users. * @param coor
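The documentation added by this change describes --packages coordinates as groupId:artifactId:version, with groupId/artifactId:version accepted as a Spark Packages shorthand. As a hedged illustration of that format only (this is not Spark's resolver, and the coordinate shown is hypothetical), a small Python helper that splits both forms:

    def parse_coordinate(coordinate):
        # Accepts "groupId:artifactId:version" and the Spark Packages
        # shorthand "groupId/artifactId:version".
        group, _, rest = coordinate.replace("/", ":", 1).partition(":")
        artifact, _, version = rest.partition(":")
        if not (group and artifact and version):
            raise ValueError(
                "Provide coordinates as groupId:artifactId:version, got: %s" % coordinate)
        return group, artifact, version

    # Hypothetical coordinate, for illustration only.
    print(parse_coordinate("org.example/example-package:0.1.0"))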
svn commit: r1660555 - in /spark: community.md site/community.html
Author: matei Date: Wed Feb 18 01:08:19 2015 New Revision: 1660555 URL: http://svn.apache.org/r1660555 Log: Berlin meetup and cleaning up meetup page Modified: spark/community.md spark/site/community.html Modified: spark/community.md URL: http://svn.apache.org/viewvc/spark/community.md?rev=1660555&r1=1660554&r2=1660555&view=diff == --- spark/community.md (original) +++ spark/community.md Wed Feb 18 01:08:19 2015 @@ -52,21 +52,20 @@ Spark Meetups are grass-roots events org http://www.meetup.com/spark-users/";>Bay Area Spark Meetup. -This group has been running monthly events since January 2012 in the San Francisco area. -The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past meetups, including videos and http://www.meetup.com/spark-users/files/";>slides on most of the recent pages. -You can also find links to videos from past meetups on the Documentation Page. +This group has been running since January 2012 in the San Francisco area. +The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past meetups, including videos and http://www.meetup.com/spark-users/files/";>slides for most of the recent talks. -http://www.meetup.com/Spark-London/";>London Spark Meetup +http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup -http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark Meetup +http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark Meetup -http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup +http://www.meetup.com/Berlin-Apache-Spark-Meetup/";>Berlin Spark Meetup -http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark Meetup +http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark Meetup http://www.meetup.com/Boston-Apache-Spark-User-Group/";>Boston Spark Meetup @@ -84,6 +83,9 @@ Spark Meetups are grass-roots events org http://www.meetup.com/Spark-User-Group-Hyderabad/";>Hyderabad Spark Meetup +http://www.meetup.com/Spark-London/";>London Spark Meetup + + http://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark Meetup Modified: spark/site/community.html URL: http://svn.apache.org/viewvc/spark/site/community.html?rev=1660555&r1=1660554&r2=1660555&view=diff == --- spark/site/community.html (original) +++ spark/site/community.html Wed Feb 18 01:08:19 2015 @@ -212,21 +212,20 @@ http://www.meetup.com/spark-users/";>Bay Area Spark Meetup. -This group has been running monthly events since January 2012 in the San Francisco area. -The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past meetups, including videos and http://www.meetup.com/spark-users/files/";>slides on most of the recent pages. -You can also find links to videos from past meetups on the Documentation Page. +This group has been running since January 2012 in the San Francisco area. +The meetup page also contains an http://www.meetup.com/spark-users/events/past/";>archive of past meetups, including videos and http://www.meetup.com/spark-users/files/";>slides for most of the recent talks. 
-http://www.meetup.com/Spark-London/";>London Spark Meetup +http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup -http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark Meetup +http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark Meetup -http://www.meetup.com/Spark-Barcelona/";>Barcelona Spark Meetup +http://www.meetup.com/Berlin-Apache-Spark-Meetup/";>Berlin Spark Meetup -http://www.meetup.com/Spark_big_data_analytics/";>Bangalore Spark Meetup +http://www.meetup.com/spark-user-beijing-Meetup/";>Beijing Spark Meetup http://www.meetup.com/Boston-Apache-Spark-User-Group/";>Boston Spark Meetup @@ -244,6 +243,9 @@ http://www.meetup.com/Spark-User-Group-Hyderabad/";>Hyderabad Spark Meetup +http://www.meetup.com/Spark-London/";>London Spark Meetup + + http://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark Meetup - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r1660554 - /spark/news/_posts/2015-02-09-spark-1-2-1-released.md
Author: pwendell Date: Wed Feb 18 01:07:50 2015 New Revision: 1660554 URL: http://svn.apache.org/r1660554 Log: Adding missing news file for Spark 1.2.1 release Added: spark/news/_posts/2015-02-09-spark-1-2-1-released.md Added: spark/news/_posts/2015-02-09-spark-1-2-1-released.md URL: http://svn.apache.org/viewvc/spark/news/_posts/2015-02-09-spark-1-2-1-released.md?rev=1660554&view=auto == --- spark/news/_posts/2015-02-09-spark-1-2-1-released.md (added) +++ spark/news/_posts/2015-02-09-spark-1-2-1-released.md Wed Feb 18 01:07:50 2015 @@ -0,0 +1,16 @@ +--- +layout: post +title: Spark 1.2.1 released +categories: +- News +tags: [] +status: publish +type: post +published: true +meta: + _edit_last: '4' + _wpas_done_all: '1' +--- +We are happy to announce the availability of Spark 1.2.1! This is a maintenance release that includes contributions from 69 developers. Spark 1.2.1 includes fixes across several areas of Spark, including the core API, Streaming, PySpark, SQL, GraphX, and MLlib. + +Visit the release notes to read about this release or download the release today. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark
Repository: spark Updated Branches: refs/heads/branch-1.3 07d8ef9e7 -> 81202350a [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark Currently, PySpark does not support narrow dependency during cogroup/join when the two RDDs have the partitioner, another unnecessary shuffle stage will come in. The Python implementation of cogroup/join is different than Scala one, it depends on union() and partitionBy(). This patch will try to use PartitionerAwareUnionRDD() in union(), when all the RDDs have the same partitioner. It also fix `reservePartitioner` in all the map() or mapPartitions(), then partitionBy() can skip the unnecessary shuffle stage. Author: Davies Liu Closes #4629 from davies/narrow and squashes the following commits: dffe34e [Davies Liu] improve test, check number of stages for join/cogroup 1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow 4d29932 [Davies Liu] address comment cc28d97 [Davies Liu] add unit tests 940245e [Davies Liu] address comments ff5a0a6 [Davies Liu] skip the partitionBy() on Python side eb26c62 [Davies Liu] narrow dependency in PySpark (cherry picked from commit c3d2b90bde2e11823909605d518167548df66bd8) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81202350 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81202350 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81202350 Branch: refs/heads/branch-1.3 Commit: 81202350a08c50685676300218270929c76f648a Parents: 07d8ef9 Author: Davies Liu Authored: Tue Feb 17 16:54:57 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 16:55:20 2015 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 11 - .../org/apache/spark/api/python/PythonRDD.scala | 10 .../main/scala/org/apache/spark/rdd/RDD.scala | 8 +++- python/pyspark/join.py | 8 ++-- python/pyspark/rdd.py | 49 +--- python/pyspark/streaming/dstream.py | 2 +- python/pyspark/tests.py | 38 ++- 7 files changed, 101 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/81202350/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fd8fac6..d59b466 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs. */ - def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { +val partitioners = rdds.flatMap(_.partitioner).toSet +if (partitioners.size == 1) { + new PartitionerAwareUnionRDD(this, rdds) +} else { + new UnionRDD(this, rdds) +} + } /** Build the union of a list of RDDs passed as variable-length arguments. */ def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = -new UnionRDD(this, Seq(first) ++ rest) +union(Seq(first) ++ rest) /** Get an RDD that has no partitions or elements. 
*/ def emptyRDD[T: ClassTag] = new EmptyRDD[T](this) http://git-wip-us.apache.org/repos/asf/spark/blob/81202350/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2527211..dcb6e63 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -303,6 +303,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions + override val partitioner = prev.partitioner override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { case Seq(a, b) => (Utils.deserializeLongValue(a), b) @@ -330,6 +331,15 @@ private[spark] object PythonRDD extends Logging { } /** + * Return an RDD of values from an RDD of (Long, Array[Byte]), with preservePartitions=true + * + * This is useful for PySpark to have the partitioner after partitionBy() + */ + def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] = { +pa
spark git commit: [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark
Repository: spark Updated Branches: refs/heads/master 117121a4e -> c3d2b90bd [SPARK-5785] [PySpark] narrow dependency for cogroup/join in PySpark Currently, PySpark does not support narrow dependency during cogroup/join when the two RDDs have the partitioner, another unnecessary shuffle stage will come in. The Python implementation of cogroup/join is different than Scala one, it depends on union() and partitionBy(). This patch will try to use PartitionerAwareUnionRDD() in union(), when all the RDDs have the same partitioner. It also fix `reservePartitioner` in all the map() or mapPartitions(), then partitionBy() can skip the unnecessary shuffle stage. Author: Davies Liu Closes #4629 from davies/narrow and squashes the following commits: dffe34e [Davies Liu] improve test, check number of stages for join/cogroup 1ed3ba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into narrow 4d29932 [Davies Liu] address comment cc28d97 [Davies Liu] add unit tests 940245e [Davies Liu] address comments ff5a0a6 [Davies Liu] skip the partitionBy() on Python side eb26c62 [Davies Liu] narrow dependency in PySpark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3d2b90b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3d2b90b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3d2b90b Branch: refs/heads/master Commit: c3d2b90bde2e11823909605d518167548df66bd8 Parents: 117121a Author: Davies Liu Authored: Tue Feb 17 16:54:57 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 16:54:57 2015 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 11 - .../org/apache/spark/api/python/PythonRDD.scala | 10 .../main/scala/org/apache/spark/rdd/RDD.scala | 8 +++- python/pyspark/join.py | 8 ++-- python/pyspark/rdd.py | 49 +--- python/pyspark/streaming/dstream.py | 2 +- python/pyspark/tests.py | 38 ++- 7 files changed, 101 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3d2b90b/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fd8fac6..d59b466 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -961,11 +961,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs. */ - def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { +val partitioners = rdds.flatMap(_.partitioner).toSet +if (partitioners.size == 1) { + new PartitionerAwareUnionRDD(this, rdds) +} else { + new UnionRDD(this, rdds) +} + } /** Build the union of a list of RDDs passed as variable-length arguments. */ def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = -new UnionRDD(this, Seq(first) ++ rest) +union(Seq(first) ++ rest) /** Get an RDD that has no partitions or elements. 
*/ def emptyRDD[T: ClassTag] = new EmptyRDD[T](this) http://git-wip-us.apache.org/repos/asf/spark/blob/c3d2b90b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2527211..dcb6e63 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -303,6 +303,7 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte])](prev) { override def getPartitions = prev.partitions + override val partitioner = prev.partitioner override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { case Seq(a, b) => (Utils.deserializeLongValue(a), b) @@ -330,6 +331,15 @@ private[spark] object PythonRDD extends Logging { } /** + * Return an RDD of values from an RDD of (Long, Array[Byte]), with preservePartitions=true + * + * This is useful for PySpark to have the partitioner after partitionBy() + */ + def valueOfPair(pair: JavaPairRDD[Long, Array[Byte]]): JavaRDD[Array[Byte]] = { +pair.rdd.mapPartitions(it => it.map(_._2), true) + } + + /** * Adapter for calling SparkContext#runJo
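From the user's side, the benefit is that a join or cogroup between two RDDs that were already partitioned with the same partitioner no longer inserts an extra shuffle stage. A short PySpark usage sketch, assuming an active SparkContext bound to sc:

    rdd1 = sc.parallelize([(i, i) for i in range(100)]).partitionBy(4)
    rdd2 = sc.parallelize([(i, 2 * i) for i in range(100)]).partitionBy(4)

    # Both sides already share a partitioner with 4 partitions, so with this
    # change the join reuses that partitioning instead of shuffling again.
    joined = rdd1.join(rdd2)
    print(joined.getNumPartitions())  # 4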
spark git commit: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.
Repository: spark Updated Branches: refs/heads/branch-1.3 0dba382ee -> 07d8ef9e7 [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table. The problem is that after we create an empty hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive will create an empty dir for us, which cause our data source `ParquetRelation2` fail to get the schema of the table. See JIRA for the case to reproduce the bug and the exception. This PR is based on #4562 from chenghao-intel. JIRA: https://issues.apache.org/jira/browse/SPARK-5852 Author: Yin Huai Author: Cheng Hao Closes #4655 from yhuai/CTASParquet and squashes the following commits: b8b3450 [Yin Huai] Update tests. 2ac94f7 [Yin Huai] Update tests. 3db3d20 [Yin Huai] Minor update. d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala. 36978d1 [Cheng Hao] Update the code as feedback a04930b [Cheng Hao] fix bug of scan an empty parquet based table 442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext (cherry picked from commit 117121a4ecaadda156a82255333670775e7727db) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07d8ef9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07d8ef9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07d8ef9e Branch: refs/heads/branch-1.3 Commit: 07d8ef9e730251b66f962fa7acdaf7d7eacc62a1 Parents: 0dba382 Author: Yin Huai Authored: Tue Feb 17 15:47:59 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 15:48:23 2015 -0800 -- .../apache/spark/sql/parquet/newParquet.scala | 18 ++- .../sql/hive/MetastoreDataSourcesSuite.scala| 38 +++ .../spark/sql/parquet/parquetSuites.scala | 114 ++- 3 files changed, 164 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07d8ef9e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 95bea92..16b7713 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2( } } - parquetSchema = maybeSchema.getOrElse(readSchema()) + // To get the schema. We first try to get the schema defined in maybeSchema. + // If maybeSchema is not defined, we will try to get the schema from existing parquet data + // (through readSchema). If data does not exist, we will try to get the schema defined in + // maybeMetastoreSchema (defined in the options of the data source). + // Finally, if we still could not get the schema. We throw an error. + parquetSchema = +maybeSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(sys.error("Failed to get the schema.")) partitionKeysIncludedInParquetSchema = isPartitioned && @@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2( } } -private def readSchema(): StructType = { +private def readSchema(): Option[StructType] = { // Sees which file(s) we need to touch in order to figure out the schema. val filesToTouch = // Always tries the summary files first if users don't require a merged schema. In this case, @@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 { // internally. 
private[sql] val METASTORE_SCHEMA = "metastoreSchema" - private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = { + private[parquet] def readSchema( + footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { footers.map { footer => val metadata = footer.getParquetMetadata.getFileMetaData val parquetSchema = metadata.getSchema @@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 { sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetINT96AsTimestamp)) } -}.reduce { (left, right) => +}.reduceOption { (left, right) => try left.merge(right) catch { case e: Throwable => throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) } http://git-wip-us.apache.org/repos/asf/spark/blob/07d8ef9e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive
spark git commit: [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.
Repository: spark Updated Branches: refs/heads/master 4d4cc760f -> 117121a4e [SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table. The problem is that after we create an empty hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive will create an empty dir for us, which cause our data source `ParquetRelation2` fail to get the schema of the table. See JIRA for the case to reproduce the bug and the exception. This PR is based on #4562 from chenghao-intel. JIRA: https://issues.apache.org/jira/browse/SPARK-5852 Author: Yin Huai Author: Cheng Hao Closes #4655 from yhuai/CTASParquet and squashes the following commits: b8b3450 [Yin Huai] Update tests. 2ac94f7 [Yin Huai] Update tests. 3db3d20 [Yin Huai] Minor update. d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala. 36978d1 [Cheng Hao] Update the code as feedback a04930b [Cheng Hao] fix bug of scan an empty parquet based table 442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/117121a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/117121a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/117121a4 Branch: refs/heads/master Commit: 117121a4ecaadda156a82255333670775e7727db Parents: 4d4cc76 Author: Yin Huai Authored: Tue Feb 17 15:47:59 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 15:47:59 2015 -0800 -- .../apache/spark/sql/parquet/newParquet.scala | 18 ++- .../sql/hive/MetastoreDataSourcesSuite.scala| 38 +++ .../spark/sql/parquet/parquetSuites.scala | 114 ++- 3 files changed, 164 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 95bea92..16b7713 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2( } } - parquetSchema = maybeSchema.getOrElse(readSchema()) + // To get the schema. We first try to get the schema defined in maybeSchema. + // If maybeSchema is not defined, we will try to get the schema from existing parquet data + // (through readSchema). If data does not exist, we will try to get the schema defined in + // maybeMetastoreSchema (defined in the options of the data source). + // Finally, if we still could not get the schema. We throw an error. + parquetSchema = +maybeSchema + .orElse(readSchema()) + .orElse(maybeMetastoreSchema) + .getOrElse(sys.error("Failed to get the schema.")) partitionKeysIncludedInParquetSchema = isPartitioned && @@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2( } } -private def readSchema(): StructType = { +private def readSchema(): Option[StructType] = { // Sees which file(s) we need to touch in order to figure out the schema. val filesToTouch = // Always tries the summary files first if users don't require a merged schema. In this case, @@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 { // internally. 
private[sql] val METASTORE_SCHEMA = "metastoreSchema" - private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = { + private[parquet] def readSchema( + footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { footers.map { footer => val metadata = footer.getParquetMetadata.getFileMetaData val parquetSchema = metadata.getSchema @@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 { sqlContext.conf.isParquetBinaryAsString, sqlContext.conf.isParquetINT96AsTimestamp)) } -}.reduce { (left, right) => +}.reduceOption { (left, right) => try left.merge(right) catch { case e: Throwable => throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) } http://git-wip-us.apache.org/repos/asf/spark/blob/117121a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuit
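The scenario fixed here follows directly from the commit message: a freshly created, still-empty metastore Parquet table has no data files to infer a schema from, so the relation now falls back to the schema recorded in the metastore instead of failing. A hedged PySpark sketch of that scenario, assuming a HiveContext bound to sqlCtx and the default spark.sql.hive.convertMetastoreParquet=true:

    sqlCtx.sql("CREATE TABLE test_parquet (a INT) STORED AS PARQUET")

    # Before this fix, scanning the empty table failed while trying to read a
    # schema from the empty directory Hive created; with the fix the metastore
    # schema is used and the query simply returns no rows.
    print(sqlCtx.sql("SELECT * FROM test_parquet").collect())  # []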
spark git commit: [SPARK-5872] [SQL] create a sqlCtx in pyspark shell
Repository: spark Updated Branches: refs/heads/branch-1.3 cb061603c -> 0dba382ee [SPARK-5872] [SQL] create a sqlCtx in pyspark shell The sqlCtx will be HiveContext if hive is built in assembly jar, or SQLContext if not. It also skip the Hive tests in pyspark.sql.tests if no hive is available. Author: Davies Liu Closes #4659 from davies/sqlctx and squashes the following commits: 0e6629a [Davies Liu] sqlCtx in pyspark (cherry picked from commit 4d4cc760fa9687ce563320094557ef9144488676) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dba382e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dba382e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dba382e Branch: refs/heads/branch-1.3 Commit: 0dba382ee65694969704384c4968e3a656b3c833 Parents: cb06160 Author: Davies Liu Authored: Tue Feb 17 15:44:37 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 15:44:45 2015 -0800 -- python/pyspark/shell.py | 13 - python/pyspark/sql/tests.py | 12 ++-- 2 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0dba382e/python/pyspark/shell.py -- diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 4cf4b89..1a02fec 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -31,8 +31,12 @@ if sys.version_info[0] != 2: import atexit import os import platform + +import py4j + import pyspark from pyspark.context import SparkContext +from pyspark.sql import SQLContext, HiveContext from pyspark.storagelevel import StorageLevel # this is the deprecated equivalent of ADD_JARS @@ -46,6 +50,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"): sc = SparkContext(appName="PySparkShell", pyFiles=add_files) atexit.register(lambda: sc.stop()) +try: +# Try to access HiveConf, it will raise exception if Hive is not added +sc._jvm.org.apache.hadoop.hive.conf.HiveConf() +sqlCtx = HiveContext(sc) +except py4j.protocol.Py4JError: +sqlCtx = SQLContext(sc) + print("""Welcome to __ / __/__ ___ _/ /__ @@ -57,7 +68,7 @@ print("Using Python version %s (%s, %s)" % ( platform.python_version(), platform.python_build()[0], platform.python_build()[1])) -print("SparkContext available as sc.") +print("SparkContext available as sc, %s available as sqlCtx." 
% sqlCtx.__class__.__name__) if add_files is not None: print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead") http://git-wip-us.apache.org/repos/asf/spark/blob/0dba382e/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index aa80bca..52f7e65 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -25,6 +25,8 @@ import pydoc import shutil import tempfile +import py4j + if sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest @@ -329,9 +331,12 @@ class HiveContextSQLTests(ReusedPySparkTestCase): def setUpClass(cls): ReusedPySparkTestCase.setUpClass() cls.tempdir = tempfile.NamedTemporaryFile(delete=False) +try: +cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() +except py4j.protocol.Py4JError: +cls.sqlCtx = None +return os.unlink(cls.tempdir.name) -print "type", type(cls.sc) -print "type", type(cls.sc._jsc) _scala_HiveContext =\ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext) @@ -344,6 +349,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(cls.tempdir.name, ignore_errors=True) def test_save_and_load_table(self): +if self.sqlCtx is None: +return # no hive available, skipped + df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5872] [SQL] create a sqlCtx in pyspark shell
Repository: spark Updated Branches: refs/heads/master 3df85dccb -> 4d4cc760f [SPARK-5872] [SQL] create a sqlCtx in pyspark shell The sqlCtx will be HiveContext if hive is built in assembly jar, or SQLContext if not. It also skip the Hive tests in pyspark.sql.tests if no hive is available. Author: Davies Liu Closes #4659 from davies/sqlctx and squashes the following commits: 0e6629a [Davies Liu] sqlCtx in pyspark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d4cc760 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d4cc760 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d4cc760 Branch: refs/heads/master Commit: 4d4cc760fa9687ce563320094557ef9144488676 Parents: 3df85dc Author: Davies Liu Authored: Tue Feb 17 15:44:37 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 15:44:37 2015 -0800 -- python/pyspark/shell.py | 13 - python/pyspark/sql/tests.py | 12 ++-- 2 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4d4cc760/python/pyspark/shell.py -- diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 4cf4b89..1a02fec 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -31,8 +31,12 @@ if sys.version_info[0] != 2: import atexit import os import platform + +import py4j + import pyspark from pyspark.context import SparkContext +from pyspark.sql import SQLContext, HiveContext from pyspark.storagelevel import StorageLevel # this is the deprecated equivalent of ADD_JARS @@ -46,6 +50,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"): sc = SparkContext(appName="PySparkShell", pyFiles=add_files) atexit.register(lambda: sc.stop()) +try: +# Try to access HiveConf, it will raise exception if Hive is not added +sc._jvm.org.apache.hadoop.hive.conf.HiveConf() +sqlCtx = HiveContext(sc) +except py4j.protocol.Py4JError: +sqlCtx = SQLContext(sc) + print("""Welcome to __ / __/__ ___ _/ /__ @@ -57,7 +68,7 @@ print("Using Python version %s (%s, %s)" % ( platform.python_version(), platform.python_build()[0], platform.python_build()[1])) -print("SparkContext available as sc.") +print("SparkContext available as sc, %s available as sqlCtx." 
% sqlCtx.__class__.__name__) if add_files is not None: print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead") http://git-wip-us.apache.org/repos/asf/spark/blob/4d4cc760/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index aa80bca..52f7e65 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -25,6 +25,8 @@ import pydoc import shutil import tempfile +import py4j + if sys.version_info[:2] <= (2, 6): try: import unittest2 as unittest @@ -329,9 +331,12 @@ class HiveContextSQLTests(ReusedPySparkTestCase): def setUpClass(cls): ReusedPySparkTestCase.setUpClass() cls.tempdir = tempfile.NamedTemporaryFile(delete=False) +try: +cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf() +except py4j.protocol.Py4JError: +cls.sqlCtx = None +return os.unlink(cls.tempdir.name) -print "type", type(cls.sc) -print "type", type(cls.sc._jsc) _scala_HiveContext =\ cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc()) cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext) @@ -344,6 +349,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase): shutil.rmtree(cls.tempdir.name, ignore_errors=True) def test_save_and_load_table(self): +if self.sqlCtx is None: +return # no hive available, skipped + df = self.df tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
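After this change, bin/pyspark starts with sqlCtx already bound: a HiveContext when the Hive classes are on the classpath, a plain SQLContext otherwise. A small usage sketch inside that shell (the column names and data are made up for illustration):

    # `sc` and `sqlCtx` are pre-created by the shell; no setup is needed.
    print(type(sqlCtx))

    df = sqlCtx.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    print(df.collect())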
spark git commit: [SPARK-5871] output explain in Python
Repository: spark Updated Branches: refs/heads/branch-1.3 35e23ff14 -> cb061603c [SPARK-5871] output explain in Python Author: Davies Liu Closes #4658 from davies/explain and squashes the following commits: db87ea2 [Davies Liu] output explain in Python (cherry picked from commit 3df85dccbc8fd1ba19bbcdb8d359c073b1494d98) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb061603 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb061603 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb061603 Branch: refs/heads/branch-1.3 Commit: cb061603c3ca4cd5162a36fc32de15779614e854 Parents: 35e23ff Author: Davies Liu Authored: Tue Feb 17 13:48:38 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 13:51:00 2015 -0800 -- python/pyspark/sql/dataframe.py | 23 --- 1 file changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb061603/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 8417240..388033d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -244,8 +244,25 @@ class DataFrame(object): debugging purpose. If extended is False, only prints the physical plan. -""" -self._jdf.explain(extended) + +>>> df.explain() +PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:... + +>>> df.explain(True) +== Parsed Logical Plan == +... +== Analyzed Logical Plan == +... +== Optimized Logical Plan == +... +== Physical Plan == +... +== RDD == +""" +if extended: +print self._jdf.queryExecution().toString() +else: +print self._jdf.queryExecution().executedPlan().toString() def isLocal(self): """ @@ -1034,7 +1051,7 @@ def _test(): Row(name='Bob', age=5, height=85)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.dataframe, globs=globs, -optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) +optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) globs['sc'].stop() if failure_count: exit(-1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5871] output explain in Python
Repository: spark Updated Branches: refs/heads/master 445a755b8 -> 3df85dccb [SPARK-5871] output explain in Python Author: Davies Liu Closes #4658 from davies/explain and squashes the following commits: db87ea2 [Davies Liu] output explain in Python Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3df85dcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3df85dcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3df85dcc Branch: refs/heads/master Commit: 3df85dccbc8fd1ba19bbcdb8d359c073b1494d98 Parents: 445a755 Author: Davies Liu Authored: Tue Feb 17 13:48:38 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 13:48:38 2015 -0800 -- python/pyspark/sql/dataframe.py | 23 --- 1 file changed, 20 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3df85dcc/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 8417240..388033d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -244,8 +244,25 @@ class DataFrame(object): debugging purpose. If extended is False, only prints the physical plan. -""" -self._jdf.explain(extended) + +>>> df.explain() +PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at mapPartitions at SQLContext.scala:... + +>>> df.explain(True) +== Parsed Logical Plan == +... +== Analyzed Logical Plan == +... +== Optimized Logical Plan == +... +== Physical Plan == +... +== RDD == +""" +if extended: +print self._jdf.queryExecution().toString() +else: +print self._jdf.queryExecution().executedPlan().toString() def isLocal(self): """ @@ -1034,7 +1051,7 @@ def _test(): Row(name='Bob', age=5, height=85)]).toDF() (failure_count, test_count) = doctest.testmod( pyspark.sql.dataframe, globs=globs, -optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) +optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) globs['sc'].stop() if failure_count: exit(-1) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
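The doctest in the diff already shows the expected output shape; for completeness, calling the method from user code looks like this (assuming a SQLContext or HiveContext bound to sqlCtx, as in the shell change above):

    df = sqlCtx.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])
    df.explain()      # prints only the physical plan
    df.explain(True)  # prints the parsed, analyzed and optimized logical plans as well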
spark git commit: [SPARK-4172] [PySpark] Progress API in Python
Repository: spark Updated Branches: refs/heads/master de4836f8f -> 445a755b8 [SPARK-4172] [PySpark] Progress API in Python This patch bring the pull based progress API into Python, also a example in Python. Author: Davies Liu Closes #3027 from davies/progress_api and squashes the following commits: b1ba984 [Davies Liu] fix style d3b9253 [Davies Liu] add tests, mute the exception after stop 4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 25590c9 [Davies Liu] update with Java API 360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 023afb3 [Davies Liu] add Python API and example for progress API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/445a755b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/445a755b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/445a755b Branch: refs/heads/master Commit: 445a755b884885b88c1778fd56a3151045b0b0ed Parents: de4836f Author: Davies Liu Authored: Tue Feb 17 13:36:43 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 13:36:43 2015 -0800 -- .../spark/scheduler/TaskResultGetter.scala | 40 examples/src/main/python/status_api_demo.py | 67 ++ python/pyspark/__init__.py | 15 +-- python/pyspark/context.py | 7 ++ python/pyspark/status.py| 96 python/pyspark/tests.py | 31 +++ 6 files changed, 232 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/445a755b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 774f3d8..3938580 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer +import java.util.concurrent.RejectedExecutionException import scala.language.existentials import scala.util.control.NonFatal @@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) { var reason : TaskEndReason = UnknownReason -getTaskResultExecutor.execute(new Runnable { - override def run(): Unit = Utils.logUncaughtExceptions { -try { - if (serializedData != null && serializedData.limit() > 0) { -reason = serializer.get().deserialize[TaskEndReason]( - serializedData, Utils.getSparkClassLoader) +try { + getTaskResultExecutor.execute(new Runnable { +override def run(): Unit = Utils.logUncaughtExceptions { + try { +if (serializedData != null && serializedData.limit() > 0) { + reason = serializer.get().deserialize[TaskEndReason]( +serializedData, Utils.getSparkClassLoader) +} + } catch { +case cnd: ClassNotFoundException => + // Log an error but keep going here -- the task failed, so not catastrophic + // if we can't deserialize the reason. 
+ val loader = Utils.getContextOrSparkClassLoader + logError( +"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) +case ex: Exception => {} } -} catch { - case cnd: ClassNotFoundException => -// Log an error but keep going here -- the task failed, so not catastrophic if we can't -// deserialize the reason. -val loader = Utils.getContextOrSparkClassLoader -logError( - "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) - case ex: Exception => {} + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } -scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) - } -}) + }) +} catch { + case e: RejectedExecutionException if sparkEnv.isStopped => +// ignore it +} } def stop() { http://git-wip-us.apache.org/repos/asf/spark/blob/445a755b/examples/src/main/python/status_api_demo.py --
spark git commit: [SPARK-4172] [PySpark] Progress API in Python
Repository: spark Updated Branches: refs/heads/branch-1.3 e65dc1fd5 -> 35e23ff14 [SPARK-4172] [PySpark] Progress API in Python This patch bring the pull based progress API into Python, also a example in Python. Author: Davies Liu Closes #3027 from davies/progress_api and squashes the following commits: b1ba984 [Davies Liu] fix style d3b9253 [Davies Liu] add tests, mute the exception after stop 4297327 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 969fa9d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 25590c9 [Davies Liu] update with Java API 360de2d [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api c0f1021 [Davies Liu] Merge branch 'master' of github.com:apache/spark into progress_api 023afb3 [Davies Liu] add Python API and example for progress API (cherry picked from commit 445a755b884885b88c1778fd56a3151045b0b0ed) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35e23ff1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35e23ff1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35e23ff1 Branch: refs/heads/branch-1.3 Commit: 35e23ff140cd65a4121e769ee0a4e22a3490be37 Parents: e65dc1f Author: Davies Liu Authored: Tue Feb 17 13:36:43 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 13:36:54 2015 -0800 -- .../spark/scheduler/TaskResultGetter.scala | 40 examples/src/main/python/status_api_demo.py | 67 ++ python/pyspark/__init__.py | 15 +-- python/pyspark/context.py | 7 ++ python/pyspark/status.py| 96 python/pyspark/tests.py | 31 +++ 6 files changed, 232 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/35e23ff1/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 774f3d8..3938580 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer +import java.util.concurrent.RejectedExecutionException import scala.language.existentials import scala.util.control.NonFatal @@ -95,25 +96,30 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState, serializedData: ByteBuffer) { var reason : TaskEndReason = UnknownReason -getTaskResultExecutor.execute(new Runnable { - override def run(): Unit = Utils.logUncaughtExceptions { -try { - if (serializedData != null && serializedData.limit() > 0) { -reason = serializer.get().deserialize[TaskEndReason]( - serializedData, Utils.getSparkClassLoader) +try { + getTaskResultExecutor.execute(new Runnable { +override def run(): Unit = Utils.logUncaughtExceptions { + try { +if (serializedData != null && serializedData.limit() > 0) { + reason = serializer.get().deserialize[TaskEndReason]( +serializedData, Utils.getSparkClassLoader) +} + } catch { +case cnd: ClassNotFoundException => + // Log an error but keep going here -- the task failed, so not catastrophic + // if we can't deserialize the reason. 
+ val loader = Utils.getContextOrSparkClassLoader + logError( +"Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) +case ex: Exception => {} } -} catch { - case cnd: ClassNotFoundException => -// Log an error but keep going here -- the task failed, so not catastrophic if we can't -// deserialize the reason. -val loader = Utils.getContextOrSparkClassLoader -logError( - "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader) - case ex: Exception => {} + scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) } -scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) - } -}) + }) +} catch { + case e: RejectedExecutionException if sparkEnv.isStopped => +// ignore it +} } def stop() { http://git-wip-us.apache.org/repos/asf/spark
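The added examples/src/main/python/status_api_demo.py polls the new status API while a job runs in the background; a stripped-down sketch of the same pattern (names simplified, Python 3 print syntax, assumes a local master) looks like:

    import threading
    import time

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "status-demo")

    # Run the job in a background thread so the driver thread is free to poll progress.
    def run_job():
        sc.parallelize(range(1000), 10).map(lambda x: x * x).count()

    t = threading.Thread(target=run_job)
    t.start()

    tracker = sc.statusTracker()
    while t.is_alive():
        for job_id in tracker.getJobIdsForGroup():
            info = tracker.getJobInfo(job_id)
            if info:
                print("job %d: %s" % (job_id, info.status))
        time.sleep(0.5)

    t.join()
    sc.stop()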
spark git commit: [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext
Repository: spark Updated Branches: refs/heads/master 9d281fa56 -> de4836f8f [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext Author: Michael Armbrust Closes #4657 from marmbrus/pythonUdfs and squashes the following commits: a7823a8 [Michael Armbrust] [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de4836f8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de4836f8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de4836f8 Branch: refs/heads/master Commit: de4836f8f12c36c1b350cef288a75b5e59155735 Parents: 9d281fa Author: Michael Armbrust Authored: Tue Feb 17 13:23:45 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 13:23:45 2015 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 ++- .../main/scala/org/apache/spark/sql/execution/pythonUdfs.scala| 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de4836f8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 31afa0e..709b350 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -113,6 +113,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = true) { override val extendedResolutionRules = +ExtractPythonUdfs :: sources.PreWriteCheck(catalog) :: sources.PreInsertCastAndRename :: Nil @@ -1059,7 +1060,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @DeveloperApi protected[sql] class QueryExecution(val logical: LogicalPlan) { -lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical)) +lazy val analyzed: LogicalPlan = analyzer(logical) lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed) lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData) http://git-wip-us.apache.org/repos/asf/spark/blob/de4836f8/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 3a2f8d7..69de4d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -205,6 +205,9 @@ case class EvaluatePython( extends logical.UnaryNode { def output = child.output :+ resultAttribute + + // References should not include the produced attribute. + override def references = udf.references } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext
Repository: spark Updated Branches: refs/heads/branch-1.3 01356514e -> e65dc1fd5 [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext Author: Michael Armbrust Closes #4657 from marmbrus/pythonUdfs and squashes the following commits: a7823a8 [Michael Armbrust] [SPARK-5868][SQL] Fix python UDFs in HiveContext and checks in SQLContext (cherry picked from commit de4836f8f12c36c1b350cef288a75b5e59155735) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e65dc1fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e65dc1fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e65dc1fd Branch: refs/heads/branch-1.3 Commit: e65dc1fd58e4f9445247c9fd4e94b34550e992fb Parents: 0135651 Author: Michael Armbrust Authored: Tue Feb 17 13:23:45 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 13:23:56 2015 -0800 -- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 3 ++- .../main/scala/org/apache/spark/sql/execution/pythonUdfs.scala| 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e65dc1fd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 31afa0e..709b350 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -113,6 +113,7 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = true) { override val extendedResolutionRules = +ExtractPythonUdfs :: sources.PreWriteCheck(catalog) :: sources.PreInsertCastAndRename :: Nil @@ -1059,7 +1060,7 @@ class SQLContext(@transient val sparkContext: SparkContext) @DeveloperApi protected[sql] class QueryExecution(val logical: LogicalPlan) { -lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical)) +lazy val analyzed: LogicalPlan = analyzer(logical) lazy val withCachedData: LogicalPlan = cacheManager.useCachedData(analyzed) lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData) http://git-wip-us.apache.org/repos/asf/spark/blob/e65dc1fd/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 3a2f8d7..69de4d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -205,6 +205,9 @@ case class EvaluatePython( extends logical.UnaryNode { def output = child.output :+ resultAttribute + + // References should not include the produced attribute. + override def references = udf.references } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
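The user-visible effect of moving ExtractPythonUdfs into the analyzer rules is that Python UDFs resolve again when queried through a HiveContext. A minimal sketch of that usage (assumes a Spark build with Hive support; the table name, data, and UDF are illustrative):

    from pyspark import SparkContext
    from pyspark.sql import HiveContext, Row
    from pyspark.sql.types import IntegerType

    sc = SparkContext("local", "python-udf-demo")
    hive = HiveContext(sc)

    words = sc.parallelize([Row(word="spark"), Row(word="hive")]).toDF()
    words.registerTempTable("words")

    # Register a Python function and call it from SQL through the HiveContext
    hive.registerFunction("strlen", lambda s: len(s), IntegerType())
    hive.sql("SELECT word, strlen(word) FROM words").show()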
spark git commit: [Minor][SQL] Use same function to check path parameter in JSONRelation
Repository: spark Updated Branches: refs/heads/master 4611de1ce -> ac506b7c2 [Minor][SQL] Use same function to check path parameter in JSONRelation Author: Liang-Chi Hsieh Closes #4649 from viirya/use_checkpath and squashes the following commits: 0f9a1a1 [Liang-Chi Hsieh] Use same function to check path parameter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac506b7c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac506b7c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac506b7c Branch: refs/heads/master Commit: ac506b7c2846f656e03839bbd0e93827c7cc613e Parents: 4611de1 Author: Liang-Chi Hsieh Authored: Tue Feb 17 12:24:13 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:24:13 2015 -0800 -- .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 4 ++-- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala| 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ac506b7c/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 2484863..3b68b7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -37,7 +37,7 @@ private[sql] class DefaultSource override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) +val path = checkPath(parameters) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) JSONRelation(path, samplingRatio, None)(sqlContext) @@ -48,7 +48,7 @@ private[sql] class DefaultSource sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { -val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) +val path = checkPath(parameters) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) JSONRelation(path, samplingRatio, Some(schema))(sqlContext) http://git-wip-us.apache.org/repos/asf/spark/blob/ac506b7c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0263e3b..485d5c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -547,7 +547,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Map.empty[String, String]) }.getMessage assert( - message.contains("Option 'path' not specified"), + message.contains("'path' must be specified for json data."), "We should complain that path is not specified.") sql("DROP TABLE savedJsonTable") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] [Minor] Update the HiveContext Unittest
Repository: spark Updated Branches: refs/heads/master ac506b7c2 -> 9d281fa56 [SQL] [Minor] Update the HiveContext Unittest In unit test, the table src(key INT, value STRING) is not the same as HIVE src(key STRING, value STRING) https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql And in the reflect.q, test failed for expression `reflect("java.lang.Integer", "valueOf", key, 16)`, which expect the argument `key` as STRING not INT. This PR doesn't aim to change the `src` schema, we can do that after 1.3 released, however, we probably need to re-generate all the golden files. Author: Cheng Hao Closes #4584 from chenghao-intel/reflect and squashes the following commits: e5bdc3a [Cheng Hao] Move the test case reflect into blacklist 184abfd [Cheng Hao] revert the change to table src1 d9bcf92 [Cheng Hao] Update the HiveContext Unittest Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d281fa5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d281fa5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d281fa5 Branch: refs/heads/master Commit: 9d281fa56022800dc008a3de233fec44379a2bd7 Parents: ac506b7 Author: Cheng Hao Authored: Tue Feb 17 12:25:35 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:25:35 2015 -0800 -- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 6 ++ .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 1 + .../golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada | 1 + .../golden/udf_reflect2-1-7bec330c7bc6f71cbaf9bf1883d1b184 | 1 + .../golden/udf_reflect2-2-c5a05379f482215a5a484bed0299bf19 | 3 +++ .../golden/udf_reflect2-3-effc057c78c00b0af26a4ac0f5f116ca | 0 .../golden/udf_reflect2-4-73d466e70e96e9e5f0cd373b37d4e1f4 | 5 + 7 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala -- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 133f2d3..c6ead45 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -225,6 +225,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Needs constant object inspectors "udf_round", +// the table src(key INT, value STRING) is not the same as HIVE unittest. In Hive +// is src(key STRING, value STRING), and in the reflect.q, it failed in +// Integer.valueOf, which expect the first argument passed as STRING type not INT. +"udf_reflect", + // Sort with Limit clause causes failure. 
"ctas", "ctas_hadoop20", @@ -886,6 +891,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udf_power", "udf_radians", "udf_rand", +"udf_reflect2", "udf_regexp", "udf_regexp_extract", "udf_regexp_replace", http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 840fbc1..a2d99f1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -196,6 +196,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // The test tables that are defined in the Hive QTestUtil. // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java + // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql val hiveQTestUtilTables = Seq( TestTable("src", "CREATE TABLE src (key INT, value STRING)".cmd, http://git-wip-us.apache.org/repos/asf/spark/blob/9d281fa5/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada -- diff --git a/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada new file mode 100644 index 000..573541a --- /dev/null +++ b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a
spark git commit: [SQL] [Minor] Update the HiveContext Unittest
Repository: spark Updated Branches: refs/heads/branch-1.3 d74d5e86a -> 01356514e [SQL] [Minor] Update the HiveContext Unittest In unit test, the table src(key INT, value STRING) is not the same as HIVE src(key STRING, value STRING) https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql And in the reflect.q, test failed for expression `reflect("java.lang.Integer", "valueOf", key, 16)`, which expect the argument `key` as STRING not INT. This PR doesn't aim to change the `src` schema, we can do that after 1.3 released, however, we probably need to re-generate all the golden files. Author: Cheng Hao Closes #4584 from chenghao-intel/reflect and squashes the following commits: e5bdc3a [Cheng Hao] Move the test case reflect into blacklist 184abfd [Cheng Hao] revert the change to table src1 d9bcf92 [Cheng Hao] Update the HiveContext Unittest (cherry picked from commit 9d281fa56022800dc008a3de233fec44379a2bd7) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01356514 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01356514 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01356514 Branch: refs/heads/branch-1.3 Commit: 01356514ef42321e09e9a67ba08366bb2bd5af4b Parents: d74d5e8 Author: Cheng Hao Authored: Tue Feb 17 12:25:35 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:25:43 2015 -0800 -- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 6 ++ .../main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 1 + .../golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada | 1 + .../golden/udf_reflect2-1-7bec330c7bc6f71cbaf9bf1883d1b184 | 1 + .../golden/udf_reflect2-2-c5a05379f482215a5a484bed0299bf19 | 3 +++ .../golden/udf_reflect2-3-effc057c78c00b0af26a4ac0f5f116ca | 0 .../golden/udf_reflect2-4-73d466e70e96e9e5f0cd373b37d4e1f4 | 5 + 7 files changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala -- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 133f2d3..c6ead45 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -225,6 +225,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // Needs constant object inspectors "udf_round", +// the table src(key INT, value STRING) is not the same as HIVE unittest. In Hive +// is src(key STRING, value STRING), and in the reflect.q, it failed in +// Integer.valueOf, which expect the first argument passed as STRING type not INT. +"udf_reflect", + // Sort with Limit clause causes failure. 
"ctas", "ctas_hadoop20", @@ -886,6 +891,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { "udf_power", "udf_radians", "udf_rand", +"udf_reflect2", "udf_regexp", "udf_regexp_extract", "udf_regexp_replace", http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 840fbc1..a2d99f1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -196,6 +196,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // The test tables that are defined in the Hive QTestUtil. // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java + // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql val hiveQTestUtilTables = Seq( TestTable("src", "CREATE TABLE src (key INT, value STRING)".cmd, http://git-wip-us.apache.org/repos/asf/spark/blob/01356514/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada -- diff --git a/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada b/sql/hive/src/test/resources/golden/udf_reflect2-0-50131c0ba7b7a6b65c789a5a8497bada new file mode 100
spark git commit: [Minor][SQL] Use same function to check path parameter in JSONRelation
Repository: spark Updated Branches: refs/heads/branch-1.3 62063b7a3 -> d74d5e86a [Minor][SQL] Use same function to check path parameter in JSONRelation Author: Liang-Chi Hsieh Closes #4649 from viirya/use_checkpath and squashes the following commits: 0f9a1a1 [Liang-Chi Hsieh] Use same function to check path parameter. (cherry picked from commit ac506b7c2846f656e03839bbd0e93827c7cc613e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d74d5e86 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d74d5e86 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d74d5e86 Branch: refs/heads/branch-1.3 Commit: d74d5e86abcb2ee4f142bb76f641d66cb4ffeb42 Parents: 62063b7 Author: Liang-Chi Hsieh Authored: Tue Feb 17 12:24:13 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:24:21 2015 -0800 -- .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 4 ++-- .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala| 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d74d5e86/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 2484863..3b68b7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -37,7 +37,7 @@ private[sql] class DefaultSource override def createRelation( sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = { -val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) +val path = checkPath(parameters) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) JSONRelation(path, samplingRatio, None)(sqlContext) @@ -48,7 +48,7 @@ private[sql] class DefaultSource sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = { -val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) +val path = checkPath(parameters) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) JSONRelation(path, samplingRatio, Some(schema))(sqlContext) http://git-wip-us.apache.org/repos/asf/spark/blob/d74d5e86/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0263e3b..485d5c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -547,7 +547,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { Map.empty[String, String]) }.getMessage assert( - message.contains("Option 'path' not specified"), + message.contains("'path' must be specified for json data."), "We should complain that path is not specified.") sql("DROP TABLE savedJsonTable") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog
Repository: spark Updated Branches: refs/heads/branch-1.3 5636c4a58 -> 62063b7a3 [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog Current `ParquetConversions` in `HiveMetastoreCatalog` will transformUp the given plan multiple times if there are many Metastore Parquet tables. Since the transformUp operation is recursive, it should be better to only perform it once. Author: Liang-Chi Hsieh Closes #4651 from viirya/parquet_atonce and squashes the following commits: c1ed29d [Liang-Chi Hsieh] Fix bug. e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once. (cherry picked from commit 4611de1cef7363bc71ec608560dfd866ae477747) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62063b7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62063b7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62063b7a Branch: refs/heads/branch-1.3 Commit: 62063b7a3e2db9fc7320739d3b900a7840c2dee7 Parents: 5636c4a Author: Liang-Chi Hsieh Authored: Tue Feb 17 12:23:18 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:23:26 2015 -0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 37 +++- 1 file changed, 20 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62063b7a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0e43faa..cfd6f27 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) // Read path case p @ PhysicalOperation(_, _, relation: MetastoreRelation) if hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) } + val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap + val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _)) + // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes // attribute IDs referenced in other nodes. 
- toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) => -val parquetRelation = convertToParquetRelation(relation) -val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) - -lastPlan.transformUp { - case r: MetastoreRelation if r == relation => { -val withAlias = - r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( -Subquery(r.tableName, parquetRelation)) - -withAlias - } - case other => other.transformExpressions { -case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) - } + plan.transformUp { +case r: MetastoreRelation if relationMap.contains(r) => { + val parquetRelation = relationMap(r) + val withAlias = +r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( + Subquery(r.tableName, parquetRelation)) + + withAlias +} +case other => other.transformExpressions { + case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog
Repository: spark Updated Branches: refs/heads/master 31efb39c1 -> 4611de1ce [SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog Current `ParquetConversions` in `HiveMetastoreCatalog` will transformUp the given plan multiple times if there are many Metastore Parquet tables. Since the transformUp operation is recursive, it should be better to only perform it once. Author: Liang-Chi Hsieh Closes #4651 from viirya/parquet_atonce and squashes the following commits: c1ed29d [Liang-Chi Hsieh] Fix bug. e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4611de1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4611de1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4611de1c Branch: refs/heads/master Commit: 4611de1cef7363bc71ec608560dfd866ae477747 Parents: 31efb39 Author: Liang-Chi Hsieh Authored: Tue Feb 17 12:23:18 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:23:18 2015 -0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 37 +++- 1 file changed, 20 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4611de1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0e43faa..cfd6f27 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) // Read path case p @ PhysicalOperation(_, _, relation: MetastoreRelation) if hive.convertMetastoreParquet && hive.conf.parquetUseDataSourceApi && relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - relation + val parquetRelation = convertToParquetRelation(relation) + val attributedRewrites = relation.output.zip(parquetRelation.output) + (relation, parquetRelation, attributedRewrites) } + val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap + val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _)) + // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes // attribute IDs referenced in other nodes. 
- toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) => -val parquetRelation = convertToParquetRelation(relation) -val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) - -lastPlan.transformUp { - case r: MetastoreRelation if r == relation => { -val withAlias = - r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( -Subquery(r.tableName, parquetRelation)) - -withAlias - } - case other => other.transformExpressions { -case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) - } + plan.transformUp { +case r: MetastoreRelation if relationMap.contains(r) => { + val parquetRelation = relationMap(r) + val withAlias = +r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( + Subquery(r.tableName, parquetRelation)) + + withAlias +} +case other => other.transformExpressions { + case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a) } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
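The change replaces a fold that re-ran transformUp once per Metastore Parquet table with a single transformUp over a precomputed replacement map. A framework-free Python sketch of the same idea (toy Node class and names, not Spark's TreeNode API):

    class Node(object):
        def __init__(self, name, children=()):
            self.name = name
            self.children = list(children)

        def __repr__(self):
            return self.name if not self.children else "%s%r" % (self.name, self.children)

    def transform_up(node, rule):
        # Rewrite children first, then the node itself: one traversal of the whole tree.
        rebuilt = Node(node.name, [transform_up(c, rule) for c in node.children])
        return rule(rebuilt)

    # Build all replacements once, then apply them in a single pass.
    replacements = {"metastore_relation": Node("parquet_relation")}
    plan = Node("project", [Node("filter", [Node("metastore_relation")])])

    print(transform_up(plan, lambda n: replacements.get(n.name, n)))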
spark git commit: [Minor] fix typo in SQL document
Repository: spark Updated Branches: refs/heads/master fc4eb9505 -> 31efb39c1 [Minor] fix typo in SQL document Author: CodingCat Closes #4656 from CodingCat/fix_typo and squashes the following commits: b41d15c [CodingCat] recover 689fe46 [CodingCat] fix typo Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31efb39c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31efb39c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31efb39c Branch: refs/heads/master Commit: 31efb39c1deb253032b38e8fbafde4b2b1dde1f6 Parents: fc4eb95 Author: CodingCat Authored: Tue Feb 17 12:16:52 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:16:52 2015 -0800 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31efb39c/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8022c5e..0146a4e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -159,7 +159,7 @@ you to construct DataFrames when the columns and their types are not known until -The Scala interaface for Spark SQL supports automatically converting an RDD containing case classes +The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [Minor] fix typo in SQL document
Repository: spark Updated Branches: refs/heads/branch-1.3 71cf6e295 -> 5636c4a58 [Minor] fix typo in SQL document Author: CodingCat Closes #4656 from CodingCat/fix_typo and squashes the following commits: b41d15c [CodingCat] recover 689fe46 [CodingCat] fix typo (cherry picked from commit 31efb39c1deb253032b38e8fbafde4b2b1dde1f6) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5636c4a5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5636c4a5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5636c4a5 Branch: refs/heads/branch-1.3 Commit: 5636c4a583bd28a5b54e14930d72fd9265c301de Parents: 71cf6e2 Author: CodingCat Authored: Tue Feb 17 12:16:52 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 12:17:05 2015 -0800 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5636c4a5/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8022c5e..0146a4e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -159,7 +159,7 @@ you to construct DataFrames when the columns and their types are not known until -The Scala interaface for Spark SQL supports automatically converting an RDD containing case classes +The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5864] [PySpark] support .jar as python package
Repository: spark Updated Branches: refs/heads/branch-1.3 e64afcd84 -> 71cf6e295 [SPARK-5864] [PySpark] support .jar as python package A jar file containing Python sources in it could be used as a Python package, just like zip file. spark-submit already put the jar file into PYTHONPATH, this patch also put it in the sys.path, then it could be used in Python worker. Author: Davies Liu Closes #4652 from davies/jar and squashes the following commits: 17d3f76 [Davies Liu] support .jar as python package (cherry picked from commit fc4eb9505adda192eb38cb4454d532027690bfa3) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71cf6e29 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71cf6e29 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71cf6e29 Branch: refs/heads/branch-1.3 Commit: 71cf6e295111e2522ef161b41cf9ee55db29fc6c Parents: e64afcd Author: Davies Liu Authored: Tue Feb 17 12:05:06 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 12:05:19 2015 -0800 -- python/pyspark/context.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/71cf6e29/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index bf1f61c..40b3152 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -64,6 +64,8 @@ class SparkContext(object): _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH +PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): @@ -185,7 +187,7 @@ class SparkContext(object): for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) -if filename.lower().endswith("zip") or filename.lower().endswith("egg"): +if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) @@ -705,7 +707,7 @@ class SparkContext(object): self.addFile(path) (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix -if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): +if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5864] [PySpark] support .jar as python package
Repository: spark Updated Branches: refs/heads/master 49c19fdba -> fc4eb9505 [SPARK-5864] [PySpark] support .jar as python package A jar file containing Python sources in it could be used as a Python package, just like zip file. spark-submit already put the jar file into PYTHONPATH, this patch also put it in the sys.path, then it could be used in Python worker. Author: Davies Liu Closes #4652 from davies/jar and squashes the following commits: 17d3f76 [Davies Liu] support .jar as python package Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc4eb950 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc4eb950 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc4eb950 Branch: refs/heads/master Commit: fc4eb9505adda192eb38cb4454d532027690bfa3 Parents: 49c19fd Author: Davies Liu Authored: Tue Feb 17 12:05:06 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 12:05:06 2015 -0800 -- python/pyspark/context.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc4eb950/python/pyspark/context.py -- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index bf1f61c..40b3152 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -64,6 +64,8 @@ class SparkContext(object): _lock = Lock() _python_includes = None # zip and egg files that need to be added to PYTHONPATH +PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar') + def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): @@ -185,7 +187,7 @@ class SparkContext(object): for path in self._conf.get("spark.submit.pyFiles", "").split(","): if path != "": (dirname, filename) = os.path.split(path) -if filename.lower().endswith("zip") or filename.lower().endswith("egg"): +if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) @@ -705,7 +707,7 @@ class SparkContext(object): self.addFile(path) (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix -if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): +if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) # for tests in local mode sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
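In practice this means a jar that bundles Python sources can be shipped like a .zip or .egg, either via spark-submit --py-files deps.jar or programmatically. A hedged sketch (the jar path and the mypkg module are hypothetical):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "jar-pyfiles-demo")

    # Any jar whose root contains importable .py files now behaves like a zip/egg.
    sc.addPyFile("/path/to/deps.jar")

    def f(x):
        import mypkg                 # resolvable on executors via the shipped jar
        return mypkg.transform(x)

    # sc.parallelize(range(4)).map(f).collect()   # would run once deps.jar exists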
spark git commit: SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager
Repository: spark Updated Branches: refs/heads/master 24f358b9d -> 49c19fdba SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager Avoid call to remove shutdown hook being called from shutdown hook CC pwendell JoshRosen MattWhelan Author: Sean Owen Closes #4648 from srowen/SPARK-5841.2 and squashes the following commits: 51548db [Sean Owen] Avoid call to remove shutdown hook being called from shutdown hook Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/49c19fdb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/49c19fdb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/49c19fdb Branch: refs/heads/master Commit: 49c19fdbad57f0609bbcc9278f9eaa8115a73604 Parents: 24f358b Author: Sean Owen Authored: Tue Feb 17 19:40:06 2015 + Committer: Sean Owen Committed: Tue Feb 17 19:40:06 2015 + -- .../main/scala/org/apache/spark/storage/DiskBlockManager.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/49c19fdb/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ae9df8c..b297f3f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -138,7 +138,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon val shutdownHook = new Thread("delete Spark local dirs") { override def run(): Unit = Utils.logUncaughtExceptions { logDebug("Shutdown hook called") -DiskBlockManager.this.stop() +DiskBlockManager.this.doStop() } } Runtime.getRuntime.addShutdownHook(shutdownHook) @@ -149,7 +149,10 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon private[spark] def stop() { // Remove the shutdown hook. It causes memory leaks if we leave it around. Runtime.getRuntime.removeShutdownHook(shutdownHook) +doStop() + } + private def doStop(): Unit = { // Only perform cleanup if an external service is not serving our shuffle files. if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) { localDirs.foreach { localDir => - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager
Repository: spark Updated Branches: refs/heads/branch-1.3 420bc9b3a -> e64afcd84 SPARK-5841 [CORE] [HOTFIX] Memory leak in DiskBlockManager Avoid call to remove shutdown hook being called from shutdown hook CC pwendell JoshRosen MattWhelan Author: Sean Owen Closes #4648 from srowen/SPARK-5841.2 and squashes the following commits: 51548db [Sean Owen] Avoid call to remove shutdown hook being called from shutdown hook (cherry picked from commit 49c19fdbad57f0609bbcc9278f9eaa8115a73604) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e64afcd8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e64afcd8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e64afcd8 Branch: refs/heads/branch-1.3 Commit: e64afcd8449c161ea134125ca469f5b9c09dbc60 Parents: 420bc9b Author: Sean Owen Authored: Tue Feb 17 19:40:06 2015 + Committer: Sean Owen Committed: Tue Feb 17 19:40:15 2015 + -- .../main/scala/org/apache/spark/storage/DiskBlockManager.scala | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e64afcd8/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index ae9df8c..b297f3f 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -138,7 +138,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon val shutdownHook = new Thread("delete Spark local dirs") { override def run(): Unit = Utils.logUncaughtExceptions { logDebug("Shutdown hook called") -DiskBlockManager.this.stop() +DiskBlockManager.this.doStop() } } Runtime.getRuntime.addShutdownHook(shutdownHook) @@ -149,7 +149,10 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon private[spark] def stop() { // Remove the shutdown hook. It causes memory leaks if we leave it around. Runtime.getRuntime.removeShutdownHook(shutdownHook) +doStop() + } + private def doStop(): Unit = { // Only perform cleanup if an external service is not serving our shuffle files. if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) { localDirs.foreach { localDir => - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
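The fix keeps the actual cleanup in a private doStop() that the shutdown hook calls directly, so removeShutdownHook is only invoked from a normal stop() while the JVM is still running. A rough Python analogue of that structure (purely illustrative, not Spark's API):

    import atexit

    class LocalDirManager(object):
        def __init__(self):
            self._stopped = False
            atexit.register(self._do_stop)      # the exit hook runs only the cleanup

        def stop(self):
            # Called from user code during normal operation: safe to unregister here.
            atexit.unregister(self._do_stop)
            self._do_stop()

        def _do_stop(self):
            if not self._stopped:
                self._stopped = True
                print("cleaning up local dirs")

    m = LocalDirManager()
    m.stop()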
spark git commit: MAINTENANCE: Automated closing of pull requests.
Repository: spark Updated Branches: refs/heads/master 9b746f380 -> 24f358b9d MAINTENANCE: Automated closing of pull requests. This commit exists to close the following pull requests on Github: Closes #3297 (close requested by 'andrewor14') Closes #3345 (close requested by 'pwendell') Closes #2729 (close requested by 'srowen') Closes #2320 (close requested by 'pwendell') Closes #4529 (close requested by 'andrewor14') Closes #2098 (close requested by 'srowen') Closes #4120 (close requested by 'andrewor14') Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24f358b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24f358b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24f358b9 Branch: refs/heads/master Commit: 24f358b9d6bc7a72a4fb493b7f845a40ed941a5d Parents: 9b746f3 Author: Patrick Wendell Authored: Tue Feb 17 11:35:26 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 11:35:26 2015 -0800 -- -- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5661]function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether contains file
Repository: spark Updated Branches: refs/heads/branch-1.3 2bf2b56ef -> 420bc9b3a [SPARK-5661]function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether contains file hasShutdownDeleteTachyonDir(file: TachyonFile) should use shutdownDeleteTachyonPaths(not shutdownDeletePaths) to determine Whether contain file. To solve it ,delete two unused function. Author: xukun 00228947 Author: viper-kun Closes #4418 from viper-kun/deleteunusedfun and squashes the following commits: 87340eb [viper-kun] fix style 3d6c69e [xukun 00228947] fix bug 2bc397e [xukun 00228947] deleteunusedfun (cherry picked from commit b271c265b742fa6947522eda4592e9e6a7fd1f3a) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/420bc9b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/420bc9b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/420bc9b3 Branch: refs/heads/branch-1.3 Commit: 420bc9b3ae49883f17962b1cff18a3b8df3bd05c Parents: 2bf2b56 Author: xukun 00228947 Authored: Tue Feb 17 18:59:41 2015 + Committer: Sean Owen Committed: Tue Feb 17 18:59:51 2015 + -- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/420bc9b3/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c06bd6f..df21ed3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -213,8 +213,8 @@ private[spark] object Utils extends Logging { // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { val absolutePath = file.getPath() -shutdownDeletePaths.synchronized { - shutdownDeletePaths.contains(absolutePath) +shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths.contains(absolutePath) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-3381] [MLlib] Eliminate bins for unordered features in DecisionTrees
Repository: spark Updated Branches: refs/heads/master b271c265b -> 9b746f380 [SPARK-3381] [MLlib] Eliminate bins for unordered features in DecisionTrees For unordered features, it is sufficient to use splits since the threshold of the split corresponds the threshold of the HighSplit of the bin and there is no use of the LowSplit. Author: MechCoder Closes #4231 from MechCoder/spark-3381 and squashes the following commits: 58c19a5 [MechCoder] COSMIT c274b74 [MechCoder] Remove unordered feature calculation in labeledPointToTreePoint b2b9b89 [MechCoder] COSMIT d3ee042 [MechCoder] [SPARK-3381] [MLlib] Eliminate bins for unordered features Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b746f38 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b746f38 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b746f38 Branch: refs/heads/master Commit: 9b746f380869b54d673e3758ca5e4475f76c864a Parents: b271c26 Author: MechCoder Authored: Tue Feb 17 11:19:23 2015 -0800 Committer: Joseph K. Bradley Committed: Tue Feb 17 11:19:23 2015 -0800 -- .../apache/spark/mllib/tree/DecisionTree.scala | 37 ++-- .../spark/mllib/tree/impl/TreePoint.scala | 14 +++- .../spark/mllib/tree/DecisionTreeSuite.scala| 37 +--- 3 files changed, 15 insertions(+), 73 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b746f38/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala index f1f8599..b9d0c56 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala @@ -327,14 +327,14 @@ object DecisionTree extends Serializable with Logging { * @param agg Array storing aggregate calculation, with a set of sufficient statistics for * each (feature, bin). * @param treePoint Data point being aggregated. - * @param bins possible bins for all features, indexed (numFeatures)(numBins) + * @param splits possible splits indexed (numFeatures)(numSplits) * @param unorderedFeatures Set of indices of unordered features. * @param instanceWeight Weight (importance) of instance in dataset. 
*/ private def mixedBinSeqOp( agg: DTStatsAggregator, treePoint: TreePoint, - bins: Array[Array[Bin]], + splits: Array[Array[Split]], unorderedFeatures: Set[Int], instanceWeight: Double, featuresForNode: Option[Array[Int]]): Unit = { @@ -362,7 +362,7 @@ object DecisionTree extends Serializable with Logging { val numSplits = agg.metadata.numSplits(featureIndex) var splitIndex = 0 while (splitIndex < numSplits) { - if (bins(featureIndex)(splitIndex).highSplit.categories.contains(featureValue)) { + if (splits(featureIndex)(splitIndex).categories.contains(featureValue)) { agg.featureUpdate(leftNodeFeatureOffset, splitIndex, treePoint.label, instanceWeight) } else { @@ -506,8 +506,8 @@ object DecisionTree extends Serializable with Logging { if (metadata.unorderedFeatures.isEmpty) { orderedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, instanceWeight, featuresForNode) } else { - mixedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, bins, metadata.unorderedFeatures, -instanceWeight, featuresForNode) + mixedBinSeqOp(agg(aggNodeIndex), baggedPoint.datum, splits, +metadata.unorderedFeatures, instanceWeight, featuresForNode) } } } @@ -1024,35 +1024,15 @@ object DecisionTree extends Serializable with Logging { // Categorical feature val featureArity = metadata.featureArity(featureIndex) if (metadata.isUnordered(featureIndex)) { - // TODO: The second half of the bins are unused. Actually, we could just use - // splits and not build bins for unordered features. That should be part of - // a later PR since it will require changing other code (using splits instead - // of bins in a few places). // Unordered features - // 2^(maxFeatureValue - 1) - 1 combinations + // 2^(maxFeatureValue - 1) - 1 combinations splits(featureIndex) = new Array[Split](numSplits) - bins(featureIndex) = new Array[Bin](numBins) var splitIndex = 0 while (splitIndex < numSplits) { val categories: List[Double]
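For unordered categorical features the split's category set already carries everything the old HighSplit/bin pair did, so aggregation only needs a membership test. A rough sketch of that enumeration, with a simplified SimpleSplit standing in for MLlib's Split class and an assumed arity of 3:

    // Simplified stand-in for MLlib's Split: each unordered split is just the
    // set of category values routed to the left child.
    case class SimpleSplit(categories: Set[Double])

    // For a categorical feature with `arity` values there are 2^(arity-1) - 1
    // distinct unordered splits; enumerate them as bitmasks over the categories.
    def unorderedSplits(arity: Int): Seq[SimpleSplit] = {
      val numSplits = (1 << (arity - 1)) - 1
      (1 to numSplits).map { mask =>
        SimpleSplit((0 until arity).filter(c => ((mask >> c) & 1) == 1).map(_.toDouble).toSet)
      }
    }

    // Aggregation then reduces to the membership test used in mixedBinSeqOp:
    val splits = unorderedSplits(arity = 3)             // Seq({0.0}, {1.0}, {0.0, 1.0})
    val goesLeft = splits.map(_.categories.contains(2.0))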
spark git commit: [SPARK-5778] throw if nonexistent metrics config file provided
Repository: spark Updated Branches: refs/heads/branch-1.3 4a581aa3f -> 2bf2b56ef [SPARK-5778] throw if nonexistent metrics config file provided previous behavior was to log an error; this is fine in the general case where no `spark.metrics.conf` parameter was specified, in which case a default `metrics.properties` is looked for, and the execption logged and suppressed if it doesn't exist. if the user has purposefully specified a metrics.conf file, however, it makes more sense to show them an error when said file doesn't exist. Author: Ryan Williams Closes #4571 from ryan-williams/metrics and squashes the following commits: 5bccb14 [Ryan Williams] private-ize some MetricsConfig members 08ff998 [Ryan Williams] rename METRICS_CONF: DEFAULT_METRICS_CONF_FILENAME f4d7fab [Ryan Williams] fix tests ad24b0e [Ryan Williams] add "metrics.properties" to .rat-excludes 94e810b [Ryan Williams] throw if nonexistent Sink class is specified 31d2c30 [Ryan Williams] metrics code review feedback 56287db [Ryan Williams] throw if nonexistent metrics config file provided (cherry picked from commit d8f69cf78862d13a48392a0b94388b8d403523da) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bf2b56e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bf2b56e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bf2b56e Branch: refs/heads/branch-1.3 Commit: 2bf2b56ef0c0fbb4cead030e44910453b2cbb6fd Parents: 4a581aa Author: Ryan Williams Authored: Tue Feb 17 10:57:16 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 10:57:47 2015 -0800 -- .rat-excludes | 1 + .../apache/spark/metrics/MetricsConfig.scala| 32 +++- .../apache/spark/metrics/MetricsSystem.scala| 5 ++- .../resources/test_metrics_system.properties| 2 -- .../spark/metrics/MetricsConfigSuite.scala | 2 +- 5 files changed, 23 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2bf2b56e/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 769defb..e3c0331 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -18,6 +18,7 @@ fairscheduler.xml.template spark-defaults.conf.template log4j.properties log4j.properties.template +metrics.properties metrics.properties.template slaves slaves.template http://git-wip-us.apache.org/repos/asf/spark/blob/2bf2b56e/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 1b7a5d1..8edf493 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -28,12 +28,12 @@ import org.apache.spark.util.Utils private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging { - val DEFAULT_PREFIX = "*" - val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r - val METRICS_CONF = "metrics.properties" + private val DEFAULT_PREFIX = "*" + private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r + private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" - val properties = new Properties() - var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] val properties = new Properties() + private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", 
"org.apache.spark.metrics.sink.MetricsServlet") @@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi setDefaultProperties(properties) // If spark.metrics.conf is not set, try to get file in class path -var is: InputStream = null -try { - is = configFile match { -case Some(f) => new FileInputStream(f) -case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF) +val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse { + try { + Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)) + } catch { +case e: Exception => + logError("Error loading default configuration file", e) + None } +} - if (is != null) { +isOpt.foreach { is => + try { properties.load(is) + } finally { +is.close() } -} catch { - case e: Exception => logError("Error loading configure file", e) -} finally { - if (is != null
spark git commit: [SPARK-5661] Function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether it contains the file
Repository: spark Updated Branches: refs/heads/master d8f69cf78 -> b271c265b [SPARK-5661]function hasShutdownDeleteTachyonDir should use shutdownDeleteTachyonPaths to determine whether contains file hasShutdownDeleteTachyonDir(file: TachyonFile) should use shutdownDeleteTachyonPaths(not shutdownDeletePaths) to determine Whether contain file. To solve it ,delete two unused function. Author: xukun 00228947 Author: viper-kun Closes #4418 from viper-kun/deleteunusedfun and squashes the following commits: 87340eb [viper-kun] fix style 3d6c69e [xukun 00228947] fix bug 2bc397e [xukun 00228947] deleteunusedfun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b271c265 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b271c265 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b271c265 Branch: refs/heads/master Commit: b271c265b742fa6947522eda4592e9e6a7fd1f3a Parents: d8f69cf Author: xukun 00228947 Authored: Tue Feb 17 18:59:41 2015 + Committer: Sean Owen Committed: Tue Feb 17 18:59:41 2015 + -- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b271c265/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c06bd6f..df21ed3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -213,8 +213,8 @@ private[spark] object Utils extends Logging { // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { val absolutePath = file.getPath() -shutdownDeletePaths.synchronized { - shutdownDeletePaths.contains(absolutePath) +shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths.contains(absolutePath) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5778] throw if nonexistent metrics config file provided
Repository: spark Updated Branches: refs/heads/master d8adefefc -> d8f69cf78 [SPARK-5778] throw if nonexistent metrics config file provided previous behavior was to log an error; this is fine in the general case where no `spark.metrics.conf` parameter was specified, in which case a default `metrics.properties` is looked for, and the execption logged and suppressed if it doesn't exist. if the user has purposefully specified a metrics.conf file, however, it makes more sense to show them an error when said file doesn't exist. Author: Ryan Williams Closes #4571 from ryan-williams/metrics and squashes the following commits: 5bccb14 [Ryan Williams] private-ize some MetricsConfig members 08ff998 [Ryan Williams] rename METRICS_CONF: DEFAULT_METRICS_CONF_FILENAME f4d7fab [Ryan Williams] fix tests ad24b0e [Ryan Williams] add "metrics.properties" to .rat-excludes 94e810b [Ryan Williams] throw if nonexistent Sink class is specified 31d2c30 [Ryan Williams] metrics code review feedback 56287db [Ryan Williams] throw if nonexistent metrics config file provided Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f69cf7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f69cf7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f69cf7 Branch: refs/heads/master Commit: d8f69cf78862d13a48392a0b94388b8d403523da Parents: d8adefe Author: Ryan Williams Authored: Tue Feb 17 10:57:16 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 10:57:16 2015 -0800 -- .rat-excludes | 1 + .../apache/spark/metrics/MetricsConfig.scala| 32 +++- .../apache/spark/metrics/MetricsSystem.scala| 5 ++- .../resources/test_metrics_system.properties| 2 -- .../spark/metrics/MetricsConfigSuite.scala | 2 +- 5 files changed, 23 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8f69cf7/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index a788e82..8c61e67 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -19,6 +19,7 @@ fairscheduler.xml.template spark-defaults.conf.template log4j.properties log4j.properties.template +metrics.properties metrics.properties.template slaves slaves.template http://git-wip-us.apache.org/repos/asf/spark/blob/d8f69cf7/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 1b7a5d1..8edf493 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -28,12 +28,12 @@ import org.apache.spark.util.Utils private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging { - val DEFAULT_PREFIX = "*" - val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r - val METRICS_CONF = "metrics.properties" + private val DEFAULT_PREFIX = "*" + private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r + private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" - val properties = new Properties() - var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] val properties = new Properties() + private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -47,20 +47,22 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends 
Loggi setDefaultProperties(properties) // If spark.metrics.conf is not set, try to get file in class path -var is: InputStream = null -try { - is = configFile match { -case Some(f) => new FileInputStream(f) -case None => Utils.getSparkClassLoader.getResourceAsStream(METRICS_CONF) +val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse { + try { + Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)) + } catch { +case e: Exception => + logError("Error loading default configuration file", e) + None } +} - if (is != null) { +isOpt.foreach { is => + try { properties.load(is) + } finally { +is.close() } -} catch { - case e: Exception => logError("Error loading configure file", e) -} finally { - if (is != null) is.close() } propertyCategories = subProperties(properties, INSTANCE_REGEX) http://git-wip-us.a
spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API
Repository: spark Updated Branches: refs/heads/branch-1.3 cd3d41587 -> 4a581aa3f [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API 1. added explain() 2. add isLocal() 3. do not call show() in __repl__ 4. add foreach() and foreachPartition() 5. add distinct() 6. fix functions.col()/column()/lit() 7. fix unit tests in sql/functions.py 8. fix unicode in showString() Author: Davies Liu Closes #4645 from davies/df6 and squashes the following commits: 6b46a2c [Davies Liu] fix DataFrame Python API (cherry picked from commit d8adefefcc2a4af32295440ed1d4917a6968f017) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a581aa3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a581aa3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a581aa3 Branch: refs/heads/branch-1.3 Commit: 4a581aa3f9144732cc0f6dab6e46f72035e072f4 Parents: cd3d415 Author: Davies Liu Authored: Tue Feb 17 10:22:48 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 10:22:56 2015 -0800 -- python/pyspark/sql/dataframe.py | 65 ++-- python/pyspark/sql/functions.py | 12 +++ 2 files changed, 59 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a581aa3/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 28a59e7..8417240 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -238,6 +238,22 @@ class DataFrame(object): """ print (self._jdf.schema().treeString()) +def explain(self, extended=False): +""" +Prints the plans (logical and physical) to the console for +debugging purpose. + +If extended is False, only prints the physical plan. +""" +self._jdf.explain(extended) + +def isLocal(self): +""" +Returns True if the `collect` and `take` methods can be run locally +(without any Spark executors). +""" +return self._jdf.isLocal() + def show(self): """ Print the first 20 rows. @@ -247,14 +263,12 @@ class DataFrame(object): 2 Alice 5 Bob >>> df -age name -2 Alice -5 Bob +DataFrame[age: int, name: string] """ -print (self) +print self._jdf.showString().encode('utf8', 'ignore') def __repr__(self): -return self._jdf.showString() +return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def count(self): """Return the number of elements in this RDD. @@ -336,6 +350,8 @@ class DataFrame(object): """ Return a new RDD by applying a function to each partition. +It's a shorthand for df.rdd.mapPartitions() + >>> rdd = sc.parallelize([1, 2, 3, 4], 4) >>> def f(iterator): yield 1 >>> rdd.mapPartitions(f).sum() @@ -343,6 +359,31 @@ class DataFrame(object): """ return self.rdd.mapPartitions(f, preservesPartitioning) +def foreach(self, f): +""" +Applies a function to all rows of this DataFrame. + +It's a shorthand for df.rdd.foreach() + +>>> def f(person): +... print person.name +>>> df.foreach(f) +""" +return self.rdd.foreach(f) + +def foreachPartition(self, f): +""" +Applies a function to each partition of this DataFrame. + +It's a shorthand for df.rdd.foreachPartition() + +>>> def f(people): +... for person in people: +... print person.name +>>> df.foreachPartition(f) +""" +return self.rdd.foreachPartition(f) + def cache(self): """ Persist with the default storage level (C{MEMORY_ONLY_SER}). """ @@ -377,8 +418,13 @@ class DataFrame(object): """ Return a new :class:`DataFrame` that has exactly `numPartitions` partitions. 
""" -rdd = self._jdf.repartition(numPartitions, None) -return DataFrame(rdd, self.sql_ctx) +return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx) + +def distinct(self): +""" +Return a new :class:`DataFrame` containing the distinct rows in this DataFrame. +""" +return DataFrame(self._jdf.distinct(), self.sql_ctx) def sample(self, withReplacement, fraction, seed=None): """ @@ -957,10 +1003,7 @@ class Column(DataFrame): return Column(jc, self.sql_ctx) def __repr__(self): -if self._jdf.isComputable(): -return self._jdf.samples() -else: -ret
[1/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation
Repository: spark Updated Branches: refs/heads/branch-1.3 97cb568a2 -> cd3d41587 http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala index 89920f2..4f38110 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala @@ -143,7 +143,7 @@ class MySQLDatabase { } test("Basic test") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl") val rows = rdd.collect assert(rows.length == 2) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -153,7 +153,7 @@ class MySQLDatabase { } test("Numeric types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -181,7 +181,7 @@ class MySQLDatabase { } test("Date types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -199,7 +199,7 @@ class MySQLDatabase { } test("String types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -225,9 +225,9 @@ class MySQLDatabase { } test("Basic write test") { -val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") -val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") -val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") +val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers") +val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates") +val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings") rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false) rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false) rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false) http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala index c174d7a..7b47fee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala @@ -113,7 +113,7 @@ class PostgresDatabase { } test("Type mapping for various types") { -val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") +val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -142,7 +142,7 @@ class PostgresDatabase { } test("Basic write test") { -val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") +val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") rdd.createJDBCTable(url(db.ip), "public.barcopy", false) // Test only that it doesn't bomb out. 
} http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index bfacc51..07b5a84 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.SQLContext /** - * Implementation for "describe [extended] table". - * * :: DeveloperApi :: + * + * Implementation for "describe [extended] table". */ @DeveloperApi case class DescribeHiveTableCommand( http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/
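In the integration-test files above, the functional change is the rename from jdbcRDD to jdbc; createJDBCTable is untouched. Illustrative usage of the renamed pair (the connection URL and table names are assumptions, not values from the patch):

    import org.apache.spark.sql.SQLContext

    val sqlContext: SQLContext = ???                    // assumed to exist
    val url = "jdbc:mysql://db-host:3306/foo?user=root" // hypothetical connection URL

    val numbers = sqlContext.jdbc(url, "numbers")       // was sqlContext.jdbcRDD(url, "numbers")
    numbers.createJDBCTable(url, "numberscopy", false)  // third arg: allow existing table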
[1/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation
Repository: spark Updated Branches: refs/heads/master c76da36c2 -> c74b07fa9 http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala index 89920f2..4f38110 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegration.scala @@ -143,7 +143,7 @@ class MySQLDatabase { } test("Basic test") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl") val rows = rdd.collect assert(rows.length == 2) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -153,7 +153,7 @@ class MySQLDatabase { } test("Numeric types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -181,7 +181,7 @@ class MySQLDatabase { } test("Date types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -199,7 +199,7 @@ class MySQLDatabase { } test("String types") { -val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") +val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -225,9 +225,9 @@ class MySQLDatabase { } test("Basic write test") { -val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers") -val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates") -val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings") +val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers") +val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates") +val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings") rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false) rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false) rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false) http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala index c174d7a..7b47fee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegration.scala @@ -113,7 +113,7 @@ class PostgresDatabase { } test("Type mapping for various types") { -val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") +val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") val rows = rdd.collect assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -142,7 +142,7 @@ class PostgresDatabase { } test("Basic write test") { -val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar") +val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar") rdd.createJDBCTable(url(db.ip), "public.barcopy", false) // Test only that it doesn't bomb out. 
} http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index bfacc51..07b5a84 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.SQLContext /** - * Implementation for "describe [extended] table". - * * :: DeveloperApi :: + * + * Implementation for "describe [extended] table". */ @DeveloperApi case class DescribeHiveTableCommand( http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/
[2/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation
[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation Author: Michael Armbrust Closes #4642 from marmbrus/docs and squashes the following commits: d291c34 [Michael Armbrust] python tests 9be66e3 [Michael Armbrust] comments d56afc2 [Michael Armbrust] fix style f004747 [Michael Armbrust] fix build c4a907b [Michael Armbrust] fix tests 42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up. (cherry picked from commit c74b07fa94a8da50437d952ae05cf6ac70fbb93e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd3d4158 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd3d4158 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd3d4158 Branch: refs/heads/branch-1.3 Commit: cd3d4158721b5c3cc35df47675f4f4d9540be6f1 Parents: 97cb568 Author: Michael Armbrust Authored: Tue Feb 17 10:21:17 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 10:21:33 2015 -0800 -- project/SparkBuild.scala| 12 +- python/pyspark/sql/context.py | 28 +-- .../org/apache/spark/sql/jdbc/JDBCUtils.java| 59 -- .../scala/org/apache/spark/sql/DataFrame.scala | 153 +- .../org/apache/spark/sql/DataFrameImpl.scala| 33 ++- .../apache/spark/sql/ExperimentalMethods.scala | 5 + .../apache/spark/sql/IncomputableColumn.scala | 4 + .../scala/org/apache/spark/sql/SQLContext.scala | 200 +++ .../apache/spark/sql/UserDefinedFunction.scala | 3 +- .../org/apache/spark/sql/api/package.scala | 23 +++ .../apache/spark/sql/execution/commands.scala | 2 +- .../spark/sql/execution/debug/package.scala | 10 +- .../spark/sql/jdbc/JavaJDBCTrampoline.scala | 30 --- .../scala/org/apache/spark/sql/jdbc/jdbc.scala | 74 ++- .../sql/parquet/ParquetTableOperations.scala| 4 +- .../apache/spark/sql/parquet/ParquetTest.scala | 4 +- .../apache/spark/sql/parquet/newParquet.scala | 6 +- .../spark/sql/parquet/timestamp/NanoTime.scala | 2 +- .../org/apache/spark/sql/sources/ddl.scala | 4 +- .../org/apache/spark/sql/jdbc/JavaJDBCTest.java | 102 -- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 20 +- .../spark/sql/jdbc/MySQLIntegration.scala | 14 +- .../spark/sql/jdbc/PostgresIntegration.scala| 4 +- .../execution/DescribeHiveTableCommand.scala| 4 +- .../spark/sql/hive/execution/commands.scala | 8 + .../spark/sql/hive/execution/package.scala | 25 +++ .../org/apache/spark/sql/hive/package.scala | 10 + .../sql/hive/parquet/FakeParquetSerDe.scala | 56 -- .../org/apache/spark/sql/hive/Shim12.scala | 9 +- .../org/apache/spark/sql/hive/Shim13.scala | 9 +- 31 files changed, 501 insertions(+), 423 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8fb1239..e4b1b96 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,9 +361,16 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn), + +// Skip actual catalyst, but include the subproject. 
+// Catalyst is not public API and contains quasiquotes which break scaladoc. +unidocAllSources in (ScalaUnidoc, unidoc) := { + (unidocAllSources in (ScalaUnidoc, unidoc)).value +.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) +}, // Skip class names containing $ and some internal packages in Javadocs unidocAllSources in (JavaUnidoc, unidoc) := { @@ -376,6 +383,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("executor"))) .map(_.filterNot(_.getCanonicalPath.contains("python"))) .map(_.filterNot(_.getCanonicalPath.contains("collection"))) +.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) }, // Javadoc options: create a window title, and group key packages on index page http://git-wip-us.apache.org/repos/asf/spark/blob/cd3d4158/python/pyspark/sql/context.py ---
spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API
Repository: spark Updated Branches: refs/heads/master c74b07fa9 -> d8adefefc [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API 1. added explain() 2. add isLocal() 3. do not call show() in __repl__ 4. add foreach() and foreachPartition() 5. add distinct() 6. fix functions.col()/column()/lit() 7. fix unit tests in sql/functions.py 8. fix unicode in showString() Author: Davies Liu Closes #4645 from davies/df6 and squashes the following commits: 6b46a2c [Davies Liu] fix DataFrame Python API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8adefef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8adefef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8adefef Branch: refs/heads/master Commit: d8adefefcc2a4af32295440ed1d4917a6968f017 Parents: c74b07f Author: Davies Liu Authored: Tue Feb 17 10:22:48 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 10:22:48 2015 -0800 -- python/pyspark/sql/dataframe.py | 65 ++-- python/pyspark/sql/functions.py | 12 +++ 2 files changed, 59 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8adefef/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 28a59e7..8417240 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -238,6 +238,22 @@ class DataFrame(object): """ print (self._jdf.schema().treeString()) +def explain(self, extended=False): +""" +Prints the plans (logical and physical) to the console for +debugging purpose. + +If extended is False, only prints the physical plan. +""" +self._jdf.explain(extended) + +def isLocal(self): +""" +Returns True if the `collect` and `take` methods can be run locally +(without any Spark executors). +""" +return self._jdf.isLocal() + def show(self): """ Print the first 20 rows. @@ -247,14 +263,12 @@ class DataFrame(object): 2 Alice 5 Bob >>> df -age name -2 Alice -5 Bob +DataFrame[age: int, name: string] """ -print (self) +print self._jdf.showString().encode('utf8', 'ignore') def __repr__(self): -return self._jdf.showString() +return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes)) def count(self): """Return the number of elements in this RDD. @@ -336,6 +350,8 @@ class DataFrame(object): """ Return a new RDD by applying a function to each partition. +It's a shorthand for df.rdd.mapPartitions() + >>> rdd = sc.parallelize([1, 2, 3, 4], 4) >>> def f(iterator): yield 1 >>> rdd.mapPartitions(f).sum() @@ -343,6 +359,31 @@ class DataFrame(object): """ return self.rdd.mapPartitions(f, preservesPartitioning) +def foreach(self, f): +""" +Applies a function to all rows of this DataFrame. + +It's a shorthand for df.rdd.foreach() + +>>> def f(person): +... print person.name +>>> df.foreach(f) +""" +return self.rdd.foreach(f) + +def foreachPartition(self, f): +""" +Applies a function to each partition of this DataFrame. + +It's a shorthand for df.rdd.foreachPartition() + +>>> def f(people): +... for person in people: +... print person.name +>>> df.foreachPartition(f) +""" +return self.rdd.foreachPartition(f) + def cache(self): """ Persist with the default storage level (C{MEMORY_ONLY_SER}). """ @@ -377,8 +418,13 @@ class DataFrame(object): """ Return a new :class:`DataFrame` that has exactly `numPartitions` partitions. 
""" -rdd = self._jdf.repartition(numPartitions, None) -return DataFrame(rdd, self.sql_ctx) +return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx) + +def distinct(self): +""" +Return a new :class:`DataFrame` containing the distinct rows in this DataFrame. +""" +return DataFrame(self._jdf.distinct(), self.sql_ctx) def sample(self, withReplacement, fraction, seed=None): """ @@ -957,10 +1003,7 @@ class Column(DataFrame): return Column(jc, self.sql_ctx) def __repr__(self): -if self._jdf.isComputable(): -return self._jdf.samples() -else: -return 'Column<%s>' % self._jdf.toString() +return 'Column<%s>' % self._jdf.toString().encode('utf8')
[2/2] spark git commit: [SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation
[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation Author: Michael Armbrust Closes #4642 from marmbrus/docs and squashes the following commits: d291c34 [Michael Armbrust] python tests 9be66e3 [Michael Armbrust] comments d56afc2 [Michael Armbrust] fix style f004747 [Michael Armbrust] fix build c4a907b [Michael Armbrust] fix tests 42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c74b07fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c74b07fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c74b07fa Branch: refs/heads/master Commit: c74b07fa94a8da50437d952ae05cf6ac70fbb93e Parents: c76da36 Author: Michael Armbrust Authored: Tue Feb 17 10:21:17 2015 -0800 Committer: Michael Armbrust Committed: Tue Feb 17 10:21:17 2015 -0800 -- project/SparkBuild.scala| 12 +- python/pyspark/sql/context.py | 28 +-- .../org/apache/spark/sql/jdbc/JDBCUtils.java| 59 -- .../scala/org/apache/spark/sql/DataFrame.scala | 153 +- .../org/apache/spark/sql/DataFrameImpl.scala| 33 ++- .../apache/spark/sql/ExperimentalMethods.scala | 5 + .../apache/spark/sql/IncomputableColumn.scala | 4 + .../scala/org/apache/spark/sql/SQLContext.scala | 200 +++ .../apache/spark/sql/UserDefinedFunction.scala | 3 +- .../org/apache/spark/sql/api/package.scala | 23 +++ .../apache/spark/sql/execution/commands.scala | 2 +- .../spark/sql/execution/debug/package.scala | 10 +- .../spark/sql/jdbc/JavaJDBCTrampoline.scala | 30 --- .../scala/org/apache/spark/sql/jdbc/jdbc.scala | 74 ++- .../sql/parquet/ParquetTableOperations.scala| 4 +- .../apache/spark/sql/parquet/ParquetTest.scala | 4 +- .../apache/spark/sql/parquet/newParquet.scala | 6 +- .../spark/sql/parquet/timestamp/NanoTime.scala | 2 +- .../org/apache/spark/sql/sources/ddl.scala | 4 +- .../org/apache/spark/sql/jdbc/JavaJDBCTest.java | 102 -- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 20 +- .../spark/sql/jdbc/MySQLIntegration.scala | 14 +- .../spark/sql/jdbc/PostgresIntegration.scala| 4 +- .../execution/DescribeHiveTableCommand.scala| 4 +- .../spark/sql/hive/execution/commands.scala | 8 + .../spark/sql/hive/execution/package.scala | 25 +++ .../org/apache/spark/sql/hive/package.scala | 10 + .../sql/hive/parquet/FakeParquetSerDe.scala | 56 -- .../org/apache/spark/sql/hive/Shim12.scala | 9 +- .../org/apache/spark/sql/hive/Shim13.scala | 9 +- 31 files changed, 501 insertions(+), 423 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 8fb1239..e4b1b96 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -361,9 +361,16 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn), + inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn), + +// Skip actual catalyst, but include the subproject. +// Catalyst is not public API and contains quasiquotes which break scaladoc. 
+unidocAllSources in (ScalaUnidoc, unidoc) := { + (unidocAllSources in (ScalaUnidoc, unidoc)).value +.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) +}, // Skip class names containing $ and some internal packages in Javadocs unidocAllSources in (JavaUnidoc, unidoc) := { @@ -376,6 +383,7 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("executor"))) .map(_.filterNot(_.getCanonicalPath.contains("python"))) .map(_.filterNot(_.getCanonicalPath.contains("collection"))) +.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst"))) }, // Javadoc options: create a window title, and group key packages on index page http://git-wip-us.apache.org/repos/asf/spark/blob/c74b07fa/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index dd2cd5e..2e230
spark git commit: [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM
Repository: spark Updated Branches: refs/heads/branch-1.3 824062912 -> 97cb568a2 [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM `numFeatures` is only used by multinomial logistic regression. Calling `.first()` for every GLM causes performance regression, especially in Python. Author: Xiangrui Meng Closes #4647 from mengxr/SPARK-5858 and squashes the following commits: 036dc7f [Xiangrui Meng] remove unnecessary first() call 12c5548 [Xiangrui Meng] check numFeatures only once (cherry picked from commit c76da36c2163276b5c34e59fbb139eeb34ed0faa) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97cb568a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97cb568a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97cb568a Branch: refs/heads/branch-1.3 Commit: 97cb568a219dfc6bcae1d8813cc552b11b7ba414 Parents: 8240629 Author: Xiangrui Meng Authored: Tue Feb 17 10:17:45 2015 -0800 Committer: Xiangrui Meng Committed: Tue Feb 17 10:17:50 2015 -0800 -- .../spark/mllib/classification/LogisticRegression.scala | 6 +- .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 7 --- 2 files changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97cb568a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 420d6e2..b787667 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -355,6 +355,10 @@ class LogisticRegressionWithLBFGS } override protected def createModel(weights: Vector, intercept: Double) = { -new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1) +if (numOfLinearPredictor == 1) { + new LogisticRegressionModel(weights, intercept) +} else { + new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/97cb568a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 2b71453..7c66e8c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] /** * The dimension of training features. */ - protected var numFeatures: Int = 0 + protected var numFeatures: Int = -1 /** * Set if the algorithm should use feature scaling to improve the convergence during optimization. @@ -163,7 +163,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] * RDD of LabeledPoint entries. 
*/ def run(input: RDD[LabeledPoint]): M = { -numFeatures = input.first().features.size +if (numFeatures < 0) { + numFeatures = input.map(_.features.size).first() +} /** * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights, @@ -193,7 +195,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] * of LabeledPoint entries starting from the initial weights provided. */ def run(input: RDD[LabeledPoint], initialWeights: Vector): M = { -numFeatures = input.first().features.size if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
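The performance point is that input.first() launches a job, and the old code ran it for every GLM even though only multinomial logistic regression reads numFeatures. A small sketch of the guarded, compute-once pattern the patch adopts:

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // Sketch: remember the feature dimension once it is known so repeated run()
    // calls never pay for an extra first() job just to read the vector size.
    var numFeatures: Int = -1

    def ensureNumFeatures(input: RDD[LabeledPoint]): Int = {
      if (numFeatures < 0) {
        numFeatures = input.map(_.features.size).first()  // touches one partition, once
      }
      numFeatures
    }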
spark git commit: [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM
Repository: spark Updated Branches: refs/heads/master 3ce46e94f -> c76da36c2 [SPARK-5858][MLLIB] Remove unnecessary first() call in GLM `numFeatures` is only used by multinomial logistic regression. Calling `.first()` for every GLM causes performance regression, especially in Python. Author: Xiangrui Meng Closes #4647 from mengxr/SPARK-5858 and squashes the following commits: 036dc7f [Xiangrui Meng] remove unnecessary first() call 12c5548 [Xiangrui Meng] check numFeatures only once Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c76da36c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c76da36c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c76da36c Branch: refs/heads/master Commit: c76da36c2163276b5c34e59fbb139eeb34ed0faa Parents: 3ce46e9 Author: Xiangrui Meng Authored: Tue Feb 17 10:17:45 2015 -0800 Committer: Xiangrui Meng Committed: Tue Feb 17 10:17:45 2015 -0800 -- .../spark/mllib/classification/LogisticRegression.scala | 6 +- .../spark/mllib/regression/GeneralizedLinearAlgorithm.scala | 7 --- 2 files changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c76da36c/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala index 420d6e2..b787667 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala @@ -355,6 +355,10 @@ class LogisticRegressionWithLBFGS } override protected def createModel(weights: Vector, intercept: Double) = { -new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1) +if (numOfLinearPredictor == 1) { + new LogisticRegressionModel(weights, intercept) +} else { + new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/c76da36c/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala index 2b71453..7c66e8c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala @@ -126,7 +126,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] /** * The dimension of training features. */ - protected var numFeatures: Int = 0 + protected var numFeatures: Int = -1 /** * Set if the algorithm should use feature scaling to improve the convergence during optimization. @@ -163,7 +163,9 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] * RDD of LabeledPoint entries. */ def run(input: RDD[LabeledPoint]): M = { -numFeatures = input.first().features.size +if (numFeatures < 0) { + numFeatures = input.map(_.features.size).first() +} /** * When `numOfLinearPredictor > 1`, the intercepts are encapsulated into weights, @@ -193,7 +195,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel] * of LabeledPoint entries starting from the initial weights provided. 
*/ def run(input: RDD[LabeledPoint], initialWeights: Vector): M = { -numFeatures = input.first().features.size if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-5856: In Maven build script, launch Zinc with more memory
Repository: spark Updated Branches: refs/heads/branch-1.3 aeb85cdee -> 824062912 SPARK-5856: In Maven build script, launch Zinc with more memory I've seen out of memory exceptions when trying to run many parallel builds against the same Zinc server during packaging. We should use the same increased memory settings we use for Maven itself. I tested this and confirmed that the Nailgun JVM launched with higher memory. Author: Patrick Wendell Closes #4643 from pwendell/zinc-memory and squashes the following commits: 717cfb0 [Patrick Wendell] SPARK-5856: Launch Zinc with larger memory options. (cherry picked from commit 3ce46e94fe77d15f18e916b76b37fa96356ace93) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82406291 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82406291 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82406291 Branch: refs/heads/branch-1.3 Commit: 82406291290b5e1a6506a01bceba396aca78363e Parents: aeb85cd Author: Patrick Wendell Authored: Tue Feb 17 10:10:01 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 10:10:08 2015 -0800 -- build/mvn | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82406291/build/mvn -- diff --git a/build/mvn b/build/mvn index 53babf5..3561110 100755 --- a/build/mvn +++ b/build/mvn @@ -21,6 +21,8 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # Preserve the calling directory _CALLING_DIR="$(pwd)" +# Options used during compilation +_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" # Installs any application tarball given a URL, the expected tarball name, # and, optionally, a checkable binary path to determine if the binary has @@ -136,6 +138,7 @@ cd "${_CALLING_DIR}" # Now that zinc is ensured to be installed, check its status and, if its # not running or just installed, start it if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then + export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} ${ZINC_BIN} -shutdown ${ZINC_BIN} -start -port ${ZINC_PORT} \ -scala-compiler "${SCALA_COMPILER}" \ @@ -143,7 +146,7 @@ if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then fi # Set any `mvn` options if not already present -export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"} +export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"} # Last, call the `mvn` command as usual ${MVN_BIN} "$@" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-5856: In Maven build script, launch Zinc with more memory
Repository: spark Updated Branches: refs/heads/master ee6e3eff0 -> 3ce46e94f SPARK-5856: In Maven build script, launch Zinc with more memory I've seen out of memory exceptions when trying to run many parallel builds against the same Zinc server during packaging. We should use the same increased memory settings we use for Maven itself. I tested this and confirmed that the Nailgun JVM launched with higher memory. Author: Patrick Wendell Closes #4643 from pwendell/zinc-memory and squashes the following commits: 717cfb0 [Patrick Wendell] SPARK-5856: Launch Zinc with larger memory options. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ce46e94 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ce46e94 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ce46e94 Branch: refs/heads/master Commit: 3ce46e94fe77d15f18e916b76b37fa96356ace93 Parents: ee6e3ef Author: Patrick Wendell Authored: Tue Feb 17 10:10:01 2015 -0800 Committer: Patrick Wendell Committed: Tue Feb 17 10:10:01 2015 -0800 -- build/mvn | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3ce46e94/build/mvn -- diff --git a/build/mvn b/build/mvn index 53babf5..3561110 100755 --- a/build/mvn +++ b/build/mvn @@ -21,6 +21,8 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # Preserve the calling directory _CALLING_DIR="$(pwd)" +# Options used during compilation +_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" # Installs any application tarball given a URL, the expected tarball name, # and, optionally, a checkable binary path to determine if the binary has @@ -136,6 +138,7 @@ cd "${_CALLING_DIR}" # Now that zinc is ensured to be installed, check its status and, if its # not running or just installed, start it if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then + export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"} ${ZINC_BIN} -shutdown ${ZINC_BIN} -start -port ${ZINC_PORT} \ -scala-compiler "${SCALA_COMPILER}" \ @@ -143,7 +146,7 @@ if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`${ZINC_BIN} -status`" ]; then fi # Set any `mvn` options if not already present -export MAVEN_OPTS=${MAVEN_OPTS:-"-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"} +export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"} # Last, call the `mvn` command as usual ${MVN_BIN} "$@" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
Repository: spark Updated Branches: refs/heads/branch-1.3 b8da5c390 -> aeb85cdee Revert "[SPARK-5363] [PySpark] check ending mark in non-block way" This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aeb85cde Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aeb85cde Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aeb85cde Branch: refs/heads/branch-1.3 Commit: aeb85cdeea39ace5a41e5196663d5aa3ebf1517f Parents: b8da5c3 Author: Josh Rosen Authored: Tue Feb 17 07:48:27 2015 -0800 Committer: Josh Rosen Committed: Tue Feb 17 07:51:05 2015 -0800 -- .../org/apache/spark/api/python/PythonRDD.scala | 21 python/pyspark/worker.py| 1 - 2 files changed, 4 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aeb85cde/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index e94c390..2527211 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -144,24 +144,11 @@ private[spark] class PythonRDD( stream.readFully(update) accumulator += Collections.singletonList(update) } - // Check whether the worker is ready to be re-used. - if (reuse_worker) { -// It has a high possibility that the ending mark is already available, -// And current task should not be blocked by checking it - -if (stream.available() >= 4) { - val ending = stream.readInt() - if (ending == SpecialLengths.END_OF_STREAM) { -env.releasePythonWorker(pythonExec, envVars.toMap, worker) -released = true -logInfo(s"Communication with worker ended cleanly, re-use it: $worker") - } else { -logInfo(s"Communication with worker did not end cleanly " + - s"(ending with $ending), close it: $worker") - } -} else { - logInfo(s"The ending mark from worker is not available, close it: $worker") + if (stream.readInt() == SpecialLengths.END_OF_STREAM) { +if (reuse_worker) { + env.releasePythonWorker(pythonExec, envVars.toMap, worker) + released = true } } null http://git-wip-us.apache.org/repos/asf/spark/blob/aeb85cde/python/pyspark/worker.py -- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 180bdbb..8a93c32 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -121,7 +121,6 @@ def main(infile, outfile): write_int(len(_accumulatorRegistry), outfile) for (aid, accum) in _accumulatorRegistry.items(): pickleSer._write_with_length((aid, accum._value), outfile) -outfile.flush() # check end of stream if read_int(infile) == SpecialLengths.END_OF_STREAM: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
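What is being reverted is the non-blocking variant of the end-of-stream handshake with the Python worker. A side-by-side sketch of the two behaviours; END_OF_STREAM here is a stand-in constant, not necessarily PythonRDD's actual value:

    import java.io.DataInputStream

    val END_OF_STREAM = -4  // assumed marker value, see SpecialLengths in PythonRDD

    // Restored (blocking) behaviour: wait for the worker's ending mark.
    def endedCleanlyBlocking(stream: DataInputStream): Boolean =
      stream.readInt() == END_OF_STREAM

    // Reverted behaviour: only read the mark if 4 bytes are already buffered,
    // otherwise treat the worker as not reusable instead of blocking on it.
    def endedCleanlyNonBlocking(stream: DataInputStream): Boolean =
      stream.available() >= 4 && stream.readInt() == END_OF_STREAM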
spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 432ceca2a -> 6be36d5a8


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be36d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be36d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be36d5a

Branch: refs/heads/branch-1.2
Commit: 6be36d5a88c172b446cc69ebde6176e606cf09f1
Parents: 432ceca
Author: Josh Rosen
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen
Committed: Tue Feb 17 07:51:37 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++-----------------
 python/pyspark/worker.py                        |  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b513fb8..0d508d6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -145,24 +145,11 @@ private[spark] class PythonRDD(
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
           }
-          // Check whether the worker is ready to be re-used.
-          if (reuse_worker) {
-            // It has a high possibility that the ending mark is already available,
-            // And current task should not be blocked by checking it
-
-            if (stream.available() >= 4) {
-              val ending = stream.readInt()
-              if (ending == SpecialLengths.END_OF_STREAM) {
-                env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                released = true
-                logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-              } else {
-                logInfo(s"Communication with worker did not end cleanly " +
-                  s"(ending with $ending), close it: $worker")
-              }
-            } else {
-              logInfo(s"The ending mark from worker is not available, close it: $worker")
+          if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+            if (reuse_worker) {
+              env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+              released = true
             }
           }
           null

http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c2ddd4d..7e5343c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -127,7 +127,6 @@ def main(infile, outfile):
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
-    outfile.flush()
 
     # check end of stream
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
spark git commit: Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"
Repository: spark
Updated Branches:
  refs/heads/master a65766bf0 -> ee6e3eff0


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee6e3eff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee6e3eff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee6e3eff

Branch: refs/heads/master
Commit: ee6e3eff02e9e08b1113ba6faf3397d7e7775087
Parents: a65766b
Author: Josh Rosen
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen
Committed: Tue Feb 17 07:49:02 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++-----------------
 python/pyspark/worker.py                        |  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e3eff/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e94c390..2527211 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,24 +144,11 @@ private[spark] class PythonRDD(
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
           }
-          // Check whether the worker is ready to be re-used.
-          if (reuse_worker) {
-            // It has a high possibility that the ending mark is already available,
-            // And current task should not be blocked by checking it
-
-            if (stream.available() >= 4) {
-              val ending = stream.readInt()
-              if (ending == SpecialLengths.END_OF_STREAM) {
-                env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                released = true
-                logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-              } else {
-                logInfo(s"Communication with worker did not end cleanly " +
-                  s"(ending with $ending), close it: $worker")
-              }
-            } else {
-              logInfo(s"The ending mark from worker is not available, close it: $worker")
+          if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+            if (reuse_worker) {
+              env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+              released = true
             }
           }
           null

http://git-wip-us.apache.org/repos/asf/spark/blob/ee6e3eff/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 180bdbb..8a93c32 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,7 +121,6 @@ def main(infile, outfile):
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
-    outfile.flush()
 
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:
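
For context on what is being reverted: the SPARK-5363 change made the driver
read the worker's END_OF_STREAM marker only when enough bytes were already
buffered, so a finished task would not block waiting on a slow worker; the
revert restores the original blocking readInt(). The following is a rough,
self-contained Scala sketch of the two behaviours, using a plain
DataInputStream and a stand-in END_OF_STREAM constant rather than Spark's
internal SpecialLengths and worker-release APIs, so the names below are
illustrative only:

import java.io.{ByteArrayInputStream, DataInputStream}

object EndMarkSketch {
  // Stand-in for SpecialLengths.END_OF_STREAM; the real constant is a Spark internal.
  val END_OF_STREAM: Int = -4

  // Behaviour restored by the revert: always read the 4-byte marker,
  // blocking until the Python worker has written it.
  def blockingCheck(stream: DataInputStream): Boolean =
    stream.readInt() == END_OF_STREAM

  // Behaviour being reverted: only read the marker if it is already buffered,
  // otherwise skip the check (and the worker gets closed instead of re-used).
  def nonBlockingCheck(stream: DataInputStream): Option[Boolean] =
    if (stream.available() >= 4) Some(stream.readInt() == END_OF_STREAM) else None

  def main(args: Array[String]): Unit = {
    // Simulate a worker that has already written the end marker.
    val marker = java.nio.ByteBuffer.allocate(4).putInt(END_OF_STREAM).array()
    val in = new DataInputStream(new ByteArrayInputStream(marker))
    println(nonBlockingCheck(in)) // Some(true): the marker was already available
  }
}

The revert also drops the matching outfile.flush() in python/pyspark/worker.py,
which the original change appears to have added so that the marker's bytes
would reach the driver's buffer promptly enough for the available() check.
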
spark git commit: [SPARK-5826][Streaming] Fix Configuration not serializable problem
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e9241fa70 -> b8da5c390


[SPARK-5826][Streaming] Fix Configuration not serializable problem

Author: jerryshao

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem

(cherry picked from commit a65766bf0244a41b793b9dc5fbdd2882664ad00e)
Signed-off-by: Sean Owen


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8da5c39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8da5c39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8da5c39

Branch: refs/heads/branch-1.3
Commit: b8da5c390b7272ded5476d4531dcd757c5431fab
Parents: e9241fa
Author: jerryshao
Authored: Tue Feb 17 10:45:18 2015 +
Committer: Sean Owen
Committed: Tue Feb 17 10:45:33 2015 +

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8da5c39/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88..4f7db41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = conf match {
+      val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
           fm.runtimeClass.asInstanceOf[Class[F]],
spark git commit: [SPARK-5826][Streaming] Fix Configuration not serializable problem
Repository: spark
Updated Branches:
  refs/heads/master c06e42f2c -> a65766bf0


[SPARK-5826][Streaming] Fix Configuration not serializable problem

Author: jerryshao

Closes #4612 from jerryshao/SPARK-5826 and squashes the following commits:

7ec71db [jerryshao] Remove transient for conf statement
88d84e6 [jerryshao] Fix Configuration not serializable problem


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a65766bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a65766bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a65766bf

Branch: refs/heads/master
Commit: a65766bf0244a41b793b9dc5fbdd2882664ad00e
Parents: c06e42f
Author: jerryshao
Authored: Tue Feb 17 10:45:18 2015 +
Committer: Sean Owen
Committed: Tue Feb 17 10:45:18 2015 +

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a65766bf/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6379b88..4f7db41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
@@ -27,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -78,6 +78,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
   extends InputDStream[(K, V)](ssc_) {
 
+  private val serializableConfOpt = conf.map(new SerializableWritable(_))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
 
@@ -240,7 +242,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     val fileRDDs = files.map(file =>{
-      val rdd = conf match {
+      val rdd = serializableConfOpt.map(_.value) match {
         case Some(config) => context.sparkContext.newAPIHadoopFile(
           file,
           fm.runtimeClass.asInstanceOf[Class[F]],
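
The pattern behind this fix is a general one: Hadoop's Configuration is not
java.io.Serializable, but it does implement Writable, so wrapping it in
org.apache.spark.SerializableWritable (as the patch does with
serializableConfOpt) lets it travel inside an otherwise Java-serializable
object such as a DStream. A minimal sketch of the same idea outside of
FileInputDStream; the class and field names here are illustrative, not
Spark API:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

// A serializable holder that optionally carries a Hadoop Configuration,
// mirroring how FileInputDStream keeps an Option[Configuration].
class HadoopConfHolder(conf: Option[Configuration]) extends Serializable {

  // Wrap once at construction time; SerializableWritable serializes the
  // wrapped object through Hadoop's Writable write/readFields machinery.
  private val serializableConfOpt = conf.map(new SerializableWritable(_))

  // Unwrap whenever the Configuration is actually needed.
  def hadoopConf: Option[Configuration] = serializableConfOpt.map(_.value)
}

An alternative would have been marking the field @transient (the PR's first
squashed commit is "Remove transient for conf statement"), but a transient
Configuration would simply be null after deserialization, for example when
the DStream is restored from a checkpoint.
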