spark git commit: [SPARK-16021][TEST-MAVEN] Fix the maven build
Repository: spark
Updated Branches:
  refs/heads/master 69f539140 -> 4b5a72c7d

[SPARK-16021][TEST-MAVEN] Fix the maven build

## What changes were proposed in this pull request?

Fixed the maven build for #13983

## How was this patch tested?

The existing tests.

Author: Shixiong Zhu

Closes #14084 from zsxwing/fix-maven.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b5a72c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b5a72c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b5a72c7

Branch: refs/heads/master
Commit: 4b5a72c7dc364ca8d57d9f4bb47f4cd31c5b3082
Parents: 69f5391
Author: Shixiong Zhu
Authored: Wed Jul 6 22:48:05 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 22:48:05 2016 -0700

--
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4b5a72c7/pom.xml
--
diff --git a/pom.xml b/pom.xml
index c99d786..4aaf616 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1995,6 +1995,7 @@
 false
 false
 true
+true
 src

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16398][CORE] Make cancelJob and cancelStage APIs public
Repository: spark
Updated Branches:
  refs/heads/master 42279bff6 -> 69f539140

[SPARK-16398][CORE] Make cancelJob and cancelStage APIs public

## What changes were proposed in this pull request?

Make SparkContext `cancelJob` and `cancelStage` APIs public. This allows applications to use `SparkListener` to do their own management of jobs via events, but without using the REST API.

## How was this patch tested?

Existing tests (dev/run-tests)

Author: MasterDDT

Closes #14072 from MasterDDT/SPARK-16398.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69f53914
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69f53914
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69f53914

Branch: refs/heads/master
Commit: 69f5391408b779a400b553344fd61051004685fc
Parents: 42279bf
Author: MasterDDT
Authored: Wed Jul 6 22:47:40 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 22:47:40 2016 -0700

--
 .../scala/org/apache/spark/SparkContext.scala | 18 ++
 1 file changed, 14 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/69f53914/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe15052..57d1f09 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2011,13 +2011,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     dagScheduler.cancelAllJobs()
   }
 
-  /** Cancel a given job if it's scheduled or running */
-  private[spark] def cancelJob(jobId: Int) {
+  /**
+   * Cancel a given job if it's scheduled or running.
+   *
+   * @param jobId the job ID to cancel
+   * @throws InterruptedException if the cancel message cannot be sent
+   */
+  def cancelJob(jobId: Int) {
     dagScheduler.cancelJob(jobId)
   }
 
-  /** Cancel a given stage and all jobs associated with it */
-  private[spark] def cancelStage(stageId: Int) {
+  /**
+   * Cancel a given stage and all jobs associated with it.
+   *
+   * @param stageId the stage ID to cancel
+   * @throws InterruptedException if the cancel message cannot be sent
+   */
+  def cancelStage(stageId: Int) {
     dagScheduler.cancelStage(stageId)
   }
spark git commit: [SPARK-16374][SQL] Remove Alias from MetastoreRelation and SimpleCatalogRelation
Repository: spark
Updated Branches:
  refs/heads/master 34283de16 -> 42279bff6

[SPARK-16374][SQL] Remove Alias from MetastoreRelation and SimpleCatalogRelation

What changes were proposed in this pull request?

Unlike the other leaf nodes, `MetastoreRelation` and `SimpleCatalogRelation` have a pre-defined `alias`, which is used to change the qualifier of the node. However, under the existing alias handling, an alias should be put in a `SubqueryAlias`.

This PR separates alias handling from `MetastoreRelation` and `SimpleCatalogRelation` to make them consistent with the other nodes. This simplifies their signatures and the conversion to a `BaseRelation`.

For example, below is a query on a `MetastoreRelation` that is converted to a `LogicalRelation`:
```SQL
SELECT tmp.a + 1 FROM test_parquet_ctas tmp WHERE tmp.a > 2
```

Before this change, the analyzed plan is
```
== Analyzed Logical Plan ==
(a + 1): int
Project [(a#951 + 1) AS (a + 1)#952]
+- Filter (a#951 > 2)
   +- SubqueryAlias tmp
      +- Relation[a#951] parquet
```

After this change, the analyzed plan becomes
```
== Analyzed Logical Plan ==
(a + 1): int
Project [(a#951 + 1) AS (a + 1)#952]
+- Filter (a#951 > 2)
   +- SubqueryAlias tmp
      +- SubqueryAlias test_parquet_ctas
         +- Relation[a#951] parquet
```

**Note: the optimized plans are the same.**

For `SimpleCatalogRelation`, the existing code always generates two Subqueries. Thus, no change is needed.

How was this patch tested?

Added test cases.

Author: gatorsmile

Closes #14053 from gatorsmile/removeAliasFromMetastoreRelation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42279bff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42279bff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42279bff

Branch: refs/heads/master
Commit: 42279bff686f9808ec7a9e8f4da95c717edc6026
Parents: 34283de
Author: gatorsmile
Authored: Thu Jul 7 12:07:19 2016 +0800
Committer: Wenchen Fan
Committed: Thu Jul 7 12:07:19 2016 +0800

--
 .../spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 .../org/apache/spark/sql/catalyst/catalog/interface.scala | 5 ++---
 .../spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 2 +-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 10 ++
 .../org/apache/spark/sql/hive/MetastoreRelation.scala | 10 --
 5 files changed, 14 insertions(+), 15 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/42279bff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e1d4991..ffaefeb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -403,7 +403,7 @@ class SessionCatalog(
     val relation =
       if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
-        SimpleCatalogRelation(db, metadata, alias)
+        SimpleCatalogRelation(db, metadata)
       } else {
         tempTables(table)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/42279bff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 6197aca..b12606e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -244,8 +244,7 @@ trait CatalogRelation {
   */
 case class SimpleCatalogRelation(
     databaseName: String,
-    metadata: CatalogTable,
-    alias: Option[String] = None)
+    metadata: CatalogTable)
   extends LeafNode with CatalogRelation {
 
   override def catalogTable: CatalogTable = metadata
@@ -261,7 +260,7 @@ case class SimpleCatalogRelation(
         CatalystSqlParser.parseDataType(f.dataType),
         // Since data can be dumped in randomly with no validation, everything is nullable.
         nullable = true
-      )(qualifier = Some(alias.getOrElse(metadata.identifier.table)))
+      )(qualifier = Some(metadata.identifier.table))
     }
   }
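The change above moves aliasing out of the relation nodes and into `SubqueryAlias` wrappers. As a rough illustration only, the toy plan ADT below (`Plan`, `Relation`, `SubqueryAlias`, `lookupRelation`, and `treeString` are hypothetical names, not Catalyst's classes) wraps a table once with its own name and then, optionally, with a user alias, which yields the same two-level nesting as the "after" analyzed plan:

```scala
// Toy logical-plan ADT; hypothetical, not Catalyst's actual classes.
sealed trait Plan
final case class Relation(table: String, columns: Seq[String]) extends Plan
final case class SubqueryAlias(alias: String, child: Plan) extends Plan

object AliasSketch {
  // The relation itself carries no alias. The table name becomes the
  // default qualifier via one SubqueryAlias, and a user alias (if any)
  // is layered on top as a second SubqueryAlias.
  def lookupRelation(table: String, columns: Seq[String], alias: Option[String]): Plan = {
    val qualified = SubqueryAlias(table, Relation(table, columns))
    alias.map(a => SubqueryAlias(a, qualified)).getOrElse(qualified)
  }

  // Render the tree in the same "+- " style as Spark's plan output.
  def treeString(plan: Plan, depth: Int = 0): String = {
    val prefix = if (depth == 0) "" else ("   " * (depth - 1)) + "+- "
    plan match {
      case Relation(t, cols) => prefix + s"Relation[${cols.mkString(",")}] $t"
      case SubqueryAlias(a, child) =>
        prefix + s"SubqueryAlias $a\n" + treeString(child, depth + 1)
    }
  }

  def main(args: Array[String]): Unit = {
    println(treeString(lookupRelation("test_parquet_ctas", Seq("a"), Some("tmp"))))
  }
}
```

Running `main` prints `SubqueryAlias tmp`, then `+- SubqueryAlias test_parquet_ctas`, then `   +- Relation[a] test_parquet_ctas`. Keeping the relation alias-free means a single wrapping rule covers every leaf node, which is the consistency argument the commit makes.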
spark git commit: [SPARK-14839][SQL] Support for other types for `tableProperty` rule in SQL syntax
Repository: spark
Updated Branches:
  refs/heads/master 44c7c62bc -> 34283de16

[SPARK-14839][SQL] Support for other types for `tableProperty` rule in SQL syntax

## What changes were proposed in this pull request?

Currently, the Scala API accepts options of the types `String`, `Long`, `Double` and `Boolean`, and the Python API also supports other types.

This PR corrects the `tableProperty` rule to support other types (string, boolean, double and integer) so that options for data sources are supported in a consistent way. This will affect other rules such as DBPROPERTIES and TBLPROPERTIES (allowing other types as values).

Also, `TODO add bucketing and partitioning.` was removed because it was resolved in https://github.com/apache/spark/commit/24bea000476cdd0b43be5160a76bc5b170ef0b42

## How was this patch tested?

Unit test in `MetastoreDataSourcesSuite.scala`.

Author: hyukjinkwon

Closes #13517 from HyukjinKwon/SPARK-14839.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34283de1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34283de1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34283de1

Branch: refs/heads/master
Commit: 34283de160808324da02964cd5dc5df80e59ae71
Parents: 44c7c62
Author: hyukjinkwon
Authored: Wed Jul 6 23:57:18 2016 -0400
Committer: Herman van Hovell
Committed: Wed Jul 6 23:57:18 2016 -0400

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 9 ++-
 .../spark/sql/execution/SparkSqlParser.scala | 22 +--
 .../sql/execution/command/DDLCommandSuite.scala | 61 
 3 files changed, 87 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/34283de1/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4c15f9c..7ccbb2d 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -249,7 +249,7 @@ tablePropertyList
     ;
 
 tableProperty
-    : key=tablePropertyKey (EQ? value=STRING)?
+    : key=tablePropertyKey (EQ? value=tablePropertyValue)?
     ;
 
 tablePropertyKey
@@ -257,6 +257,13 @@ tablePropertyKey
     | STRING
     ;
 
+tablePropertyValue
+    : INTEGER_VALUE
+    | DECIMAL_VALUE
+    | booleanValue
+    | STRING
+    ;
+
 constantList
     : '(' constant (',' constant)* ')'
     ;

http://git-wip-us.apache.org/repos/asf/spark/blob/34283de1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 42ec210..f77801f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -311,8 +311,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
-   *
-   * TODO add bucketing and partitioning.
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -413,7 +411,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
     val properties = ctx.tableProperty.asScala.map { property =>
       val key = visitTablePropertyKey(property.key)
-      val value = Option(property.value).map(string).orNull
+      val value = visitTablePropertyValue(property.value)
       key -> value
     }
     // Check for duplicate property names.
@@ -426,7 +424,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
     val props = visitTablePropertyList(ctx)
-    val badKeys = props.filter { case (_, v) => v == null }.keys
+    val badKeys = props.collect { case (key, null) => key }
     if (badKeys.nonEmpty) {
       operationNotAllowed(
         s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
@@ -461,6 +459,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * A table property value can
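With the new grammar, a property value may be an integer, a decimal, a boolean, or a string literal, while the parser still produces a `Map[String, String]` and rejects keys that have no value. The sketch below models that flow with a hypothetical `PropValue` ADT (not Spark's parser classes). How Spark's `visitTablePropertyValue` converts each literal is cut off in this diff, so the conversion rules here (keep numeric and boolean text as written, strip the surrounding quotes from string literals) are assumptions:

```scala
// Hypothetical token model for the four alternatives of the
// `tablePropertyValue` grammar rule; not Spark's parser classes.
sealed trait PropValue
final case class IntegerValue(text: String) extends PropValue
final case class DecimalValue(text: String) extends PropValue
final case class BooleanValue(text: String) extends PropValue
final case class StringValue(quoted: String) extends PropValue

object TablePropertySketch {
  // Assumption: every value is stored as its textual form, and a quoted
  // string literal has its surrounding quote characters removed.
  def valueToString(v: PropValue): String = v match {
    case IntegerValue(t) => t
    case DecimalValue(t) => t
    case BooleanValue(t) => t
    case StringValue(q)  => q.substring(1, q.length - 1)
  }

  // A property written without a value maps to null, mirroring the
  // `Option(...).orNull` style used in the parser.
  def toProperties(props: Seq[(String, Option[PropValue])]): Map[String, String] =
    props.map { case (k, v) => k -> v.map(valueToString).orNull }.toMap

  // Same idiom as the patch: collect the keys whose value is null.
  def badKeys(props: Map[String, String]): Iterable[String] =
    props.collect { case (key, null) => key }
}
```

The `collect { case (key, null) => key }` form matches a `null` value directly in the pattern, replacing the earlier `filter` plus `.keys` with one pass.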
spark git commit: [SPARK-16021] Fill freed memory in test to help catch correctness bugs
Repository: spark
Updated Branches:
  refs/heads/master b8ebf63c1 -> 44c7c62bc

[SPARK-16021] Fill freed memory in test to help catch correctness bugs

## What changes were proposed in this pull request?

This patch changes `MemoryAllocator` to fill newly allocated ("clean") and freed memory with known byte values, similar to https://github.com/jemalloc/jemalloc/wiki/Use-Case:-Find-a-memory-corruption-bug . By default, memory filling is enabled (via a flag) only in tests.

## How was this patch tested?

A unit test checking that filling is enabled in tests.

cc sameeragarwal

Author: Eric Liang

Closes #13983 from ericl/spark-16021.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44c7c62b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44c7c62b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44c7c62b

Branch: refs/heads/master
Commit: 44c7c62bcfca74c82ffc4e3c53997fff47bfacac
Parents: b8ebf63
Author: Eric Liang
Authored: Wed Jul 6 16:30:25 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 16:30:25 2016 -0700

--
 .../main/java/org/apache/spark/unsafe/Platform.java | 4 
 .../spark/unsafe/memory/HeapMemoryAllocator.java | 10 +-
 .../apache/spark/unsafe/memory/MemoryAllocator.java | 13 -
 .../org/apache/spark/unsafe/memory/MemoryBlock.java | 7 +++
 .../spark/unsafe/memory/UnsafeMemoryAllocator.java | 9 -
 .../org/apache/spark/unsafe/PlatformUtilSuite.java | 16 
 project/SparkBuild.scala | 1 +
 7 files changed, 57 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 77c8c39..a2ee45c 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -176,6 +176,10 @@ public final class Platform {
     throw new IllegalStateException("unreachable");
   }
 
+  public static void setMemory(Object object, long offset, long size, byte value) {
+    _UNSAFE.setMemory(object, offset, size, value);
+  }
+
   public static void setMemory(long address, byte value, long size) {
     _UNSAFE.setMemory(address, size, value);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 09847ce..3cd4264 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryAllocator;
 
 /**
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
@@ -64,12 +65,19 @@ public class HeapMemoryAllocator implements MemoryAllocator {
       }
     }
     long[] array = new long[(int) ((size + 7) / 8)];
-    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+    MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
+      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
+    }
+    return memory;
   }
 
   @Override
   public void free(MemoryBlock memory) {
     final long size = memory.size();
+    if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
+      memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
+    }
     if (shouldPool(size)) {
       synchronized (this) {
         LinkedList pool = bufferPoolsBySize.get(size);

http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 5192f68..8bd2b06 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@
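The allocator change above poisons memory both when it is handed out and when it is freed. A simplified, self-contained sketch of the same idea over plain byte arrays follows. `ToyBlock`, `DebugFillAllocator`, and the sentinel bytes `0xa5` and `0x5a` are assumptions made for this illustration; the actual `MEMORY_DEBUG_FILL_*` values are defined in the truncated part of the diff:

```scala
import java.util.Arrays

// Hypothetical toy block; Spark's MemoryBlock wraps a base object plus offset.
final class ToyBlock(val bytes: Array[Byte]) {
  def fill(value: Byte): Unit = Arrays.fill(bytes, value)
}

// Minimal sketch of debug filling; names and sentinel bytes are assumptions.
final class DebugFillAllocator(debugFill: Boolean) {
  val CleanValue: Byte = 0xa5.toByte
  val FreedValue: Byte = 0x5a.toByte

  def allocate(size: Int): ToyBlock = {
    val block = new ToyBlock(new Array[Byte](size))
    // Poison fresh memory so code that reads before writing sees a
    // recognizable pattern instead of convenient zeros.
    if (debugFill) block.fill(CleanValue)
    block
  }

  def free(block: ToyBlock): Unit = {
    // Poison freed memory so a use-after-free read is easy to spot.
    if (debugFill) block.fill(FreedValue)
  }
}

object DebugFillDemo {
  def main(args: Array[String]): Unit = {
    val alloc = new DebugFillAllocator(debugFill = true)
    val block = alloc.allocate(4)
    alloc.free(block)
    // A stale reference still points at the block after free.
    println(block.bytes.map(b => f"${b & 0xff}%02x").mkString(" "))
  }
}
```

With filling disabled, a new block reads as zeros, which can hide a read-before-write bug; with it enabled, such reads and any access after `free` see an obviously wrong pattern (the demo prints `5a 5a 5a 5a`).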
spark git commit: [SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well
Repository: spark
Updated Branches:
  refs/heads/master 8e3e4ed6c -> b8ebf63c1

[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from development on 0-10.

Main changes are:
- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter),
- increasing uniqueness of topic names.

## How was this patch tested?

Unit tests

Author: cody koeninger

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8ebf63c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8ebf63c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8ebf63c

Branch: refs/heads/master
Commit: b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59
Parents: 8e3e4ed
Author: cody koeninger
Authored: Wed Jul 6 16:21:41 2016 -0700
Committer: Tathagata Das
Committed: Wed Jul 6 16:21:41 2016 -0700

--
 .../kafka/DirectKafkaStreamSuite.scala | 41 ++--
 .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 ++--
 2 files changed, 24 insertions(+), 25 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b8ebf63c/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d2..ab1c505 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
     )
 
     // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
+    def sendData(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
     }
 
     // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
     }
     ssc.checkpoint(testDir.getAbsolutePath)
 
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
-
     // This is ensure all the data is eventually receiving only once
     stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
     }
     ssc.start()
 
-    // Send some data and wait for them to be received
+    // Send some data
     for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
     ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
     val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
     // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+        earlierOffsetRanges.contains((or._1, or._2))
       },
       "Recovered ranges are not the same as the ones generated\n" +
         s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+        s"earlierOffsetRanges: $earlierOffsetRanges"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only
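The main test change replaces a wait after every batch with one `eventually` assertion on an atomically updated total. The sketch below reproduces that pattern without Spark or Kafka. The `eventually` here is a hypothetical minimal stand-in for ScalaTest's helper of the same name (not its real signature), and the background thread is a placeholder for the streaming job:

```scala
import java.util.concurrent.atomic.AtomicInteger

object EventuallySketch {
  // Minimal stand-in for ScalaTest's `eventually`: re-run `check` until it
  // stops failing with an AssertionError, or rethrow once the deadline passes.
  def eventually[T](timeoutMs: Long, intervalMs: Long)(check: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(check)
      } catch {
        case e: AssertionError =>
          if (System.currentTimeMillis() > deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
    result.get
  }

  // Written by a background thread and read by the test thread; an
  // AtomicInteger makes the cross-thread update visible and race-free.
  val total = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    total.set(0)
    // Placeholder for the streaming job: consumes batches asynchronously.
    val consumer = new Thread(new Runnable {
      def run(): Unit = (1 to 10).grouped(4).foreach { batch =>
        Thread.sleep(20)
        total.addAndGet(batch.sum)
      }
    })
    consumer.start()

    // Send everything up front, then assert once on the final result
    // instead of waiting after each batch.
    eventually(timeoutMs = 10000L, intervalMs = 50L) {
      assert(total.get == (1 to 10).sum)
    }
    println(s"total = ${total.get}")
  }
}
```

In this sketch a lost or duplicated batch would change the sum, so the single final assertion still catches it. The patch separately compares offset ranges as sets, which makes that check independent of ordering.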
spark git commit: [SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 05ddc7517 -> 920162a1e

[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from development on 0-10.

Main changes are:
- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter),
- increasing uniqueness of topic names.

## How was this patch tested?

Unit tests

Author: cody koeninger

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.

(cherry picked from commit b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/920162a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/920162a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/920162a1

Branch: refs/heads/branch-2.0
Commit: 920162a1e0b43b558ba2242868a44cad06bef946
Parents: 05ddc75
Author: cody koeninger
Authored: Wed Jul 6 16:21:41 2016 -0700
Committer: Tathagata Das
Committed: Wed Jul 6 16:21:49 2016 -0700

--
 .../kafka/DirectKafkaStreamSuite.scala | 41 ++--
 .../spark/streaming/kafka/KafkaRDDSuite.scala | 8 ++--
 2 files changed, 24 insertions(+), 25 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/920162a1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d2..ab1c505 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
     )
 
     // Send data to Kafka and wait for it to be received
-    def sendDataAndWaitForReceive(data: Seq[Int]) {
+    def sendData(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
-        assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
-      }
     }
 
     // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
     }
     ssc.checkpoint(testDir.getAbsolutePath)
 
-    // This is to collect the raw data received from Kafka
-    kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-      val data = rdd.map { _._2 }.collect()
-      DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-    }
-
     // This is ensure all the data is eventually receiving only once
     stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-      rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
+      rdd.collect().headOption.foreach { x =>
+        DirectKafkaStreamSuite.total.set(x._2)
+      }
     }
     ssc.start()
 
-    // Send some data and wait for them to be received
+    // Send some data
     for (i <- (1 to 10).grouped(4)) {
-      sendDataAndWaitForReceive(i)
+      sendData(i)
+    }
+
+    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
     ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
     val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
     // Verify offset ranges have been recovered
-    val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+    val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => (x._1, x._2.toSet) }
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
-        earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+        earlierOffsetRanges.contains((or._1, or._2))
       },
       "Recovered ranges are not the same as the ones generated\n" +
         s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+        s"earlierOffsetRanges: $earlierOffsetRanges"
     )
     //
spark git commit: [SPARK-16371][SQL] Two follow-up tasks
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2c2b8f121 -> 05ddc7517

[SPARK-16371][SQL] Two follow-up tasks

## What changes were proposed in this pull request?

This is a small follow-up for SPARK-16371:

1. Hide removeMetadata from public API.
2. Add JIRA ticket number to test case name.

## How was this patch tested?

Updated a test comment.

Author: Reynold Xin

Closes #14074 from rxin/parquet-filter.

(cherry picked from commit 8e3e4ed6c090d18675d49eec46b3ee572457db95)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05ddc751
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05ddc751
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05ddc751

Branch: refs/heads/branch-2.0
Commit: 05ddc75179acc582c615da01b9c0e7e049a5ecf0
Parents: 2c2b8f1
Author: Reynold Xin
Authored: Wed Jul 6 15:04:37 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 15:04:44 2016 -0700

--
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 4 ++--
 .../sql/execution/datasources/parquet/ParquetFilterSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/05ddc751/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 436512f..effef54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -378,10 +378,10 @@ object StructType extends AbstractDataType {
     StructType(fields.asScala)
   }
 
-  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
     StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 
-  def removeMetadata(key: String, dt: DataType): DataType =
+  private[sql] def removeMetadata(key: String, dt: DataType): DataType =
     dt match {
       case StructType(fields) =>
         val newFields = fields.map { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/05ddc751/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 35d6915..2a89773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -546,7 +546,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("Do not push down filters incorrectly when inner name and outer name are the same") {
+  test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:
       //
spark git commit: [SPARK-16371][SQL] Two follow-up tasks
Repository: spark
Updated Branches:
  refs/heads/master 9c041990c -> 8e3e4ed6c

[SPARK-16371][SQL] Two follow-up tasks

## What changes were proposed in this pull request?

This is a small follow-up for SPARK-16371:

1. Hide removeMetadata from public API.
2. Add JIRA ticket number to test case name.

## How was this patch tested?

Updated a test comment.

Author: Reynold Xin

Closes #14074 from rxin/parquet-filter.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e3e4ed6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e3e4ed6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e3e4ed6

Branch: refs/heads/master
Commit: 8e3e4ed6c090d18675d49eec46b3ee572457db95
Parents: 9c041990
Author: Reynold Xin
Authored: Wed Jul 6 15:04:37 2016 -0700
Committer: Reynold Xin
Committed: Wed Jul 6 15:04:37 2016 -0700

--
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 4 ++--
 .../sql/execution/datasources/parquet/ParquetFilterSuite.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8e3e4ed6/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0e89f71..0284ecc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -413,10 +413,10 @@ object StructType extends AbstractDataType {
     StructType(fields.asScala)
   }
 
-  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
     StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 
-  def removeMetadata(key: String, dt: DataType): DataType =
+  private[sql] def removeMetadata(key: String, dt: DataType): DataType =
     dt match {
       case StructType(fields) =>
         val newFields = fields.map { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e3e4ed6/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 18a3128..84fdcfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -544,7 +544,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("Do not push down filters incorrectly when inner name and outer name are the same") {
+  test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:
       //
spark git commit: [MESOS] expand coarse-grained mode docs
Repository: spark Updated Branches: refs/heads/branch-2.0 88be66b93 -> 2c2b8f121 [MESOS] expand coarse-grained mode docs ## What changes were proposed in this pull request? docs ## How was this patch tested? viewed the docs in github Author: Michael GummeltCloses #14059 from mgummelt/coarse-grained. (cherry picked from commit 9c041990cf4d0138d9104207b5c2e7a319b42615) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c2b8f12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c2b8f12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c2b8f12 Branch: refs/heads/branch-2.0 Commit: 2c2b8f121a213618ef47cb030d17b9bd323f0d9e Parents: 88be66b Author: Michael Gummelt Authored: Wed Jul 6 15:02:45 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 15:02:50 2016 -0700 -- docs/running-on-mesos.md | 77 --- 1 file changed, 51 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c2b8f12/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 4a0ab62..8ab5f30 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -180,30 +180,53 @@ Note that jars or python files that are passed to spark-submit should be URIs re # Mesos Run Modes -Spark can run over Mesos in two modes: "coarse-grained" (default) and "fine-grained". - -The "coarse-grained" mode will launch only *one* long-running Spark task on each Mesos -machine, and dynamically schedule its own "mini-tasks" within it. The benefit is much lower startup -overhead, but at the cost of reserving the Mesos resources for the complete duration of the -application. - -Coarse-grained is the default mode. 
You can also set `spark.mesos.coarse` property to true -to turn it on explicitly in [SparkConf](configuration.html#spark-properties): - -{% highlight scala %} -conf.set("spark.mesos.coarse", "true") -{% endhighlight %} - -In addition, for coarse-grained mode, you can control the maximum number of resources Spark will -acquire. By default, it will acquire *all* cores in the cluster (that get offered by Mesos), which -only makes sense if you run just one application at a time. You can cap the maximum number of cores -using `conf.set("spark.cores.max", "10")` (for example). - -In "fine-grained" mode, each Spark task runs as a separate Mesos task. This allows -multiple instances of Spark (and other frameworks) to share machines at a very fine granularity, -where each application gets more or fewer machines as it ramps up and down, but it comes with an -additional overhead in launching each task. This mode may be inappropriate for low-latency -requirements like interactive queries or serving web requests. +Spark can run over Mesos in two modes: "coarse-grained" (default) and +"fine-grained". + +## Coarse-Grained + +In "coarse-grained" mode, each Spark executor runs as a single Mesos +task. Spark executors are sized according to the following +configuration variables: + +* Executor memory: `spark.executor.memory` +* Executor cores: `spark.executor.cores` +* Number of executors: `spark.cores.max`/`spark.executor.cores` + +Please see the [Spark Configuration](configuration.html) page for +details and default values. + +Executors are brought up eagerly when the application starts, until +`spark.cores.max` is reached. If you don't set `spark.cores.max`, the +Spark application will reserve all resources offered to it by Mesos, +so we of course urge you to set this variable in any sort of +multi-tenant cluster, including one which runs multiple concurrent +Spark applications. 
+ +The scheduler will start executors round-robin on the offers Mesos +gives it, but there are no spread guarantees, as Mesos does not +provide such guarantees on the offer stream. + +The benefit of coarse-grained mode is much lower startup overhead, but +at the cost of reserving Mesos resources for the complete duration of +the application. To configure your job to dynamically adjust to its +resource requirements, look into +[Dynamic Allocation](#dynamic-resource-allocation-with-mesos). + +## Fine-Grained + +In "fine-grained" mode, each Spark task inside the Spark executor runs +as a separate Mesos task. This allows multiple instances of Spark (and +other frameworks) to share cores at a very fine granularity, where +each application gets more or fewer cores as it ramps up and down, but +it comes with an additional overhead in launching each task. This mode +may be inappropriate for low-latency requirements like interactive +queries or serving web requests.
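The coarse-grained sizing rules in the doc change above reduce to simple arithmetic: each executor takes `spark.executor.cores` cores, and executors keep launching until `spark.cores.max` is reached, so at most `spark.cores.max / spark.executor.cores` executors start. The sketch below simulates that with a round-robin pass over a list of offers. It is a toy model under stated assumptions (offers are plain core counts, memory is ignored, and `CoarseGrainedSketch` and `planExecutors` are hypothetical names), not the logic of Spark's actual Mesos scheduler backend.

```scala
object CoarseGrainedSketch {
  // Upper bound on executors: spark.cores.max / spark.executor.cores (integer division).
  def maxExecutors(coresMax: Int, executorCores: Int): Int = {
    require(executorCores > 0, "executor cores must be positive")
    coresMax / executorCores
  }

  // Places executors round-robin across offers, each needing `executorCores` cores,
  // until `coresMax` cores are used or no offer can fit another executor.
  // Returns how many executors landed on each offer, in the order of `offerCores`.
  def planExecutors(offerCores: Seq[Int], executorCores: Int, coresMax: Int): List[Int] = {
    require(executorCores > 0, "executor cores must be positive")
    val remaining = offerCores.toArray
    val launched = Array.fill(offerCores.length)(0)
    var coresUsed = 0
    var progress = true
    while (progress && coresUsed + executorCores <= coresMax) {
      progress = false
      for (i <- remaining.indices) {
        if (remaining(i) >= executorCores && coresUsed + executorCores <= coresMax) {
          remaining(i) -= executorCores
          launched(i) += 1
          coresUsed += executorCores
          progress = true
        }
      }
    }
    launched.toList
  }
}
```

For example, with offers of 8, 8 and 4 cores, `spark.executor.cores = 2` and `spark.cores.max = 10`, the sketch places 2, 2 and 1 executors (5 in total, matching `10 / 2`). Leaving `coresMax` very large models the unset case, where the application keeps taking every offer that fits an executor. Cores left over on an offer that cannot fit a whole executor stay unused.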
spark git commit: [MESOS] expand coarse-grained mode docs
Repository: spark Updated Branches: refs/heads/master a8f89df3b -> 9c041990c [MESOS] expand coarse-grained mode docs ## What changes were proposed in this pull request? docs ## How was this patch tested? viewed the docs in github Author: Michael GummeltCloses #14059 from mgummelt/coarse-grained. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c041990 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c041990 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c041990 Branch: refs/heads/master Commit: 9c041990cf4d0138d9104207b5c2e7a319b42615 Parents: a8f89df Author: Michael Gummelt Authored: Wed Jul 6 15:02:45 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 15:02:45 2016 -0700 -- docs/running-on-mesos.md | 77 --- 1 file changed, 51 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9c041990/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 4a0ab62..8ab5f30 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -180,30 +180,53 @@ Note that jars or python files that are passed to spark-submit should be URIs re # Mesos Run Modes -Spark can run over Mesos in two modes: "coarse-grained" (default) and "fine-grained". - -The "coarse-grained" mode will launch only *one* long-running Spark task on each Mesos -machine, and dynamically schedule its own "mini-tasks" within it. The benefit is much lower startup -overhead, but at the cost of reserving the Mesos resources for the complete duration of the -application. - -Coarse-grained is the default mode. You can also set `spark.mesos.coarse` property to true -to turn it on explicitly in [SparkConf](configuration.html#spark-properties): - -{% highlight scala %} -conf.set("spark.mesos.coarse", "true") -{% endhighlight %} - -In addition, for coarse-grained mode, you can control the maximum number of resources Spark will -acquire. 
By default, it will acquire *all* cores in the cluster (that get offered by Mesos), which -only makes sense if you run just one application at a time. You can cap the maximum number of cores -using `conf.set("spark.cores.max", "10")` (for example). - -In "fine-grained" mode, each Spark task runs as a separate Mesos task. This allows -multiple instances of Spark (and other frameworks) to share machines at a very fine granularity, -where each application gets more or fewer machines as it ramps up and down, but it comes with an -additional overhead in launching each task. This mode may be inappropriate for low-latency -requirements like interactive queries or serving web requests. +Spark can run over Mesos in two modes: "coarse-grained" (default) and +"fine-grained". + +## Coarse-Grained + +In "coarse-grained" mode, each Spark executor runs as a single Mesos +task. Spark executors are sized according to the following +configuration variables: + +* Executor memory: `spark.executor.memory` +* Executor cores: `spark.executor.cores` +* Number of executors: `spark.cores.max`/`spark.executor.cores` + +Please see the [Spark Configuration](configuration.html) page for +details and default values. + +Executors are brought up eagerly when the application starts, until +`spark.cores.max` is reached. If you don't set `spark.cores.max`, the +Spark application will reserve all resources offered to it by Mesos, +so we of course urge you to set this variable in any sort of +multi-tenant cluster, including one which runs multiple concurrent +Spark applications. + +The scheduler will start executors round-robin on the offers Mesos +gives it, but there are no spread guarantees, as Mesos does not +provide such guarantees on the offer stream. + +The benefit of coarse-grained mode is much lower startup overhead, but +at the cost of reserving Mesos resources for the complete duration of +the application. 
To configure your job to dynamically adjust to its +resource requirements, look into +[Dynamic Allocation](#dynamic-resource-allocation-with-mesos). + +## Fine-Grained + +In "fine-grained" mode, each Spark task inside the Spark executor runs +as a separate Mesos task. This allows multiple instances of Spark (and +other frameworks) to share cores at a very fine granularity, where +each application gets more or fewer cores as it ramps up and down, but +it comes with an additional overhead in launching each task. This mode +may be inappropriate for low-latency requirements like interactive +queries or serving web requests. + +Note that while Spark tasks in fine-grained will relinquish cores as +they terminate, they will not relinquish
spark git commit: [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging
Repository: spark Updated Branches: refs/heads/master 040f6f9f4 -> a8f89df3b [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging ## What changes were proposed in this pull request? The commit https://github.com/apache/spark/commit/044971eca0ff3c2ce62afa665dbd3072d52cbbec introduced a lazy val to simplify code in Logging. Simple enough, though one side effect is that accessing `log` now means grabbing the instance's lock. This in turn exposed a form of deadlock in the Mesos code. That is arguably a problem with how the Mesos code is structured, but in any event the safest thing to do seems to be to revert the commit, which is 90% of the change here; it's just not worth the risk of similar, more subtle issues. What I didn't revert here was the removal of an odd override of `log` in the Mesos code. In retrospect it may have been put in place at some stage as a defense against this type of problem; after all, the Logging code still involved a lock at initialization before the change in question. Even after the revert, the override doesn't seem to do anything given how Logging works now, so I left it removed. However, perhaps out of paranoia, I also removed the particular log message that played a part in this problem, to make sure this type of problem can't happen even with the locking that logging initialization currently does. ## How was this patch tested? Jenkins tests. Author: Sean Owen. Closes #14069 from srowen/SPARK-16379.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8f89df3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8f89df3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8f89df3 Branch: refs/heads/master Commit: a8f89df3b391e7a3fa9f73d9ec730d6eaa95bb09 Parents: 040f6f9 Author: Sean Owen Authored: Wed Jul 6 13:36:07 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 13:36:07 2016 -0700 -- .../scala/org/apache/spark/internal/Logging.scala | 14 ++ .../mesos/MesosCoarseGrainedSchedulerBackend.scala| 1 - 2 files changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a8f89df3/core/src/main/scala/org/apache/spark/internal/Logging.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala index c51050c..66a0cfe 100644 --- a/core/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala @@ -32,10 +32,7 @@ private[spark] trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine - @transient lazy val log: Logger = { -initializeLogIfNecessary(false) -LoggerFactory.getLogger(logName) - } + @transient private var log_ : Logger = null // Method to get the logger name for this object protected def logName = { @@ -43,6 +40,15 @@ private[spark] trait Logging { this.getClass.getName.stripSuffix("$") } + // Method to get or create the logger for this object + protected def log: Logger = { +if (log_ == null) { + initializeLogIfNecessary(false) + log_ = LoggerFactory.getLogger(logName) +} +log_ + } + // Log methods that take only a String protected def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) 
http://git-wip-us.apache.org/repos/asf/spark/blob/a8f89df3/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index e88e4ad..99e6d39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -244,7 +244,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { appId = frameworkId.getValue mesosExternalShuffleClient.foreach(_.init(appId)) -logInfo("Registered as framework ID " + appId) markRegistered() }
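The fix above swaps the `lazy val` for a nullable field behind a `log` accessor. In Scala 2, the first access to a class's `lazy val` synchronizes on the enclosing instance, which is what exposed the deadlock in the Mesos code; the null-check accessor takes no lock at all. A minimal sketch of the same get-or-create pattern follows, assuming `java.util.logging` as a stand-in for SLF4J and using the hypothetical names `SimpleLogging` and `Worker`.

```scala
import java.util.logging.{Level, Logger}

trait SimpleLogging {
  // Transient so that serialized objects rebuild their logger on first use.
  @transient private var log_ : Logger = null

  // Logger name from the runtime class, dropping the `$` suffix of Scala objects.
  protected def logName: String = this.getClass.getName.stripSuffix("$")

  // Get-or-create without synchronization: no instance lock is taken, so logging
  // cannot participate in a lock-ordering deadlock with the caller's own locks.
  protected def log: Logger = {
    if (log_ == null) {
      log_ = Logger.getLogger(logName)
    }
    log_
  }

  protected def logInfo(msg: => String): Unit = {
    if (log.isLoggable(Level.INFO)) log.info(msg)
  }
}

// Hypothetical user of the trait, exposing the logger for inspection.
class Worker extends SimpleLogging {
  def loggerName: String = log.getName
  def sameLoggerTwice: Boolean = log eq log
}
```

The trade-off is that two threads racing on the very first access may both call `getLogger`; since the field is only a cache of a logger looked up by name, that race is benign here, which is a much cheaper failure mode than a deadlock.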
spark git commit: [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging
Repository: spark Updated Branches: refs/heads/branch-2.0 d7926da5e -> 88be66b93 [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging ## What changes were proposed in this pull request? The commit https://github.com/apache/spark/commit/044971eca0ff3c2ce62afa665dbd3072d52cbbec introduced a lazy val to simplify code in Logging. Simple enough, though one side effect is that accessing `log` now means grabbing the instance's lock. This in turn exposed a form of deadlock in the Mesos code. That is arguably a problem with how the Mesos code is structured, but in any event the safest thing to do seems to be to revert the commit, which is 90% of the change here; it's just not worth the risk of similar, more subtle issues. What I didn't revert here was the removal of an odd override of `log` in the Mesos code. In retrospect it may have been put in place at some stage as a defense against this type of problem; after all, the Logging code still involved a lock at initialization before the change in question. Even after the revert, the override doesn't seem to do anything given how Logging works now, so I left it removed. However, perhaps out of paranoia, I also removed the particular log message that played a part in this problem, to make sure this type of problem can't happen even with the locking that logging initialization currently does. ## How was this patch tested? Jenkins tests. Author: Sean Owen. Closes #14069 from srowen/SPARK-16379.
(cherry picked from commit a8f89df3b391e7a3fa9f73d9ec730d6eaa95bb09) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88be66b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88be66b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88be66b9 Branch: refs/heads/branch-2.0 Commit: 88be66b933a7b1f0f71b1eb6c88bf01ecbf1923c Parents: d7926da Author: Sean Owen Authored: Wed Jul 6 13:36:07 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 13:36:15 2016 -0700 -- .../scala/org/apache/spark/internal/Logging.scala | 14 ++ .../mesos/MesosCoarseGrainedSchedulerBackend.scala| 1 - 2 files changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/88be66b9/core/src/main/scala/org/apache/spark/internal/Logging.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala index c51050c..66a0cfe 100644 --- a/core/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala @@ -32,10 +32,7 @@ private[spark] trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine - @transient lazy val log: Logger = { -initializeLogIfNecessary(false) -LoggerFactory.getLogger(logName) - } + @transient private var log_ : Logger = null // Method to get the logger name for this object protected def logName = { @@ -43,6 +40,15 @@ private[spark] trait Logging { this.getClass.getName.stripSuffix("$") } + // Method to get or create the logger for this object + protected def log: Logger = { +if (log_ == null) { + initializeLogIfNecessary(false) + log_ = LoggerFactory.getLogger(logName) +} +log_ + } + // Log methods that take only a String protected def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) 
http://git-wip-us.apache.org/repos/asf/spark/blob/88be66b9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index e88e4ad..99e6d39 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -244,7 +244,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { appId = frameworkId.getValue mesosExternalShuffleClient.foreach(_.init(appId)) -logInfo("Registered as framework ID " + appId) markRegistered() }
spark git commit: [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds
Repository: spark Updated Branches: refs/heads/branch-2.0 2465f0728 -> d7926da5e [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds ## What changes were proposed in this pull request? The test "big model load / save" in Word2VecSuite recently started failing with an OOM. We therefore made the partitioning adaptive: partitions are now sized from the "spark.kryoserializer.buffer.max" conf instead of a fixed 32MB, and the test uses a small buffer size to trigger partitioning without allocating much memory. ## How was this patch tested? It was tested by running the following unit test: org.apache.spark.mllib.feature.Word2VecSuite. Author: tmnd1991. Closes #13509 from tmnd1991/SPARK-15740. (cherry picked from commit 040f6f9f468f153e4c4db78c26ced0299245fb6f) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7926da5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7926da5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7926da5 Branch: refs/heads/branch-2.0 Commit: d7926da5e72ee2015e3ebe39a5fd0b322e9d1334 Parents: 2465f07 Author: tmnd1991 Authored: Wed Jul 6 12:56:26 2016 -0700 Committer: Joseph K.
Bradley Committed: Wed Jul 6 12:56:51 2016 -0700 -- .../apache/spark/mllib/feature/Word2Vec.scala | 16 +++-- .../spark/mllib/feature/Word2VecSuite.scala | 25 +--- 2 files changed, 31 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 2f52825..f2211df 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] { ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) - // We want to partition the model in partitions of size 32MB - val partitionSize = (1L << 25) + // We want to partition the model in partitions smaller than + // spark.kryoserializer.buffer.max + val bufferSize = Utils.byteStringAsBytes( +spark.conf.get("spark.kryoserializer.buffer.max", "64m")) // We calculate the approximate size of the model - // We only calculate the array size, not considering - // the string size, the formula is: - // floatSize * numWords * vectorSize - val approxSize = 4L * numWords * vectorSize - val nPartitions = ((approxSize / partitionSize) + 1).toInt + // We only calculate the array size, considering an + // average string size of 15 bytes, the formula is: + // (floatSize * vectorSize + 15) * numWords + val approxSize = (4L * vectorSize + 15) * numWords + val nPartitions = ((approxSize / bufferSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path)) } 
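The new partition count in `Word2VecModel.save` is plain arithmetic on the model dimensions. The sketch below restates it as standalone functions that take the buffer size already converted to bytes (the patch reads it from `spark.kryoserializer.buffer.max`, default `64m`). The 15-byte average word length is the patch's own estimate; `Word2VecPartitioning` is a hypothetical name.

```scala
object Word2VecPartitioning {
  // Approximate serialized size: each word contributes vectorSize 4-byte floats
  // plus an assumed average of 15 bytes for the word string itself.
  def approxModelSize(numWords: Int, vectorSize: Int): Long =
    (4L * vectorSize + 15) * numWords

  // One partition per full buffer-sized chunk, plus one, mirroring
  // ((approxSize / bufferSize) + 1).toInt from the patch.
  def numPartitions(numWords: Int, vectorSize: Int, bufferSizeBytes: Long): Int = {
    require(bufferSizeBytes > 0, "buffer size must be positive")
    ((approxModelSize(numWords, vectorSize) / bufferSizeBytes) + 1).toInt
  }
}
```

With the suite's settings (11 words of dimension 10 and a 50-byte buffer) this gives 605 bytes and 13 partitions, so a tiny model still exercises the multi-partition path, while the default 64 MB buffer keeps the same model in a single partition. The `+ 1` adds a partition even when the size divides evenly, which only costs one small extra file.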
http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index c9fb976..22de4c4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } - ignore("big model load / save") { -// create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 -val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*) + test("big model load / save") { +// backupping old values +val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m") +val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k") + +// setting test values to trigger partitioning +spark.conf.set("spark.kryoserializer.buffer", "50b") +spark.conf.set("spark.kryoserializer.buffer.max", "50b") + +// create a model bigger than 50 Bytes +val word2VecMap = Map((0 to
spark git commit: [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds
Repository: spark Updated Branches: refs/heads/master 4f8ceed59 -> 040f6f9f4 [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds ## What changes were proposed in this pull request? The test "big model load / save" in Word2VecSuite recently started failing with an OOM. We therefore made the partitioning adaptive: partitions are now sized from the "spark.kryoserializer.buffer.max" conf instead of a fixed 32MB, and the test uses a small buffer size to trigger partitioning without allocating much memory. ## How was this patch tested? It was tested by running the following unit test: org.apache.spark.mllib.feature.Word2VecSuite. Author: tmnd1991. Closes #13509 from tmnd1991/SPARK-15740. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/040f6f9f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/040f6f9f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/040f6f9f Branch: refs/heads/master Commit: 040f6f9f468f153e4c4db78c26ced0299245fb6f Parents: 4f8ceed Author: tmnd1991 Authored: Wed Jul 6 12:56:26 2016 -0700 Committer: Joseph K.
Bradley Committed: Wed Jul 6 12:56:26 2016 -0700 -- .../apache/spark/mllib/feature/Word2Vec.scala | 16 +++-- .../spark/mllib/feature/Word2VecSuite.scala | 25 +--- 2 files changed, 31 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/040f6f9f/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 2f52825..f2211df 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] { ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) - // We want to partition the model in partitions of size 32MB - val partitionSize = (1L << 25) + // We want to partition the model in partitions smaller than + // spark.kryoserializer.buffer.max + val bufferSize = Utils.byteStringAsBytes( +spark.conf.get("spark.kryoserializer.buffer.max", "64m")) // We calculate the approximate size of the model - // We only calculate the array size, not considering - // the string size, the formula is: - // floatSize * numWords * vectorSize - val approxSize = 4L * numWords * vectorSize - val nPartitions = ((approxSize / partitionSize) + 1).toInt + // We only calculate the array size, considering an + // average string size of 15 bytes, the formula is: + // (floatSize * vectorSize + 15) * numWords + val approxSize = (4L * vectorSize + 15) * numWords + val nPartitions = ((approxSize / bufferSize) + 1).toInt val dataArray = model.toSeq.map { case (w, v) => Data(w, v) } spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path)) } 
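The patch turns the conf value (for example `"64m"`, or `"50b"` in the test) into bytes with `Utils.byteStringAsBytes`. The following is a simplified, hypothetical re-implementation for illustration only: it assumes 1024-based units, accepts the suffixes `b`, `k`/`kb`, `m`/`mb`, `g`/`gb` and `t`/`tb`, treats a bare number as bytes, and does not reproduce the exact suffix set or error messages of Spark's real parser.

```scala
object ByteStrings {
  // Assumed binary multipliers for each accepted suffix.
  private val multipliers: Map[String, Long] = Map(
    "b" -> 1L,
    "k" -> (1L << 10), "kb" -> (1L << 10),
    "m" -> (1L << 20), "mb" -> (1L << 20),
    "g" -> (1L << 30), "gb" -> (1L << 30),
    "t" -> (1L << 40), "tb" -> (1L << 40))

  // Digits, optional whitespace, then an optional alphabetic unit (full-string match).
  private val SizePattern = """([0-9]+)\s*([a-z]*)""".r

  // Parses e.g. "64m" into 67108864; a bare number is treated as bytes.
  def toBytes(str: String): Long = str.trim.toLowerCase match {
    case SizePattern(num, "") => num.toLong
    case SizePattern(num, unit) =>
      multipliers.get(unit) match {
        case Some(mult) => num.toLong * mult
        case None => throw new NumberFormatException(s"Unknown size unit in '$str'")
      }
    case _ => throw new NumberFormatException(s"Invalid size string '$str'")
  }
}
```

Keeping the conversion in one place means the partition arithmetic only ever sees a byte count, whichever unit the user wrote in the configuration.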
http://git-wip-us.apache.org/repos/asf/spark/blob/040f6f9f/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index c9fb976..22de4c4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { } - ignore("big model load / save") { -// create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25 -val word2VecMap = Map((0 to 9000).map(i => s"$i" -> Array.fill(1000)(0.1f)): _*) + test("big model load / save") { +// backupping old values +val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", "64m") +val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", "64k") + +// setting test values to trigger partitioning +spark.conf.set("spark.kryoserializer.buffer", "50b") +spark.conf.set("spark.kryoserializer.buffer.max", "50b") + +// create a model bigger than 50 Bytes +val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): _*) val model = new Word2VecModel(word2VecMap) +// est. size of this model, given
spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet
Repository: spark Updated Branches: refs/heads/branch-2.0 03f336d89 -> 2465f0728 [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet ## What changes were proposed in this pull request? Currently, given a schema like the one below: ``` root |-- _1: struct (nullable = true) | |-- _1: integer (nullable = true) ``` executing the code below: ```scala df.filter("_1 IS NOT NULL").count() ``` pushes down a filter even though the filter applies to a `StructType` (as far as I understand, Spark does not push down filters for struct columns). The reason is that `ParquetFilters.getFieldMap` produces the results below: ``` (_1,StructType(StructField(_1,IntegerType,true))) (_1,IntegerType) ``` which then become a `Map` containing only ``` (_1,IntegerType) ``` Because of `lift(dataTypeOf(name)).map(_(name, value))`, a filter is then pushed down for `_1` as if it were `IntegerType`, although it is actually a `StructType`. As a result, Parquet filter2 produces incorrect results; for example, the code below: ``` df.filter("_1 IS NOT NULL").count() ``` always returns 0. This PR prevents this by no longer collecting nested fields into the field map. ## How was this patch tested? Unit test in `ParquetFilterSuite`. Author: hyukjinkwon. Closes #14067 from HyukjinKwon/SPARK-16371.
(cherry picked from commit 4f8ceed59367319300e4bfa5b957c387be81ffa3) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2465f072 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2465f072 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2465f072 Branch: refs/heads/branch-2.0 Commit: 2465f0728e95109ab851ab09b5badd697928ba2b Parents: 03f336d Author: hyukjinkwon Authored: Wed Jul 6 12:42:16 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 12:42:37 2016 -0700 -- .../datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/parquet/ParquetFilters.scala | 5 - .../datasources/parquet/ParquetFilterSuite.scala | 14 ++ 3 files changed, 19 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f38bf81..8cbdaeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -436,7 +436,7 @@ private[sql] class ParquetOutputWriterFactory( ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) // We want to clear this temporary metadata from saving into Parquet file. -// This metadata is only useful for detecting optional columns when pushdowning filters. +// This metadata is only useful for detecting optional columns when pushing down filters. 
val dataSchemaToWrite = StructType.removeMetadata( StructType.metadataKeyForOptionalField, dataSchema).asInstanceOf[StructType] http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 95afdc7..70ae829 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -215,10 +215,13 @@ private[sql] object ParquetFilters { */ private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match { case StructType(fields) => + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. fields.filter { f => !f.metadata.contains(StructType.metadataKeyForOptionalField) || !f.metadata.getBoolean(StructType.metadataKeyForOptionalField) - }.map(f => f.name -> f.dataType) ++
spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet
Repository: spark Updated Branches: refs/heads/master 480357cc6 -> 4f8ceed59 [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet ## What changes were proposed in this pull request? Currently, given the schema below: ``` root |-- _1: struct (nullable = true) | |-- _1: integer (nullable = true) ``` and the code below: ```scala df.filter("_1 IS NOT NULL").count() ``` a filter is pushed down even though it is applied to a `StructType` column (if my understanding is correct, Spark does not push down filters for those). The reason is that `ParquetFilters.getFieldMap` produces the results below: ``` (_1,StructType(StructField(_1,IntegerType,true))) (_1,IntegerType) ``` which then become the following `Map`: ``` (_1,IntegerType) ``` Because of `lift(dataTypeOf(name)).map(_(name, value))`, a filter is then pushed down for `_1`, which Parquet thinks is an `IntegerType` although it is actually a `StructType`. As a result, Parquet filter2 produces incorrect results; for example, the code below: ``` df.filter("_1 IS NOT NULL").count() ``` always returns 0. This PR prevents this by not looking up nested fields. ## How was this patch tested? Unit test in `ParquetFilterSuite`. Author: hyukjinkwon Closes #14067 from HyukjinKwon/SPARK-16371. 
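The name collision described above can be reproduced without Spark. The sketch below models the schema from the description with plain Python tuples rather than Spark's `StructType`; `get_field_map_flattened` and `get_field_map_root_only` are hypothetical stand-ins for the old and new `getFieldMap`, and the optional-field metadata filtering is omitted. Like Scala's `toMap`, Python's `dict()` keeps the last value for a repeated key, so the nested `_1: integer` shadows the root `_1: struct`.

```python
# Hypothetical schema model: a struct is ("struct", [(name, type), ...]);
# a primitive type is a plain string such as "int".


def get_field_map_flattened(data_type):
    """Old behaviour: root fields first, then the fields of every nested struct."""
    if isinstance(data_type, tuple) and data_type[0] == "struct":
        fields = data_type[1]
        root = [(name, t) for name, t in fields]
        nested = [pair for _, t in fields for pair in get_field_map_flattened(t)]
        return root + nested
    return []


def get_field_map_root_only(data_type):
    """Fixed behaviour: only the root fields are considered."""
    if isinstance(data_type, tuple) and data_type[0] == "struct":
        return [(name, t) for name, t in data_type[1]]
    return []


# root
#  |-- _1: struct
#  |    |-- _1: integer
schema = ("struct", [("_1", ("struct", [("_1", "int")]))])

old_map = dict(get_field_map_flattened(schema))  # the later nested "_1" wins
new_map = dict(get_field_map_root_only(schema))

print(old_map)  # {'_1': 'int'}: the struct column now looks like an integer
print(new_map)  # {'_1': ('struct', [('_1', 'int')])}
```

With the root-only lookup, `_1` keeps its struct type, so no integer filter can be built for it.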
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8ceed5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8ceed5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8ceed5 Branch: refs/heads/master Commit: 4f8ceed59367319300e4bfa5b957c387be81ffa3 Parents: 480357c Author: hyukjinkwon Authored: Wed Jul 6 12:42:16 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 12:42:16 2016 -0700 -- .../datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/parquet/ParquetFilters.scala | 5 - .../datasources/parquet/ParquetFilterSuite.scala | 14 ++ 3 files changed, 19 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 9833620..76d7f5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory( ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport]) // We want to clear this temporary metadata from saving into Parquet file. -// This metadata is only useful for detecting optional columns when pushdowning filters. +// This metadata is only useful for detecting optional columns when pushing down filters. 
val dataSchemaToWrite = StructType.removeMetadata( StructType.metadataKeyForOptionalField, dataSchema).asInstanceOf[StructType] http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 7213a38..e0a113a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -185,10 +185,13 @@ private[sql] object ParquetFilters { */ private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match { case StructType(fields) => + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. fields.filter { f => !f.metadata.contains(StructType.metadataKeyForOptionalField) || !f.metadata.getBoolean(StructType.metadataKeyForOptionalField) - }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) } + }.map(f => f.name -> f.dataType) case _ => Array.empty[(String, DataType)]
spark git commit: [SPARK-16304] LinkageError should not crash Spark executor
Repository: spark Updated Branches: refs/heads/master 4e14199ff -> 480357cc6 [SPARK-16304] LinkageError should not crash Spark executor ## What changes were proposed in this pull request? This patch updates the failure handling logic so that the Spark executor does not crash when it encounters a LinkageError. ## How was this patch tested? Added an end-to-end test in FailureSuite. Author: petermaxlee Closes #13982 from petermaxlee/SPARK-16304. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/480357cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/480357cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/480357cc Branch: refs/heads/master Commit: 480357cc6d71c682fe703611c71c1e6a36e6ce9a Parents: 4e14199 Author: petermaxlee Authored: Wed Jul 6 10:46:22 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:46:22 2016 -0700 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 6 +- core/src/test/scala/org/apache/spark/FailureSuite.scala | 9 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/480357cc/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 156cf17..298e624 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1881,7 +1881,11 @@ private[spark] object Utils extends Logging { /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. 
*/ def isFatalError(e: Throwable): Boolean = { e match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => + case NonFatal(_) | + _: InterruptedException | + _: NotImplementedError | + _: ControlThrowable | + _: LinkageError => false case _ => true http://git-wip-us.apache.org/repos/asf/spark/blob/480357cc/core/src/test/scala/org/apache/spark/FailureSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 132f636..d805c67 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -253,6 +253,15 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext { rdd.count() } + test("SPARK-16304: Link error should not crash executor") { +sc = new SparkContext("local[1,2]", "test") +intercept[SparkException] { + sc.parallelize(1 to 2).foreach { i => +throw new LinkageError() + } +} + } + // TODO: Need to add tests with shuffle fetch failures. }
spark git commit: [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation
Repository: spark Updated Branches: refs/heads/master b1310425b -> 4e14199ff [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation ## What changes were proposed in this pull request? This PR fixes wrongly formatted examples in the PySpark documentation, as shown below: - **`SparkSession`** - **Before** ![2016-07-06 11 34 41](https://cloud.githubusercontent.com/assets/6477701/16605847/ae939526-436d-11e6-8ab8-6ad578362425.png) - **After** ![2016-07-06 11 33 56](https://cloud.githubusercontent.com/assets/6477701/16605845/ace9ee78-436d-11e6-8923-b76d4fc3e7c3.png) - **`Builder`** - **Before** ![2016-07-06 11 34 44](https://cloud.githubusercontent.com/assets/6477701/16605844/aba60dbc-436d-11e6-990a-c87bc0281c6b.png) - **After** ![2016-07-06 1 26 37](https://cloud.githubusercontent.com/assets/6477701/16607562/586704c0-437d-11e6-9483-e0af93d8f74e.png) This PR also fixes several similar instances across the documentation of the PySpark `sql` module. ## How was this patch tested? N/A Author: hyukjinkwon Closes #14063 from HyukjinKwon/minor-pyspark-builder. 
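The fixes hinge on how Python string literals and doctest treat continuation lines. In a regular (non-raw) docstring, a trailing `\` escapes the newline, so the old `dropDuplicates` example was glued into one long line of text; the new examples write `\\` and put a `...` prompt on every continuation line. The hypothetical module below (not PySpark code) shows the gluing effect and writes a docstring in the corrected style, then checks it with the standard-library `doctest` classes.

```python
import doctest
import math

# In a regular (non-raw) string literal, a trailing backslash escapes the
# newline, so these two source lines end up as a single line of text.
glued = """>>> values = [1, \
2]"""


def gen_circle(r, n):
    """Return ``n`` points evenly spaced on a circle of radius ``r``.

    Every continuation line of a multi-line statement carries a ``...``
    prompt, as in the corrected PySpark examples:

    >>> def norm(p):
    ...     return math.hypot(p[0], p[1])
    >>> points = gen_circle(2.0, 4)
    >>> [round(norm(p), 6) for p in points]
    [2.0, 2.0, 2.0, 2.0]
    """
    return [(r * math.cos(2.0 * math.pi * i / n), r * math.sin(2.0 * math.pi * i / n))
            for i in range(n)]


def run_examples():
    """Run the doctest examples in gen_circle's docstring and return the results."""
    finder = doctest.DocTestFinder()
    runner = doctest.DocTestRunner(verbose=False)
    globs = {"gen_circle": gen_circle, "math": math}
    for test in finder.find(gen_circle, "gen_circle", globs=globs):
        runner.run(test)
    return runner.summarize(verbose=False)


print(repr(glued))  # '>>> values = [1, 2]'
print(run_examples())
```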
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e14199f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e14199f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e14199f Branch: refs/heads/master Commit: 4e14199ff740ea186eb2cec2e5cf901b58c5f90e Parents: b131042 Author: hyukjinkwon Authored: Wed Jul 6 10:45:51 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:45:51 2016 -0700 -- python/pyspark/mllib/clustering.py | 14 +++--- python/pyspark/sql/dataframe.py| 8 python/pyspark/sql/functions.py| 8 python/pyspark/sql/group.py| 2 ++ python/pyspark/sql/session.py | 13 +++-- python/pyspark/sql/types.py| 4 ++-- 6 files changed, 26 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/mllib/clustering.py -- diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 93a0b64..c38c543 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -571,14 +571,14 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): >>> import math >>> def genCircle(r, n): -... points = [] -... for i in range(0, n): -... theta = 2.0 * math.pi * i / n -... points.append((r * math.cos(theta), r * math.sin(theta))) -... return points +... points = [] +... for i in range(0, n): +... theta = 2.0 * math.pi * i / n +... points.append((r * math.cos(theta), r * math.sin(theta))) +... return points >>> def sim(x, y): -... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) -... return math.exp(-dist2 / 2.0) +... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) +... 
return math.exp(-dist2 / 2.0) >>> r1 = 1.0 >>> n1 = 10 >>> r2 = 4.0 http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e44b01b..a0ac7a9 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1045,10 +1045,10 @@ class DataFrame(object): :func:`drop_duplicates` is an alias for :func:`dropDuplicates`. >>> from pyspark.sql import Row ->>> df = sc.parallelize([ \ -Row(name='Alice', age=5, height=80), \ -Row(name='Alice', age=5, height=80), \ -Row(name='Alice', age=10, height=80)]).toDF() +>>> df = sc.parallelize([ \\ +... Row(name='Alice', age=5, height=80), \\ +... Row(name='Alice', age=5, height=80), \\ +... Row(name='Alice', age=10, height=80)]).toDF() >>> df.dropDuplicates().show() +---+--+-+ |age|height| name| http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7a73451..92d709e 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1550,8 +1550,8 @@ def translate(srcCol, matching, replace): The translate will happen when any character in the string matching with the character in the `matching`. ->>>
spark git commit: [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation
Repository: spark Updated Branches: refs/heads/branch-2.0 091cd5f26 -> 03f336d89 [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation ## What changes were proposed in this pull request? This PR fixes wrongly formatted examples in the PySpark documentation, as shown below: - **`SparkSession`** - **Before** ![2016-07-06 11 34 41](https://cloud.githubusercontent.com/assets/6477701/16605847/ae939526-436d-11e6-8ab8-6ad578362425.png) - **After** ![2016-07-06 11 33 56](https://cloud.githubusercontent.com/assets/6477701/16605845/ace9ee78-436d-11e6-8923-b76d4fc3e7c3.png) - **`Builder`** - **Before** ![2016-07-06 11 34 44](https://cloud.githubusercontent.com/assets/6477701/16605844/aba60dbc-436d-11e6-990a-c87bc0281c6b.png) - **After** ![2016-07-06 1 26 37](https://cloud.githubusercontent.com/assets/6477701/16607562/586704c0-437d-11e6-9483-e0af93d8f74e.png) This PR also fixes several similar instances across the documentation of the PySpark `sql` module. ## How was this patch tested? N/A Author: hyukjinkwon Closes #14063 from HyukjinKwon/minor-pyspark-builder. 
(cherry picked from commit 4e14199ff740ea186eb2cec2e5cf901b58c5f90e) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03f336d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03f336d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03f336d8 Branch: refs/heads/branch-2.0 Commit: 03f336d8921e1f22ee4d1f6fa8869163b1f29ea9 Parents: 091cd5f Author: hyukjinkwon Authored: Wed Jul 6 10:45:51 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:45:56 2016 -0700 -- python/pyspark/mllib/clustering.py | 14 +++--- python/pyspark/sql/dataframe.py| 8 python/pyspark/sql/functions.py| 8 python/pyspark/sql/group.py| 2 ++ python/pyspark/sql/session.py | 13 +++-- python/pyspark/sql/types.py| 4 ++-- 6 files changed, 26 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/mllib/clustering.py -- diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 93a0b64..c38c543 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -571,14 +571,14 @@ class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader): >>> import math >>> def genCircle(r, n): -... points = [] -... for i in range(0, n): -... theta = 2.0 * math.pi * i / n -... points.append((r * math.cos(theta), r * math.sin(theta))) -... return points +... points = [] +... for i in range(0, n): +... theta = 2.0 * math.pi * i / n +... points.append((r * math.cos(theta), r * math.sin(theta))) +... return points >>> def sim(x, y): -... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) -... return math.exp(-dist2 / 2.0) +... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1]) +... 
return math.exp(-dist2 / 2.0) >>> r1 = 1.0 >>> n1 = 10 >>> r2 = 4.0 http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e6e7029..c7d704a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1033,10 +1033,10 @@ class DataFrame(object): :func:`drop_duplicates` is an alias for :func:`dropDuplicates`. >>> from pyspark.sql import Row ->>> df = sc.parallelize([ \ -Row(name='Alice', age=5, height=80), \ -Row(name='Alice', age=5, height=80), \ -Row(name='Alice', age=10, height=80)]).toDF() +>>> df = sc.parallelize([ \\ +... Row(name='Alice', age=5, height=80), \\ +... Row(name='Alice', age=5, height=80), \\ +... Row(name='Alice', age=10, height=80)]).toDF() >>> df.dropDuplicates().show() +---+--+-+ |age|height| name| http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 15cefc8..1feca6e 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1550,8 +1550,8 @@ def translate(srcCol, matching, replace): The translate will
spark git commit: [DOC][SQL] update out-of-date code snippets using SQLContext in all documents.
Repository: spark Updated Branches: refs/heads/master 23eff5e51 -> b1310425b [DOC][SQL] update out-of-date code snippets using SQLContext in all documents. ## What changes were proposed in this pull request? I searched the whole docs directory for usages of SQLContext and updated the following places: - docs/configuration.md: SparkR code snippets. - docs/streaming-programming-guide.md: several code examples. ## How was this patch tested? N/A Author: WeichenXu Closes #14025 from WeichenXu123/WIP_SQLContext_update. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1310425 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1310425 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1310425 Branch: refs/heads/master Commit: b1310425b30cbd711e4834d65a0accb3c5a8403a Parents: 23eff5e Author: WeichenXu Authored: Wed Jul 6 10:41:48 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:41:48 2016 -0700 -- docs/configuration.md | 4 ++-- docs/streaming-programming-guide.md | 39 +--- 2 files changed, 23 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1310425/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index cee59cf..1e95b86 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1564,8 +1564,8 @@ spark.sql("SET -v").show(n=200, truncate=False) {% highlight r %} -# sqlContext is an existing sqlContext. 
-properties <- sql(sqlContext, "SET -v") +sparkR.session() +properties <- sql("SET -v") showDF(properties, numRows = 200, truncate = FALSE) {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/b1310425/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index db06a65..2ee3b80 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1534,7 +1534,7 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma *** ## DataFrame and SQL Operations -You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. +You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. @@ -1546,9 +1546,9 @@ val words: DStream[String] = ... 
words.foreachRDD { rdd => - // Get the singleton instance of SQLContext - val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) - import sqlContext.implicits._ + // Get the singleton instance of SparkSession + val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() + import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") @@ -1558,7 +1558,7 @@ words.foreachRDD { rdd => // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = -sqlContext.sql("select word, count(*) as total from words group by word") +spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } @@ -1593,8 +1593,8 @@ words.foreachRDD( @Override public Void call(JavaRDD rdd, Time time) { - // Get the singleton instance of SQLContext - SQLContext sqlContext = SQLContext.getOrCreate(rdd.context()); + // Get the singleton instance of SparkSession + SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate(); // Convert RDD[String] to RDD[case class] to DataFrame JavaRDD rowRDD =
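In the updated snippets, the `SparkSession.builder ... getOrCreate()` call is what supplies the lazily instantiated singleton the guide asks for: the first batch creates the session and later batches get the same one back. The pattern itself can be sketched without a cluster as below. `FakeSession`, `get_session_instance`, and `process_batch` are hypothetical names; in PySpark the factory would be the session builder rather than a plain class.

```python
class FakeSession:
    """Hypothetical stand-in for an expensive object such as a SparkSession."""
    created = 0

    def __init__(self, conf):
        FakeSession.created += 1
        self.conf = conf


_session_instance = None


def get_session_instance(conf):
    """Build the session on first use and reuse it afterwards (later confs are ignored)."""
    global _session_instance
    if _session_instance is None:
        _session_instance = FakeSession(conf)
    return _session_instance


def process_batch(words, conf):
    """Per-batch body, analogous to the function passed to foreachRDD."""
    session = get_session_instance(conf)
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
    return session, counts


s1, c1 = process_batch(["a", "b", "a"], {"app": "wordcount"})
s2, c2 = process_batch(["b"], {"app": "wordcount"})
print(s1 is s2, FakeSession.created, c1)  # True 1 {'a': 2, 'b': 1}
```

Creating the object inside the per-batch function, rather than capturing one built at startup, is what lets a restarted driver rebuild it on demand.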
spark git commit: [DOC][SQL] update out-of-date code snippets using SQLContext in all documents.
Repository: spark Updated Branches: refs/heads/branch-2.0 e956bd775 -> 091cd5f26 [DOC][SQL] update out-of-date code snippets using SQLContext in all documents. ## What changes were proposed in this pull request? I searched the whole docs directory for usages of SQLContext and updated the following places: - docs/configuration.md: SparkR code snippets. - docs/streaming-programming-guide.md: several code examples. ## How was this patch tested? N/A Author: WeichenXu Closes #14025 from WeichenXu123/WIP_SQLContext_update. (cherry picked from commit b1310425b30cbd711e4834d65a0accb3c5a8403a) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/091cd5f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/091cd5f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/091cd5f2 Branch: refs/heads/branch-2.0 Commit: 091cd5f265166512a450333946c62c3eb3440e79 Parents: e956bd7 Author: WeichenXu Authored: Wed Jul 6 10:41:48 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:41:54 2016 -0700 -- docs/configuration.md | 4 ++-- docs/streaming-programming-guide.md | 39 +--- 2 files changed, 23 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/091cd5f2/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index cee59cf..1e95b86 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1564,8 +1564,8 @@ spark.sql("SET -v").show(n=200, truncate=False) {% highlight r %} -# sqlContext is an existing sqlContext. 
-properties <- sql(sqlContext, "SET -v") +sparkR.session() +properties <- sql("SET -v") showDF(properties, numRows = 200, truncate = FALSE) {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/091cd5f2/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index db06a65..2ee3b80 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1534,7 +1534,7 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma *** ## DataFrame and SQL Operations -You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. +You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. @@ -1546,9 +1546,9 @@ val words: DStream[String] = ... 
words.foreachRDD { rdd => - // Get the singleton instance of SQLContext - val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) - import sqlContext.implicits._ + // Get the singleton instance of SparkSession + val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() + import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") @@ -1558,7 +1558,7 @@ words.foreachRDD { rdd => // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = -sqlContext.sql("select word, count(*) as total from words group by word") +spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } @@ -1593,8 +1593,8 @@ words.foreachRDD( @Override public Void call(JavaRDD rdd, Time time) { - // Get the singleton instance of SQLContext - SQLContext sqlContext = SQLContext.getOrCreate(rdd.context()); + // Get the singleton instance of SparkSession + SparkSession spark =
spark git commit: [SPARK-15979][SQL] Renames CatalystWriteSupport to ParquetWriteSupport
Repository: spark Updated Branches: refs/heads/master 478b71d02 -> 23eff5e51 [SPARK-15979][SQL] Renames CatalystWriteSupport to ParquetWriteSupport ## What changes were proposed in this pull request? PR #13696 renamed various Parquet support classes but left `CatalystWriteSupport` behind. This PR renames it as a follow-up. ## How was this patch tested? N/A. Author: Cheng Lian Closes #14070 from liancheng/spark-15979-follow-up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23eff5e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23eff5e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23eff5e5 Branch: refs/heads/master Commit: 23eff5e512df5710ea6591a3fce321b53eb3fb0b Parents: 478b71d Author: Cheng Lian Authored: Wed Jul 6 10:36:45 2016 -0700 Committer: Reynold Xin Committed: Wed Jul 6 10:36:45 2016 -0700 -- .../parquet/CatalystWriteSupport.scala | 437 --- .../datasources/parquet/ParquetFileFormat.scala | 14 +- .../parquet/ParquetWriteSupport.scala | 437 +++ 3 files changed, 444 insertions(+), 444 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23eff5e5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala deleted file mode 100644 index 00e1bca..000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import java.nio.{ByteBuffer, ByteOrder} -import java.util - -import scala.collection.JavaConverters.mapAsJavaMapConverter - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.io.api.{Binary, RecordConsumer} - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.SpecializedGetters -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types._ - -/** - * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet - * messages. This class can write Parquet data in two modes: - * - * - Standard mode: Parquet data are written in standard format defined in parquet-format spec. - * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior. - * - * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`. 
The value - * of this option is propagated to this class by the `init()` method and its Hadoop configuration - * argument. - */ -private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging { - // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer. - // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access - // data in `ArrayData` without the help of `SpecificMutableRow`. - private type ValueWriter = (SpecializedGetters, Int) => Unit - - // Schema of the `InternalRow`s to be written - private var schema: StructType = _ - - // `ValueWriter`s for all fields of the schema - private var rootFieldWriters: Seq[ValueWriter] = _ - -
spark git commit: [SPARK-15591][WEBUI] Paginate Stage Table in Stages tab
Repository: spark Updated Branches: refs/heads/master 21eadd1d8 -> 478b71d02 [SPARK-15591][WEBUI] Paginate Stage Table in Stages tab ## What changes were proposed in this pull request? This patch adds pagination support for the stage tables in the Stages tab. Pagination is provided for all four stage tables (active, pending, completed, and failed). The paged stage tables are also used in JobPage (the detail page for a single job) and PoolPage. Interactions with the paged tables (jumping to a page, sorting, and setting the page size) are also included. ## How was this patch tested? Tested manually by checking the Web UI after completing and failing hundreds of jobs, in the same way as the testing for [Paginate Job Table in Jobs tab](https://github.com/apache/spark/pull/13620). This shows the pagination for completed stages: ![paged stage table](https://cloud.githubusercontent.com/assets/5558370/16125696/5804e35e-3427-11e6-8923-5c5948982648.png) Author: Tao Lin Closes #13708 from nblintao/stageTable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/478b71d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/478b71d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/478b71d0 Branch: refs/heads/master Commit: 478b71d028107d42fbb6d1bd300b86efbe0dcf7d Parents: 21eadd1 Author: Tao Lin Authored: Wed Jul 6 10:28:05 2016 -0700 Committer: Shixiong Zhu Committed: Wed Jul 6 10:28:05 2016 -0700 -- .../scala/org/apache/spark/ui/PagedTable.scala | 1 + .../apache/spark/ui/jobs/AllStagesPage.scala| 25 +- .../org/apache/spark/ui/jobs/JobPage.scala | 24 +- .../org/apache/spark/ui/jobs/PoolPage.scala | 15 +- .../org/apache/spark/ui/jobs/StageTable.scala | 517 +++ 5 files changed, 441 insertions(+), 141 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/PagedTable.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala 
b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 9b6ed8c..2a7c16b 100644 --- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] { Splitter .on('&') .trimResults() +.omitEmptyStrings() .withKeyValueSeparator("=") .split(querystring) .asScala http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index e75f1c5..cba8f82 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { val numCompletedStages = listener.numCompletedStages val failedStages = listener.failedStages.reverse.toSeq val numFailedStages = listener.numFailedStages - val now = System.currentTimeMillis + val subPath = "stages" val activeStagesTable = -new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = parent.killEnabled) +new StageTableBase(request, activeStages, "activeStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) val pendingStagesTable = -new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse, - parent.basePath, parent.progressListener, isFairScheduler = parent.isFairScheduler, - killEnabled = false) +new StageTableBase(request, pendingStages, "pendingStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) val completedStagesTable = -new 
StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath, - parent.progressListener, isFairScheduler = parent.isFairScheduler, killEnabled = false) +new StageTableBase(request, completedStages, "completedStage", parent.basePath, subPath, + parent.progressListener, parent.isFairScheduler, + killEnabled = false,
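The `PagedTable` change above adds Guava's `omitEmptyStrings()` to the `Splitter` chain that parses the page's query string. The commit does not say why; presumably it stops an empty segment (for example from `&&` or a leading or trailing `&`) from being treated as a malformed key=value entry. The following plain-Scala sketch approximates that parsing without Guava. `QueryStringSketch` and `parse` are hypothetical names, and unlike Guava's map splitter this version splits each entry on its first `=` only and does not reject duplicate keys.

```scala
object QueryStringSketch {
  // Parse "k1=v1&k2=v2" into a Map. Segments are trimmed, and empty ones
  // (from "&&" or a leading/trailing "&") are skipped, which is the role
  // omitEmptyStrings() plays in the Guava Splitter chain.
  def parse(querystring: String): Map[String, String] =
    querystring
      .split('&')
      .iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { segment =>
        val idx = segment.indexOf('=')
        require(idx >= 0, s"Not a key=value entry: $segment")
        segment.substring(0, idx).trim -> segment.substring(idx + 1).trim
      }
      .toMap
}
```

For example, `parse("page=2&&pageSize=100&")` yields a two-entry map instead of failing on the empty segment.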
spark git commit: [MINOR][CORE][1.6-BACKPORT] Fix display wrong free memory size in the log
Repository: spark Updated Branches: refs/heads/branch-1.6 76781950f -> 2588776ad [MINOR][CORE][1.6-BACKPORT] Fix display wrong free memory size in the log ## What changes were proposed in this pull request? The free memory size displayed in the log was wrong (it showed the used memory instead); this fixes it. Backported to 1.6. ## How was this patch tested? N/A Author: jerryshao Closes #14043 from jerryshao/memory-log-fix-1.6-backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2588776a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2588776a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2588776a Branch: refs/heads/branch-1.6 Commit: 2588776ad3d91e39300d61c8a12dc0803c28e866 Parents: 7678195 Author: jerryshao Authored: Wed Jul 6 14:49:21 2016 +0100 Committer: Sean Owen Committed: Wed Jul 6 14:49:21 2016 +0100 -- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2588776a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 17aae6e..aed0da9 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -393,7 +393,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } val valuesOrBytes = if (deserialized) "values" else "bytes" logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed))) + blockId, valuesOrBytes, Utils.bytesToString(size), + Utils.bytesToString(maxMemory - blocksMemoryUsed))) } else { // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk 
if the block allows disk storage.
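The backport changes the logged "free" figure from `blocksMemoryUsed` to `maxMemory - blocksMemoryUsed`. A minimal sketch of that arithmetic, assuming a hypothetical `storedMessage` helper that prints raw byte counts instead of calling Spark's `Utils.bytesToString`:

```scala
object MemoryLogSketch {
  // Hypothetical helper: builds the log line with the corrected "free" value.
  // Free memory is the store's capacity minus what cached blocks already occupy.
  def storedMessage(blockId: String, size: Long, maxMemory: Long, blocksMemoryUsed: Long): String = {
    val free = maxMemory - blocksMemoryUsed
    s"Block $blockId stored as bytes in memory (estimated size $size B, free $free B)"
  }
}
```

With a 1000-byte store already holding 300 bytes of blocks, the message reports 700 bytes free, where the old code would have printed the 300 bytes in use.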
spark git commit: [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails
Repository: spark Updated Branches: refs/heads/branch-2.0 d5d2457e4 -> e956bd775 [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails ## What changes were proposed in this pull request? In `CREATE TABLE AS SELECT`, if the `SELECT` query fails, the table should not exist. For example, ```SQL CREATE TABLE tab STORED AS TEXTFILE SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b ``` The above query fails as expected, but an empty table `tab` is still created. This PR drops the created table when any non-fatal exception is hit. ## How was this patch tested? Added a test case to verify the behavior. Author: gatorsmile Closes #13926 from gatorsmile/dropTableAfterException. (cherry picked from commit 21eadd1d8cbf029197e73ffca1cba54d5a890c01) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e956bd77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e956bd77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e956bd77 Branch: refs/heads/branch-2.0 Commit: e956bd7750882ce259a278e9eac7f64b4fb76286 Parents: d5d2457 Author: gatorsmile Authored: Wed Jul 6 21:43:55 2016 +0800 Committer: Wenchen Fan Committed: Wed Jul 6 21:45:30 2016 +0800 -- .../execution/CreateHiveTableAsSelectCommand.scala | 13 +++-- .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 +++ 2 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e956bd77/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index b809938..15a5d79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import scala.util.control.NonFatal + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} @@ -87,8 +89,15 @@ case class CreateHiveTableAsSelectCommand( throw new AnalysisException(s"$tableIdentifier already exists.") } } else { - sparkSession.sessionState.executePlan(InsertIntoTable( -metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd + try { +sparkSession.sessionState.executePlan(InsertIntoTable( + metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd + } catch { +case NonFatal(e) => + // drop the created table. + sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true) + throw e + } } Seq.empty[Row] http://git-wip-us.apache.org/repos/asf/spark/blob/e956bd77/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 89f69c8..9d3c4cd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -554,6 +554,21 @@ class HiveDDLSuite } } + test("Create Cataloged Table As Select - Drop Table After Runtime Exception") { +withTable("tab") { + intercept[RuntimeException] { +sql( + """ +|CREATE TABLE tab +|STORED AS TEXTFILE +|SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b + """.stripMargin) + } + // After hitting runtime exception, we should drop the created table. 
+ assert(!spark.sessionState.catalog.tableExists(TableIdentifier("tab"))) +} + } + test("desc table for data source table") { withTable("tab1") { val tabName = "tab1"
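The fix follows a create, try, clean-up-on-failure pattern: the table is created first, and if populating it throws a non-fatal exception, the table is dropped with `ignoreIfNotExists = true` and the exception is rethrown. The sketch below reproduces that control flow against a hypothetical in-memory `Catalog`; it is not Spark's session catalog API, only a stand-in with similarly named methods.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object CtasSketch {
  // Hypothetical in-memory catalog standing in for the session catalog.
  final class Catalog {
    private val tables = mutable.Set.empty[String]

    def createTable(name: String): Unit = {
      if (tables.contains(name)) throw new IllegalStateException(s"$name already exists.")
      tables += name
    }

    def dropTable(name: String, ignoreIfNotExists: Boolean): Unit = {
      if (!tables.remove(name) && !ignoreIfNotExists)
        throw new NoSuchElementException(s"Table $name not found")
    }

    def tableExists(name: String): Boolean = tables.contains(name)
  }

  // Create the table, then run the insert. On a non-fatal failure, drop the
  // table so no empty table is left behind, and rethrow the original error.
  def createTableAsSelect(catalog: Catalog, name: String)(insert: => Unit): Unit = {
    catalog.createTable(name)
    try {
      insert
    } catch {
      case NonFatal(e) =>
        catalog.dropTable(name, ignoreIfNotExists = true)
        throw e
    }
  }
}
```

Catching only `NonFatal` mirrors the patch: fatal errors such as `OutOfMemoryError` propagate without any cleanup attempt.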
spark git commit: [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails
Repository: spark Updated Branches: refs/heads/master 909c6d812 -> 21eadd1d8 [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails ## What changes were proposed in this pull request? In `CREATE TABLE AS SELECT`, if the `SELECT` query fails, the table should not exist. For example, ```SQL CREATE TABLE tab STORED AS TEXTFILE SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b ``` The above query fails as expected, but an empty table `tab` is still created. This PR drops the created table when any non-fatal exception is hit. ## How was this patch tested? Added a test case to verify the behavior. Author: gatorsmile Closes #13926 from gatorsmile/dropTableAfterException. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21eadd1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21eadd1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21eadd1d Branch: refs/heads/master Commit: 21eadd1d8cbf029197e73ffca1cba54d5a890c01 Parents: 909c6d8 Author: gatorsmile Authored: Wed Jul 6 21:43:55 2016 +0800 Committer: Wenchen Fan Committed: Wed Jul 6 21:43:55 2016 +0800 -- .../execution/CreateHiveTableAsSelectCommand.scala | 13 +++-- .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 +++ 2 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21eadd1d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index b809938..15a5d79 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -17,6 +17,8 @@ package 
org.apache.spark.sql.hive.execution +import scala.util.control.NonFatal + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} @@ -87,8 +89,15 @@ case class CreateHiveTableAsSelectCommand( throw new AnalysisException(s"$tableIdentifier already exists.") } } else { - sparkSession.sessionState.executePlan(InsertIntoTable( -metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd + try { +sparkSession.sessionState.executePlan(InsertIntoTable( + metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd + } catch { +case NonFatal(e) => + // drop the created table. + sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true) + throw e + } } Seq.empty[Row] http://git-wip-us.apache.org/repos/asf/spark/blob/21eadd1d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 89f69c8..9d3c4cd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -554,6 +554,21 @@ class HiveDDLSuite } } + test("Create Cataloged Table As Select - Drop Table After Runtime Exception") { +withTable("tab") { + intercept[RuntimeException] { +sql( + """ +|CREATE TABLE tab +|STORED AS TEXTFILE +|SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b + """.stripMargin) + } + // After hitting runtime exception, we should drop the created table. 
+ assert(!spark.sessionState.catalog.tableExists(TableIdentifier("tab"))) +} + } + test("desc table for data source table") { withTable("tab1") { val tabName = "tab1"
spark git commit: [SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached
Repository: spark Updated Branches: refs/heads/branch-2.0 25006c8bc -> d5d2457e4 [SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached This PR backports the fix (https://github.com/apache/spark/pull/13818) to branch 2.0. This PR addresses [SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968). ## What changes were proposed in this pull request? The `getCached` method of [HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala) computes `pathsInMetastore` from the metastore relation's catalog table. This only returns the table base path, which is incomplete/inaccurate for a nonempty partitioned table. As a result, cached lookups on nonempty partitioned tables always miss. Rather than getting `pathsInMetastore` from `metastoreRelation.catalogTable.storage.locationUri.toSeq`, I modified the `getCached` method to take a `pathsInMetastore` argument. Calls to this method pass in the paths computed from calls to the Hive metastore. This is how `getCached` was implemented in Spark 1.5: https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444. I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from the SQL session catalog. ## How was this patch tested? I've added a new unit test to `parquetSuites.scala`: SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached relation. Note that the only difference between this new test and the one above it in the file is that the new test populates its partitioned table with a single value, while the existing test leaves the table empty. This reveals a subtle, unexpected hole in test coverage present before this patch.
Note that I also modified a different but related unit test in `parquetSuites.scala`: SPARK-15248: explicitly added partitions should be readable. This unit test asserts that Spark SQL should return data placed in a table partition outside a metastore query immediately after the partition is added. I changed the test so that, instead of adding the data as a parquet file saved in the partition's location, the data is added through a SQL `INSERT` query. I made this change because I could find no way to efficiently support partitioned table caching without failing that test. In addition to my primary motivation, I can offer a few reasons why I believe this is an acceptable weakening of that test. First, it still validates a fix for [SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248), the issue for which it was written. Second, the assertion made is stronger than that required for non-partitioned tables. If you write data to the storage location of a non-partitioned metastore table without using a proper SQL DML query, a subsequent call to show that data will not return it. I believe this is an intentional limitation put in place to make table caching feasible, but I'm only speculating. Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. In our environment, where we have tables with tens of thousands of partitions, the difference between using a cached relation versus a new one is a matter of seconds versus minutes. Caching partitioned table metadata vastly improves the usability of Spark SQL for these cases. Author: Reynold Xin Author: Michael Allman Closes #14064 from yhuai/spark-15968-branch-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5d2457e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5d2457e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5d2457e Branch: refs/heads/branch-2.0 Commit: d5d2457e41661a3402a8258026856454222a2f54 Parents: 25006c8 Author: Reynold Xin Authored: Wed Jul 6 21:35:56 2016 +0800 Committer: Wenchen Fan Committed: Wed Jul 6 21:35:56 2016 +0800 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 16 - .../hive/execution/InsertIntoHiveTable.scala| 1 + .../apache/spark/sql/hive/parquetSuites.scala | 61 ++-- 3 files changed, 59 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d6c5325..789f94a 100644 ---
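The cache miss described above comes down to comparing two different sets of paths: the table base path versus the partition directories that the cached relation actually covers. A minimal sketch of the corrected comparison, assuming a hypothetical `CachedRelation` that records only its data paths; the real `getCached` checks further conditions that this sketch omits, and the example paths are made up.

```scala
object CachedLookupSketch {
  // Hypothetical stand-in for a cached relation: only the data paths it covers.
  final case class CachedRelation(paths: Set[String])

  // Reuse the cached relation only when it covers exactly the paths that the
  // metastore reports for the table (order does not matter).
  def getCached(cached: Option[CachedRelation], pathsInMetastore: Seq[String]): Option[CachedRelation] =
    cached.filter(_.paths == pathsInMetastore.toSet)
}
```

Passing only the base path (for example `/warehouse/t`) misses, as the old code did for a nonempty partitioned table, while passing the partition paths obtained from the metastore hits.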
spark git commit: [MINOR][BUILD] Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors
Repository: spark Updated Branches: refs/heads/branch-1.6 4fcb88843 -> 76781950f [MINOR][BUILD] Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors ## What changes were proposed in this pull request? Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors. ## How was this patch tested? Jenkins Author: Sean Owen Closes #14066 from srowen/Maven339Branch16. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76781950 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76781950 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76781950 Branch: refs/heads/branch-1.6 Commit: 76781950fd500ace0f939951fc7a94a58aca87c4 Parents: 4fcb888 Author: Sean Owen Authored: Wed Jul 6 12:27:17 2016 +0100 Committer: Sean Owen Committed: Wed Jul 6 12:27:17 2016 +0100 -- build/mvn | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76781950/build/mvn -- diff --git a/build/mvn b/build/mvn index c2b1427..8598386 100755 --- a/build/mvn +++ b/build/mvn @@ -69,7 +69,7 @@ install_app() { # Install maven under the build/ folder install_mvn() { - local MVN_VERSION="3.3.3" + local MVN_VERSION="3.3.9" install_app \ "https://www.apache.org/dyn/closer.lua?action=download=/maven/maven-3/${MVN_VERSION}/binaries; \
spark git commit: [SPARK-16307][ML] Add test to verify the predicted variances of a DT on toy data
Repository: spark Updated Branches: refs/heads/master 7e28fabdf -> 909c6d812 [SPARK-16307][ML] Add test to verify the predicted variances of a DT on toy data ## What changes were proposed in this pull request? The current tests assume that `impurity.calculate()` returns the variance correctly. It is better to make the tests independent of this assumption, in other words, to verify that the computed variance equals the variance computed manually on a small tree. ## How was this patch tested? The patch is a test. Author: MechCoder Closes #13981 from MechCoder/dt_variance. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/909c6d81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/909c6d81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/909c6d81 Branch: refs/heads/master Commit: 909c6d812f6ca3a3305e4611a700c8c17905b953 Parents: 7e28fab Author: MechCoder Authored: Wed Jul 6 02:54:44 2016 -0700 Committer: Yanbo Liang Committed: Wed Jul 6 02:54:44 2016 -0700 -- .../regression/DecisionTreeRegressorSuite.scala | 20 .../apache/spark/ml/tree/impl/TreeTests.scala | 12 2 files changed, 32 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/909c6d81/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala index 9afb742..15fa26e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.tree.impl.TreeTests import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import 
org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite} @@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite assert(variance === expectedVariance, s"Expected variance $expectedVariance but got $variance.") } + +val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc) +val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, Int], 0) +dt.setMaxDepth(1) + .setMaxBins(6) + .setSeed(0) +val transformVarDF = dt.fit(varianceDF).transform(varianceDF) +val calculatedVariances = transformVarDF.select(dt.getVarianceCol).collect().map { + case Row(variance: Double) => variance +} + +// Since max depth is set to 1, the best split point is that which splits the data +// into (0.0, 1.0, 2.0) and (10.0, 12.0, 14.0). The predicted variance for each +// data point in the left node is 0.667 and for each data point in the right node +// is 2.667 +val expectedVariances = Array(0.667, 0.667, 0.667, 2.667, 2.667, 2.667) +calculatedVariances.zip(expectedVariances).foreach { case (actual, expected) => + assert(actual ~== expected absTol 1e-3) +} } test("Feature importance with toy data") { http://git-wip-us.apache.org/repos/asf/spark/blob/909c6d81/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala index d2fa8d0..c90cb8c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala @@ -183,6 +183,18 @@ private[ml] object TreeTests extends SparkFunSuite { )) /** + * Create some toy data for testing correctness of variance. 
+ */ + def varianceData(sc: SparkContext): RDD[LabeledPoint] = sc.parallelize(Seq( +new LabeledPoint(1.0, Vectors.dense(Array(0.0))), +new LabeledPoint(2.0, Vectors.dense(Array(1.0))), +new LabeledPoint(3.0, Vectors.dense(Array(2.0))), +new LabeledPoint(10.0, Vectors.dense(Array(3.0))), +new LabeledPoint(12.0, Vectors.dense(Array(4.0))), +new LabeledPoint(14.0, Vectors.dense(Array(5.0))) + )) + + /** * Mapping from all Params to valid settings which differ from the defaults. * This is useful for tests which need to exercise all Params, such as save/load. * This excludes input columns to
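The expected values in the new test can be checked by hand. With features 0 to 5 and labels 1, 2, 3, 10, 12, 14, a depth-1 tree separates the first three points from the last three; the left leaf's labels have mean 2 and population variance (1 + 0 + 1) / 3 ≈ 0.667, and the right leaf's labels have mean 12 and variance (4 + 0 + 4) / 3 ≈ 2.667. The sketch below recomputes this by scanning candidate thresholds and keeping the split with the lowest size-weighted child variance. Using midpoints between adjacent feature values as thresholds is a simplifying assumption; Spark computes its own candidate splits (governed by `maxBins`).

```scala
object ToyTreeVariance {
  // (feature, label) pairs copied from TreeTests.varianceData in the diff above.
  val data: Seq[(Double, Double)] = Seq(
    (0.0, 1.0), (1.0, 2.0), (2.0, 3.0), (3.0, 10.0), (4.0, 12.0), (5.0, 14.0))

  // Population variance (divide by n) of the labels in one leaf.
  def variance(labels: Seq[Double]): Double = {
    val mean = labels.sum / labels.size
    labels.map(y => (y - mean) * (y - mean)).sum / labels.size
  }

  // Depth-1 regression tree: try a threshold between each pair of adjacent feature
  // values and keep the split with the lowest size-weighted child variance.
  // Returns (threshold, left labels, right labels).
  def bestSplit(points: Seq[(Double, Double)]): (Double, Seq[Double], Seq[Double]) = {
    val xs = points.map(_._1).distinct.sorted
    val candidates = xs.zip(xs.tail).map { case (a, b) => (a + b) / 2 }
    val scored = candidates.map { t =>
      val (l, r) = points.partition(_._1 <= t)
      val left = l.map(_._2)
      val right = r.map(_._2)
      val weighted = (left.size * variance(left) + right.size * variance(right)) / points.size
      (weighted, t, left, right)
    }
    val (_, t, left, right) = scored.minBy(_._1)
    (t, left, right)
  }
}
```

The winning split has a weighted variance of about 1.667; every other threshold on this data scores higher (the next best, between features 3 and 4, scores about 8.667).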
spark git commit: [SPARK-16388][SQL] Remove spark.sql.nativeView and spark.sql.nativeView.canonical config
Repository: spark Updated Branches: refs/heads/master 5497242c7 -> 7e28fabdf [SPARK-16388][SQL] Remove spark.sql.nativeView and spark.sql.nativeView.canonical config ## What changes were proposed in this pull request? These two configs should always be true after Spark 2.0. This patch removes them from the config list. Note that ideally this should've gone into branch-2.0, but due to the timing of the release we should only merge this into master for Spark 2.1. ## How was this patch tested? Updated test cases. Author: Reynold Xin Closes #14061 from rxin/SPARK-16388. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e28fabd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e28fabd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e28fabd Branch: refs/heads/master Commit: 7e28fabdff2da1cc374efbf43372d92ae0cd07aa Parents: 5497242 Author: Reynold Xin Authored: Wed Jul 6 17:40:55 2016 +0800 Committer: Cheng Lian Committed: Wed Jul 6 17:40:55 2016 +0800 -- .../spark/sql/execution/command/views.scala | 39 +--- .../org/apache/spark/sql/internal/SQLConf.scala | 23 --- .../spark/sql/internal/SQLConfSuite.scala | 16 +- .../spark/sql/hive/execution/SQLViewSuite.scala | 206 --- 4 files changed, 106 insertions(+), 178 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7e28fabd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 088f684..007fa46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -149,37 +149,18 @@ case class CreateViewCommand( * SQL based on the analyzed plan, and also creates the proper schema for the view.
*/ private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = { -val viewSQL: String = - if (sparkSession.sessionState.conf.canonicalView) { -val logicalPlan = - if (tableDesc.schema.isEmpty) { -analyzedPlan - } else { -val projectList = analyzedPlan.output.zip(tableDesc.schema).map { - case (attr, col) => Alias(attr, col.name)() -} -sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed - } -new SQLBuilder(logicalPlan).toSQL - } else { -// When user specified column names for view, we should create a project to do the renaming. -// When no column name specified, we still need to create a project to declare the columns -// we need, to make us more robust to top level `*`s. -val viewOutput = { - val columnNames = analyzedPlan.output.map(f => quote(f.name)) - if (tableDesc.schema.isEmpty) { -columnNames.mkString(", ") - } else { -columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map { - case (name, alias) => s"$name AS $alias" -}.mkString(", ") +val viewSQL: String = { + val logicalPlan = +if (tableDesc.schema.isEmpty) { + analyzedPlan +} else { + val projectList = analyzedPlan.output.zip(tableDesc.schema).map { +case (attr, col) => Alias(attr, col.name)() } + sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed } - -val viewText = tableDesc.viewText.get -val viewName = quote(tableDesc.identifier.table) -s"SELECT $viewOutput FROM ($viewText) $viewName" - } + new SQLBuilder(logicalPlan).toSQL +} // Validate the view SQL - make sure we can parse it and analyze it. // If we cannot analyze the generated query, there is probably a bug in SQL generation. 
http://git-wip-us.apache.org/repos/asf/spark/blob/7e28fabd/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1a9bb6a..5ab0c1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -258,25 +258,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val NATIVE_VIEW =
spark git commit: [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading
Repository: spark Updated Branches: refs/heads/branch-2.0 521fc7186 -> 25006c8bc [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-16249 Change the visibility of object `ml.clustering.LDA` to public so that users can invoke `LDA.load("path")`. ## How was this patch tested? Existing unit tests, plus manual testing of `load` (with a model saved using the current code). Author: Yuhao Yang Author: Yuhao Yang Closes #13941 from hhbyyh/ldapublic. (cherry picked from commit 5497242c769b40338bfa57d64f2c64996dfa57e8) Signed-off-by: Yanbo Liang Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25006c8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25006c8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25006c8b Branch: refs/heads/branch-2.0 Commit: 25006c8bcc397c9f070cc5d685ffeb5b8fb0a341 Parents: 521fc71 Author: Yuhao Yang Authored: Wed Jul 6 01:30:47 2016 -0700 Committer: Yanbo Liang Committed: Wed Jul 6 01:31:07 2016 -0700 -- .../main/scala/org/apache/spark/ml/clustering/LDA.scala | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/25006c8b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index b333d59..778cd0f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -880,11 +880,13 @@ class LDA @Since("1.6.0") ( } } - -private[clustering] object LDA extends DefaultParamsReadable[LDA] { +@Since("2.0.0") +object LDA extends DefaultParamsReadable[LDA] { /** Get dataset for spark.mllib LDA */ - def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, OldVector)] = { + 
private[clustering] def getOldDataset( + dataset: Dataset[_], + featuresCol: String): RDD[(Long, OldVector)] = { dataset .withColumn("docId", monotonicallyIncreasingId()) .select("docId", featuresCol) @@ -894,6 +896,6 @@ private[clustering] object LDA extends DefaultParamsReadable[LDA] { } } - @Since("1.6.0") + @Since("2.0.0") override def load(path: String): LDA = super.load(path) }
spark git commit: [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading
Repository: spark Updated Branches: refs/heads/master 5f342049c -> 5497242c7 [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading ## What changes were proposed in this pull request? jira: https://issues.apache.org/jira/browse/SPARK-16249 Change the visibility of object `ml.clustering.LDA` to public so that users can invoke `LDA.load("path")`. ## How was this patch tested? Existing unit tests, plus manual testing of `load` (with a model saved using the current code). Author: Yuhao Yang Author: Yuhao Yang Closes #13941 from hhbyyh/ldapublic. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5497242c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5497242c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5497242c Branch: refs/heads/master Commit: 5497242c769b40338bfa57d64f2c64996dfa57e8 Parents: 5f34204 Author: Yuhao Yang Authored: Wed Jul 6 01:30:47 2016 -0700 Committer: Yanbo Liang Committed: Wed Jul 6 01:30:47 2016 -0700 -- .../main/scala/org/apache/spark/ml/clustering/LDA.scala | 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5497242c/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index b333d59..778cd0f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -880,11 +880,13 @@ class LDA @Since("1.6.0") ( } } - -private[clustering] object LDA extends DefaultParamsReadable[LDA] { +@Since("2.0.0") +object LDA extends DefaultParamsReadable[LDA] { /** Get dataset for spark.mllib LDA */ - def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, OldVector)] = { + private[clustering] def getOldDataset( + dataset: Dataset[_], + featuresCol: String): RDD[(Long, 
OldVector)] = { dataset .withColumn("docId", monotonicallyIncreasingId()) .select("docId", featuresCol) @@ -894,6 +896,6 @@ private[clustering] object LDA extends DefaultParamsReadable[LDA] { } } - @Since("1.6.0") + @Since("2.0.0") override def load(path: String): LDA = super.load(path) }
spark git commit: [SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost
Repository: spark Updated Branches: refs/heads/branch-2.0 6e8fa86eb -> 521fc7186 [SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost ## What changes were proposed in this pull request? Currently, if the output stream is destroyed or closed because of some failure, the later `outstream.close()` call throws an IOException. As a result, the `stderrBuffer` is never logged and users have no way to see why the job failed. The change is to first display the stderr buffer and then try closing the outstream. ## How was this patch tested? The correct way to test this fix would be to grep the log to see whether the `stderrBuffer` gets logged, but I don't think having test cases that do that is a good idea. Author: Tejas Patil Closes #13834 from tejasapatil/script_transform. (cherry picked from commit 5f342049cce9102fb62b4de2d8d8fa691c2e8ac4) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/521fc718 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/521fc718 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/521fc718 Branch: refs/heads/branch-2.0 Commit: 521fc7186a2637321f7a7cfac713537de73ae66f Parents: 6e8fa86 Author: Tejas Patil Authored: Wed Jul 6 09:18:04 2016 +0100 Committer: Sean Owen Committed: Wed Jul 6 09:18:13 2016 +0100 -- .../spark/sql/hive/execution/ScriptTransformation.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/521fc718/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 9e25e1d..dfb1251 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -312,15 +312,15 @@ private class ScriptTransformationWriterThread( } threwException = false } catch { - case NonFatal(e) => + case t: Throwable => // An error occurred while writing input, so kill the child process. According to the // Javadoc this call will not throw an exception: -_exception = e +_exception = t proc.destroy() -throw e +throw t } finally { try { -outputStream.close() +Utils.tryLogNonFatalError(outputStream.close()) if (proc.waitFor() != 0) { logError(stderrBuffer.toString) // log the stderr circular buffer } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
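The failure mode described in this commit can be reproduced outside Spark with a minimal sketch, assuming an output stream whose `close()` always throws (standing in for a pipe the child script has already broken). `tryLogNonFatal` is a hypothetical stand-in for Spark's `Utils.tryLogNonFatalError`, the `log` buffer replaces a real logger, and the `proc.waitFor()` check from the real `finally` block is omitted; this is an illustration, not Spark's implementation.

```scala
import java.io.{IOException, OutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object CloseThenLogDemo {
  // Collected log lines; a stand-in for a real logger in this sketch.
  val log = ArrayBuffer.empty[String]

  // Hypothetical stand-in for Spark's Utils.tryLogNonFatalError:
  // run the block and log any non-fatal exception instead of propagating it.
  def tryLogNonFatal(block: => Unit): Unit =
    try block catch { case NonFatal(e) => log += s"ERROR Uncaught exception: ${e.getMessage}" }

  // A stream whose close() fails, simulating an already-broken pipe to the script.
  class BrokenStream extends OutputStream {
    override def write(b: Int): Unit = ()
    override def close(): Unit = throw new IOException("Stream closed")
  }

  // Old pattern: close() throws, so the stderr line is never recorded.
  def finishUnprotected(out: OutputStream, stderr: String): Unit = {
    out.close()
    log += s"ERROR stderr: $stderr"
  }

  // New pattern: the close failure is logged and swallowed, so stderr is still recorded.
  def finishProtected(out: OutputStream, stderr: String): Unit = {
    tryLogNonFatal(out.close())
    log += s"ERROR stderr: $stderr"
  }

  def main(args: Array[String]): Unit = {
    val stderr = "hypothetical child stderr"
    try finishUnprotected(new BrokenStream, stderr)
    catch { case e: IOException => log += s"propagated: ${e.getMessage}" }
    finishProtected(new BrokenStream, stderr)
    log.foreach(println)
  }
}
```

In the unprotected variant the `IOException` from `close()` escapes before the stderr line is recorded; in the protected variant the close failure is logged first and the stderr line still follows.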
spark git commit: [SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost
Repository: spark
Updated Branches:
  refs/heads/master ec79183ac -> 5f342049c

[SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost

## What changes were proposed in this pull request?

Currently, if the outstream gets destroyed or closed because of some earlier failure, the later call to `outstream.close()` throws an IOException. Because of this, the `stderrBuffer` is never logged and users have no way to see why the job failed. The change is to first display the stderr buffer and then try closing the outstream.

## How was this patch tested?

The correct way to test this fix would be to grep the log to see whether the `stderrBuffer` gets logged, but I don't think having test cases that do that is a good idea.

Author: Tejas Patil

Closes #13834 from tejasapatil/script_transform.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f342049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f342049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f342049

Branch: refs/heads/master
Commit: 5f342049cce9102fb62b4de2d8d8fa691c2e8ac4
Parents: ec79183
Author: Tejas Patil
Authored: Wed Jul 6 09:18:04 2016 +0100
Committer: Sean Owen
Committed: Wed Jul 6 09:18:04 2016 +0100

--
 .../spark/sql/hive/execution/ScriptTransformation.scala | 8
 1 file changed, 4 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5f342049/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 84990d3..d063dd6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -314,15 +314,15 @@ private class ScriptTransformationWriterThread(
       }
       threwException = false
     } catch {
-      case NonFatal(e) =>
+      case t: Throwable =>
         // An error occurred while writing input, so kill the child process. According to the
         // Javadoc this call will not throw an exception:
-        _exception = e
+        _exception = t
         proc.destroy()
-        throw e
+        throw t
     } finally {
       try {
-        outputStream.close()
+        Utils.tryLogNonFatalError(outputStream.close())
         if (proc.waitFor() != 0) {
           logError(stderrBuffer.toString) // log the stderr circular buffer
         }
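This diff also widens the `catch` from `case NonFatal(e)` to `case t: Throwable`. The commit message does not explain that part; one concrete consequence, sketched here under that reading, is that `scala.util.control.NonFatal` does not match `InterruptedException`, so under the old pattern an interrupted writer would skip the cleanup in the `catch` block. `destroyChild` is a hypothetical stand-in for `proc.destroy()`, and the `events` buffer only records what ran.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object CatchScopeDemo {
  // Records which steps actually ran.
  val events = ArrayBuffer.empty[String]

  // Hypothetical stand-in for proc.destroy().
  def destroyChild(): Unit = events += "child destroyed"

  // Old pattern: cleanup runs only for non-fatal throwables.
  def writeWithNonFatal(write: () => Unit): Unit =
    try write() catch {
      case NonFatal(e) =>
        destroyChild()
        throw e
    }

  // New pattern: cleanup runs for any throwable, which is then rethrown.
  def writeWithThrowable(write: () => Unit): Unit =
    try write() catch {
      case t: Throwable =>
        destroyChild()
        throw t
    }

  def main(args: Array[String]): Unit = {
    // Simulates the writer thread being interrupted, e.g. when a task is killed.
    val interrupted: () => Unit = () => throw new InterruptedException("task killed")
    try writeWithNonFatal(interrupted)
    catch { case _: InterruptedException => events += "NonFatal variant: rethrown" }
    try writeWithThrowable(interrupted)
    catch { case _: InterruptedException => events += "Throwable variant: rethrown" }
    events.foreach(println)
  }
}
```

Both variants let the original exception reach the caller; only the widened `catch` runs the cleanup first. `NonFatal` likewise declines to match `VirtualMachineError`, `ThreadDeath`, `LinkageError`, and `ControlThrowable`.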