spark git commit: [SPARK-16021][TEST-MAVEN] Fix the maven build

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 69f539140 -> 4b5a72c7d


[SPARK-16021][TEST-MAVEN] Fix the maven build

## What changes were proposed in this pull request?

Fixed the maven build for #13983

## How was this patch tested?

The existing tests.

Author: Shixiong Zhu 

Closes #14084 from zsxwing/fix-maven.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b5a72c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b5a72c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b5a72c7

Branch: refs/heads/master
Commit: 4b5a72c7dc364ca8d57d9f4bb47f4cd31c5b3082
Parents: 69f5391
Author: Shixiong Zhu 
Authored: Wed Jul 6 22:48:05 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 22:48:05 2016 -0700

--
 pom.xml | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4b5a72c7/pom.xml
--
diff --git a/pom.xml b/pom.xml
index c99d786..4aaf616 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1995,6 +1995,7 @@
           <spark.ui.enabled>false</spark.ui.enabled>
           <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
           <spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
+          <spark.memory.debugFill>true</spark.memory.debugFill>
           <!-- Needed by sql/hive tests. -->
           <test.src.tables>src</test.src.tables>





spark git commit: [SPARK-16398][CORE] Make cancelJob and cancelStage APIs public

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 42279bff6 -> 69f539140


[SPARK-16398][CORE] Make cancelJob and cancelStage APIs public

## What changes were proposed in this pull request?

Make the SparkContext `cancelJob` and `cancelStage` APIs public. This allows 
applications to use a `SparkListener` to do their own job management via 
events, without having to go through the REST API.
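
As a minimal sketch of that usage (the listener policy and the "cancel-me" property are hypothetical, not part of this PR):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

object CancelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-demo").setMaster("local[2]"))

    // Hypothetical policy: cancel any stage whose local properties carry "cancel-me".
    sc.addSparkListener(new SparkListener {
      override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
        val props = event.properties
        if (props != null && "true" == props.getProperty("cancel-me")) {
          sc.cancelStage(event.stageInfo.stageId)   // public as of this change
        }
      }
    })

    // ... submit jobs; sc.cancelJob(jobId) can similarly be driven from job-level events.
    sc.stop()
  }
}
```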

## How was this patch tested?

Existing tests (dev/run-tests)

Author: MasterDDT 

Closes #14072 from MasterDDT/SPARK-16398.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69f53914
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69f53914
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69f53914

Branch: refs/heads/master
Commit: 69f5391408b779a400b553344fd61051004685fc
Parents: 42279bf
Author: MasterDDT 
Authored: Wed Jul 6 22:47:40 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 22:47:40 2016 -0700

--
 .../scala/org/apache/spark/SparkContext.scala | 18 ++
 1 file changed, 14 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/69f53914/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe15052..57d1f09 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2011,13 +2011,23 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
 dagScheduler.cancelAllJobs()
   }
 
-  /** Cancel a given job if it's scheduled or running */
-  private[spark] def cancelJob(jobId: Int) {
+  /**
+   * Cancel a given job if it's scheduled or running.
+   *
+   * @param jobId the job ID to cancel
+   * @throws InterruptedException if the cancel message cannot be sent
+   */
+  def cancelJob(jobId: Int) {
 dagScheduler.cancelJob(jobId)
   }
 
-  /** Cancel a given stage and all jobs associated with it */
-  private[spark] def cancelStage(stageId: Int) {
+  /**
+   * Cancel a given stage and all jobs associated with it.
+   *
+   * @param stageId the stage ID to cancel
+   * @throws InterruptedException if the cancel message cannot be sent
+   */
+  def cancelStage(stageId: Int) {
 dagScheduler.cancelStage(stageId)
   }
 





spark git commit: [SPARK-16374][SQL] Remove Alias from MetastoreRelation and SimpleCatalogRelation

2016-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 34283de16 -> 42279bff6


[SPARK-16374][SQL] Remove Alias from MetastoreRelation and SimpleCatalogRelation

## What changes were proposed in this pull request?
Unlike the other leaf nodes, `MetastoreRelation` and `SimpleCatalogRelation` 
have a pre-defined `alias`, which is used to change the qualifier of the node. 
However, under the existing alias handling, the alias should instead be put in 
`SubqueryAlias`.

This PR separates alias handling from `MetastoreRelation` and 
`SimpleCatalogRelation` to make them consistent with the other nodes. It also 
simplifies their signatures and the conversion to a `BaseRelation`.

For example, below is a query over a `MetastoreRelation` that is converted to 
a `LogicalRelation`:
```SQL
SELECT tmp.a + 1 FROM test_parquet_ctas tmp WHERE tmp.a > 2
```

Before changes, the analyzed plan is
```
== Analyzed Logical Plan ==
(a + 1): int
Project [(a#951 + 1) AS (a + 1)#952]
+- Filter (a#951 > 2)
   +- SubqueryAlias tmp
  +- Relation[a#951] parquet
```
After changes, the analyzed plan becomes
```
== Analyzed Logical Plan ==
(a + 1): int
Project [(a#951 + 1) AS (a + 1)#952]
+- Filter (a#951 > 2)
   +- SubqueryAlias tmp
  +- SubqueryAlias test_parquet_ctas
 +- Relation[a#951] parquet
```

**Note: the optimized plans are the same.**
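
For reference, a quick way to reproduce the analyzed plans above (assuming the `test_parquet_ctas` table already exists) is:

```scala
// Prints the parsed, analyzed, optimized, and physical plans; the analyzed
// plan is where the extra SubqueryAlias introduced by this change appears.
val df = spark.sql("SELECT tmp.a + 1 FROM test_parquet_ctas tmp WHERE tmp.a > 2")
df.explain(extended = true)
// Or inspect the analyzed plan alone:
println(df.queryExecution.analyzed)
```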

For `SimpleCatalogRelation`, the existing code always generates two Subqueries. 
Thus, no change is needed.

## How was this patch tested?
Added test cases.

Author: gatorsmile 

Closes #14053 from gatorsmile/removeAliasFromMetastoreRelation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42279bff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42279bff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42279bff

Branch: refs/heads/master
Commit: 42279bff686f9808ec7a9e8f4da95c717edc6026
Parents: 34283de
Author: gatorsmile 
Authored: Thu Jul 7 12:07:19 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 7 12:07:19 2016 +0800

--
 .../spark/sql/catalyst/catalog/SessionCatalog.scala   |  2 +-
 .../org/apache/spark/sql/catalyst/catalog/interface.scala |  5 ++---
 .../spark/sql/catalyst/catalog/SessionCatalogSuite.scala  |  2 +-
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  | 10 ++
 .../org/apache/spark/sql/hive/MetastoreRelation.scala | 10 --
 5 files changed, 14 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42279bff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e1d4991..ffaefeb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -403,7 +403,7 @@ class SessionCatalog(
   val relation =
 if (name.database.isDefined || !tempTables.contains(table)) {
   val metadata = externalCatalog.getTable(db, table)
-  SimpleCatalogRelation(db, metadata, alias)
+  SimpleCatalogRelation(db, metadata)
 } else {
   tempTables(table)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/42279bff/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 6197aca..b12606e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -244,8 +244,7 @@ trait CatalogRelation {
  */
 case class SimpleCatalogRelation(
 databaseName: String,
-metadata: CatalogTable,
-alias: Option[String] = None)
+metadata: CatalogTable)
   extends LeafNode with CatalogRelation {
 
   override def catalogTable: CatalogTable = metadata
@@ -261,7 +260,7 @@ case class SimpleCatalogRelation(
 CatalystSqlParser.parseDataType(f.dataType),
 // Since data can be dumped in randomly with no validation, everything 
is nullable.
 nullable = true
-  )(qualifier = Some(alias.getOrElse(metadata.identifier.table)))
+  )(qualifier = Some(metadata.identifier.table))
 }
   }
 


spark git commit: [SPARK-14839][SQL] Support for other types for `tableProperty` rule in SQL syntax

2016-07-06 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 44c7c62bc -> 34283de16


[SPARK-14839][SQL] Support for other types for `tableProperty` rule in SQL 
syntax

## What changes were proposed in this pull request?

Currently, the Scala API supports options of the types `String`, `Long`, 
`Double`, and `Boolean`, and the Python API also supports other types.

This PR corrects the `tableProperty` rule to support other value types 
(string, boolean, double and integer) so that options for data sources are 
supported in a consistent way. This also affects other rules such as 
DBPROPERTIES and TBLPROPERTIES, allowing other types as values.
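
For illustration only (the path and option names below are placeholders, not from this PR), after this change the SQL `OPTIONS` clause accepts the same literal types the Scala API already does:

```scala
// The Scala API already accepted typed option values:
val people = spark.read
  .format("json")
  .option("samplingRatio", 0.5)          // Double overload
  .option("primitivesAsString", true)    // Boolean overload
  .load("/tmp/people.json")              // hypothetical path

// With this change, the equivalent literals parse in SQL as well:
spark.sql("""
  CREATE TABLE people_tbl
  USING json
  OPTIONS (path '/tmp/people.json', samplingRatio 0.5, primitivesAsString true)
""")
```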

Also, `TODO add bucketing and partitioning.` was removed because it was 
resolved in 
https://github.com/apache/spark/commit/24bea000476cdd0b43be5160a76bc5b170ef0b42

## How was this patch tested?

Unit test in `MetastoreDataSourcesSuite.scala`.

Author: hyukjinkwon 

Closes #13517 from HyukjinKwon/SPARK-14839.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34283de1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34283de1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34283de1

Branch: refs/heads/master
Commit: 34283de160808324da02964cd5dc5df80e59ae71
Parents: 44c7c62
Author: hyukjinkwon 
Authored: Wed Jul 6 23:57:18 2016 -0400
Committer: Herman van Hovell 
Committed: Wed Jul 6 23:57:18 2016 -0400

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  9 ++-
 .../spark/sql/execution/SparkSqlParser.scala| 22 +--
 .../sql/execution/command/DDLCommandSuite.scala | 61 
 3 files changed, 87 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34283de1/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4c15f9c..7ccbb2d 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -249,7 +249,7 @@ tablePropertyList
 ;
 
 tableProperty
-: key=tablePropertyKey (EQ? value=STRING)?
+: key=tablePropertyKey (EQ? value=tablePropertyValue)?
 ;
 
 tablePropertyKey
@@ -257,6 +257,13 @@ tablePropertyKey
 | STRING
 ;
 
+tablePropertyValue
+: INTEGER_VALUE
+| DECIMAL_VALUE
+| booleanValue
+| STRING
+;
+
 constantList
 : '(' constant (',' constant)* ')'
 ;

http://git-wip-us.apache.org/repos/asf/spark/blob/34283de1/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 42ec210..f77801f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -311,8 +311,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
* Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical 
plan.
-   *
-   * TODO add bucketing and partitioning.
*/
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): 
LogicalPlan = withOrigin(ctx) {
 val (table, temp, ifNotExists, external) = 
visitCreateTableHeader(ctx.createTableHeader)
@@ -413,7 +411,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
 val properties = ctx.tableProperty.asScala.map { property =>
   val key = visitTablePropertyKey(property.key)
-  val value = Option(property.value).map(string).orNull
+  val value = visitTablePropertyValue(property.value)
   key -> value
 }
 // Check for duplicate property names.
@@ -426,7 +424,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
*/
   private def visitPropertyKeyValues(ctx: TablePropertyListContext): 
Map[String, String] = {
 val props = visitTablePropertyList(ctx)
-val badKeys = props.filter { case (_, v) => v == null }.keys
+val badKeys = props.collect { case (key, null) => key }
 if (badKeys.nonEmpty) {
   operationNotAllowed(
 s"Values must be specified for key(s): ${badKeys.mkString("[", ",", 
"]")}", ctx)
@@ -461,6 +459,22 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder 
{
   }
 
   /**
+   * A table property value can 

spark git commit: [SPARK-16021] Fill freed memory in test to help catch correctness bugs

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master b8ebf63c1 -> 44c7c62bc


[SPARK-16021] Fill freed memory in test to help catch correctness bugs

## What changes were proposed in this pull request?

This patch modifies `MemoryAllocator` to fill newly allocated and freed memory 
with known byte values, similar to 
https://github.com/jemalloc/jemalloc/wiki/Use-Case:-Find-a-memory-corruption-bug
 . Memory filling is controlled by a flag and, by default, is enabled only in tests.
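
A minimal sketch of what the flag buys in a test, using the constants visible in the diff below; the `spark.memory.debugFill` system-property name is an assumption here rather than something stated in this message:

```scala
// Run with -Dspark.memory.debugFill=true (assumed flag name): allocations are
// pre-filled with a "clean" byte and freed blocks are overwritten with a
// "freed" byte, so use-after-free or reads of uninitialized memory show up
// as recognizable garbage instead of silently passing.
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.memory.MemoryAllocator

val allocator = MemoryAllocator.HEAP          // or MemoryAllocator.UNSAFE
val block = allocator.allocate(1024)
// Right after allocation, every byte should hold the clean-fill value.
assert(Platform.getByte(block.getBaseObject, block.getBaseOffset) ==
  MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE)
allocator.free(block)
```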

## How was this patch tested?

A unit test verifies that the flag is on in tests.

cc sameeragarwal

Author: Eric Liang 

Closes #13983 from ericl/spark-16021.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44c7c62b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44c7c62b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44c7c62b

Branch: refs/heads/master
Commit: 44c7c62bcfca74c82ffc4e3c53997fff47bfacac
Parents: b8ebf63
Author: Eric Liang 
Authored: Wed Jul 6 16:30:25 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 16:30:25 2016 -0700

--
 .../main/java/org/apache/spark/unsafe/Platform.java |  4 
 .../spark/unsafe/memory/HeapMemoryAllocator.java| 10 +-
 .../apache/spark/unsafe/memory/MemoryAllocator.java | 13 -
 .../org/apache/spark/unsafe/memory/MemoryBlock.java |  7 +++
 .../spark/unsafe/memory/UnsafeMemoryAllocator.java  |  9 -
 .../org/apache/spark/unsafe/PlatformUtilSuite.java  | 16 
 project/SparkBuild.scala|  1 +
 7 files changed, 57 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
--
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 77c8c39..a2ee45c 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -176,6 +176,10 @@ public final class Platform {
 throw new IllegalStateException("unreachable");
   }
 
+  public static void setMemory(Object object, long offset, long size, byte 
value) {
+_UNSAFE.setMemory(object, offset, size, value);
+  }
+
   public static void setMemory(long address, byte value, long size) {
 _UNSAFE.setMemory(address, size, value);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index 09847ce..3cd4264 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.MemoryAllocator;
 
 /**
  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM 
long primitive array.
@@ -64,12 +65,19 @@ public class HeapMemoryAllocator implements MemoryAllocator 
{
   }
 }
 long[] array = new long[(int) ((size + 7) / 8)];
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
+MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, 
size);
+if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
+  memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
+}
+return memory;
   }
 
   @Override
   public void free(MemoryBlock memory) {
 final long size = memory.size();
+if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
+  memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
+}
 if (shouldPool(size)) {
   synchronized (this) {
 LinkedList pool = 
bufferPoolsBySize.get(size);

http://git-wip-us.apache.org/repos/asf/spark/blob/44c7c62b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
index 5192f68..8bd2b06 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryAllocator.java
@@ 

spark git commit: [SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

2016-07-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 8e3e4ed6c -> b8ebf63c1


[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from 
development on 0-10.

Main changes are
- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter)
- increasing uniqueness of topic names
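
A rough sketch of the first two points (names mirror the diff below, not an exact excerpt): record the result in an atomic from the output operation and assert once on the end state with `eventually`, rather than blocking on a concurrent queue per batch:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

// Shared result holder updated from the streaming output operation
// (an atomic instead of a @volatile var).
object SuiteState { val total = new AtomicLong(-1L) }

// Inside foreachRDD of the stateful stream (sketch):
//   rdd.collect().headOption.foreach { case (_, sum) => SuiteState.total.set(sum) }

// In the test body: send all the data first, then assert on the final total.
eventually(timeout(10.seconds), interval(50.milliseconds)) {
  assert(SuiteState.total.get == (1 to 10).sum)
}
```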

## How was this patch tested?

Unit tests

Author: cody koeninger 

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8ebf63c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8ebf63c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8ebf63c

Branch: refs/heads/master
Commit: b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59
Parents: 8e3e4ed
Author: cody koeninger 
Authored: Wed Jul 6 16:21:41 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jul 6 16:21:41 2016 -0700

--
 .../kafka/DirectKafkaStreamSuite.scala  | 41 ++--
 .../spark/streaming/kafka/KafkaRDDSuite.scala   |  8 ++--
 2 files changed, 24 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b8ebf63c/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d2..ab1c505 100644
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
 )
 
 // Send data to Kafka and wait for it to be received
-def sendDataAndWaitForReceive(data: Seq[Int]) {
+def sendData(data: Seq[Int]) {
   val strings = data.map { _.toString}
   kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-  eventually(timeout(10 seconds), interval(50 milliseconds)) {
-assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains 
})
-  }
 }
 
 // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
 }
 ssc.checkpoint(testDir.getAbsolutePath)
 
-// This is to collect the raw data received from Kafka
-kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-  val data = rdd.map { _._2 }.collect()
-  DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-}
-
 // This is ensure all the data is eventually receiving only once
 stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-  rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = 
x._2 }
+  rdd.collect().headOption.foreach { x =>
+DirectKafkaStreamSuite.total.set(x._2)
+  }
 }
 ssc.start()
 
-// Send some data and wait for them to be received
+// Send some data
 for (i <- (1 to 10).grouped(4)) {
-  sendDataAndWaitForReceive(i)
+  sendData(i)
+}
+
+eventually(timeout(10 seconds), interval(50 milliseconds)) {
+  assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
 }
 
 ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
 val recoveredStream = 
ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
 // Verify offset ranges have been recovered
-val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => 
(x._1, x._2.toSet) }
 assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
+val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
 assert(
   recoveredOffsetRanges.forall { or =>
-earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+earlierOffsetRanges.contains((or._1, or._2))
   },
   "Recovered ranges are not the same as the ones generated\n" +
 s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+s"earlierOffsetRanges: $earlierOffsetRanges"
 )
 // Restart context, give more data and verify the total at the end
 // If the total is write that means each records has been received only 

spark git commit: [SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

2016-07-06 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 05ddc7517 -> 920162a1e


[SPARK-16212][STREAMING][KAFKA] apply test tweaks from 0-10 to 0-8 as well

## What changes were proposed in this pull request?

Bring the kafka-0-8 subproject up to date with some test modifications from 
development on 0-10.

Main changes are
- eliminating waits on a concurrent queue in favor of an assert on received results,
- atomics instead of volatile (although this probably doesn't matter)
- increasing uniqueness of topic names

## How was this patch tested?

Unit tests

Author: cody koeninger 

Closes #14073 from koeninger/kafka-0-8-test-direct-cleanup.

(cherry picked from commit b8ebf63c1e1fa1ab53ea760fa293051c08ce5f59)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/920162a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/920162a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/920162a1

Branch: refs/heads/branch-2.0
Commit: 920162a1e0b43b558ba2242868a44cad06bef946
Parents: 05ddc75
Author: cody koeninger 
Authored: Wed Jul 6 16:21:41 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jul 6 16:21:49 2016 -0700

--
 .../kafka/DirectKafkaStreamSuite.scala  | 41 ++--
 .../spark/streaming/kafka/KafkaRDDSuite.scala   |  8 ++--
 2 files changed, 24 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/920162a1/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
--
diff --git 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index cb782d2..ab1c505 100644
--- 
a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -244,12 +244,9 @@ class DirectKafkaStreamSuite
 )
 
 // Send data to Kafka and wait for it to be received
-def sendDataAndWaitForReceive(data: Seq[Int]) {
+def sendData(data: Seq[Int]) {
   val strings = data.map { _.toString}
   kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-  eventually(timeout(10 seconds), interval(50 milliseconds)) {
-assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains 
})
-  }
 }
 
 // Setup the streaming context
@@ -264,21 +261,21 @@ class DirectKafkaStreamSuite
 }
 ssc.checkpoint(testDir.getAbsolutePath)
 
-// This is to collect the raw data received from Kafka
-kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
-  val data = rdd.map { _._2 }.collect()
-  DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
-}
-
 // This is ensure all the data is eventually receiving only once
 stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
-  rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = 
x._2 }
+  rdd.collect().headOption.foreach { x =>
+DirectKafkaStreamSuite.total.set(x._2)
+  }
 }
 ssc.start()
 
-// Send some data and wait for them to be received
+// Send some data
 for (i <- (1 to 10).grouped(4)) {
-  sendDataAndWaitForReceive(i)
+  sendData(i)
+}
+
+eventually(timeout(10 seconds), interval(50 milliseconds)) {
+  assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
 }
 
 ssc.stop()
@@ -302,23 +299,26 @@ class DirectKafkaStreamSuite
 val recoveredStream = 
ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
 
 // Verify offset ranges have been recovered
-val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
+val recoveredOffsetRanges = getOffsetRanges(recoveredStream).map { x => 
(x._1, x._2.toSet) }
 assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
+val earlierOffsetRanges = offsetRangesAfterStop.map { x => (x._1, 
x._2.toSet) }
 assert(
   recoveredOffsetRanges.forall { or =>
-earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
+earlierOffsetRanges.contains((or._1, or._2))
   },
   "Recovered ranges are not the same as the ones generated\n" +
 s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
-s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
+s"earlierOffsetRanges: $earlierOffsetRanges"
 )
 // 

spark git commit: [SPARK-16371][SQL] Two follow-up tasks

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2c2b8f121 -> 05ddc7517


[SPARK-16371][SQL] Two follow-up tasks

## What changes were proposed in this pull request?
This is a small follow-up for SPARK-16371:

1. Hide removeMetadata from public API.
2. Add JIRA ticket number to test case name.

## How was this patch tested?
Updated a test comment.

Author: Reynold Xin 

Closes #14074 from rxin/parquet-filter.

(cherry picked from commit 8e3e4ed6c090d18675d49eec46b3ee572457db95)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05ddc751
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05ddc751
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05ddc751

Branch: refs/heads/branch-2.0
Commit: 05ddc75179acc582c615da01b9c0e7e049a5ecf0
Parents: 2c2b8f1
Author: Reynold Xin 
Authored: Wed Jul 6 15:04:37 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 15:04:44 2016 -0700

--
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala   | 4 ++--
 .../sql/execution/datasources/parquet/ParquetFilterSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/05ddc751/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 436512f..effef54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -378,10 +378,10 @@ object StructType extends AbstractDataType {
 StructType(fields.asScala)
   }
 
-  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
 StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata)))
 
-  def removeMetadata(key: String, dt: DataType): DataType =
+  private[sql] def removeMetadata(key: String, dt: DataType): DataType =
 dt match {
   case StructType(fields) =>
 val newFields = fields.map { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/05ddc751/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 35d6915..2a89773 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -546,7 +546,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
 }
   }
 
-  test("Do not push down filters incorrectly when inner name and outer name 
are the same") {
+  test("SPARK-16371 Do not push down filters when inner name and outer name 
are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
   // Here the schema becomes as below:
   //





spark git commit: [SPARK-16371][SQL] Two follow-up tasks

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 9c041990c -> 8e3e4ed6c


[SPARK-16371][SQL] Two follow-up tasks

## What changes were proposed in this pull request?
This is a small follow-up for SPARK-16371:

1. Hide removeMetadata from public API.
2. Add JIRA ticket number to test case name.

## How was this patch tested?
Updated a test comment.

Author: Reynold Xin 

Closes #14074 from rxin/parquet-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e3e4ed6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e3e4ed6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e3e4ed6

Branch: refs/heads/master
Commit: 8e3e4ed6c090d18675d49eec46b3ee572457db95
Parents: 9c041990
Author: Reynold Xin 
Authored: Wed Jul 6 15:04:37 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 15:04:37 2016 -0700

--
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala   | 4 ++--
 .../sql/execution/datasources/parquet/ParquetFilterSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8e3e4ed6/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 0e89f71..0284ecc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -413,10 +413,10 @@ object StructType extends AbstractDataType {
 StructType(fields.asScala)
   }
 
-  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
 StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata)))
 
-  def removeMetadata(key: String, dt: DataType): DataType =
+  private[sql] def removeMetadata(key: String, dt: DataType): DataType =
 dt match {
   case StructType(fields) =>
 val newFields = fields.map { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e3e4ed6/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 18a3128..84fdcfe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -544,7 +544,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
 }
   }
 
-  test("Do not push down filters incorrectly when inner name and outer name 
are the same") {
+  test("SPARK-16371 Do not push down filters when inner name and outer name 
are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
   // Here the schema becomes as below:
   //





spark git commit: [MESOS] expand coarse-grained mode docs

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 88be66b93 -> 2c2b8f121


[MESOS] expand coarse-grained mode docs

## What changes were proposed in this pull request?

docs

## How was this patch tested?

viewed the docs in github

Author: Michael Gummelt 

Closes #14059 from mgummelt/coarse-grained.

(cherry picked from commit 9c041990cf4d0138d9104207b5c2e7a319b42615)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c2b8f12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c2b8f12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c2b8f12

Branch: refs/heads/branch-2.0
Commit: 2c2b8f121a213618ef47cb030d17b9bd323f0d9e
Parents: 88be66b
Author: Michael Gummelt 
Authored: Wed Jul 6 15:02:45 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 15:02:50 2016 -0700

--
 docs/running-on-mesos.md | 77 ---
 1 file changed, 51 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c2b8f12/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 4a0ab62..8ab5f30 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -180,30 +180,53 @@ Note that jars or python files that are passed to 
spark-submit should be URIs re
 
 # Mesos Run Modes
 
-Spark can run over Mesos in two modes: "coarse-grained" (default) and 
"fine-grained".
-
-The "coarse-grained" mode will launch only *one* long-running Spark task on 
each Mesos
-machine, and dynamically schedule its own "mini-tasks" within it. The benefit 
is much lower startup
-overhead, but at the cost of reserving the Mesos resources for the complete 
duration of the
-application.
-
-Coarse-grained is the default mode. You can also set `spark.mesos.coarse` 
property to true
-to turn it on explicitly in [SparkConf](configuration.html#spark-properties):
-
-{% highlight scala %}
-conf.set("spark.mesos.coarse", "true")
-{% endhighlight %}
-
-In addition, for coarse-grained mode, you can control the maximum number of 
resources Spark will
-acquire. By default, it will acquire *all* cores in the cluster (that get 
offered by Mesos), which
-only makes sense if you run just one application at a time. You can cap the 
maximum number of cores
-using `conf.set("spark.cores.max", "10")` (for example).
-
-In "fine-grained" mode, each Spark task runs as a separate Mesos task. This 
allows
-multiple instances of Spark (and other frameworks) to share machines at a very 
fine granularity,
-where each application gets more or fewer machines as it ramps up and down, 
but it comes with an
-additional overhead in launching each task. This mode may be inappropriate for 
low-latency
-requirements like interactive queries or serving web requests.
+Spark can run over Mesos in two modes: "coarse-grained" (default) and
+"fine-grained".
+
+## Coarse-Grained
+
+In "coarse-grained" mode, each Spark executor runs as a single Mesos
+task.  Spark executors are sized according to the following
+configuration variables:
+
+* Executor memory: `spark.executor.memory`
+* Executor cores: `spark.executor.cores`
+* Number of executors: `spark.cores.max`/`spark.executor.cores`
+
+Please see the [Spark Configuration](configuration.html) page for
+details and default values.
+
+Executors are brought up eagerly when the application starts, until
+`spark.cores.max` is reached.  If you don't set `spark.cores.max`, the
+Spark application will reserve all resources offered to it by Mesos,
+so we of course urge you to set this variable in any sort of
+multi-tenant cluster, including one which runs multiple concurrent
+Spark applications.
+
+The scheduler will start executors round-robin on the offers Mesos
+gives it, but there are no spread guarantees, as Mesos does not
+provide such guarantees on the offer stream.
+
+The benefit of coarse-grained mode is much lower startup overhead, but
+at the cost of reserving Mesos resources for the complete duration of
+the application.  To configure your job to dynamically adjust to its
+resource requirements, look into
+[Dynamic Allocation](#dynamic-resource-allocation-with-mesos).
+
+## Fine-Grained
+
+In "fine-grained" mode, each Spark task inside the Spark executor runs
+as a separate Mesos task. This allows multiple instances of Spark (and
+other frameworks) to share cores at a very fine granularity, where
+each application gets more or fewer cores as it ramps up and down, but
+it comes with an additional overhead in launching each task. This mode
+may be inappropriate for low-latency requirements like interactive
+queries or serving web 

spark git commit: [MESOS] expand coarse-grained mode docs

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a8f89df3b -> 9c041990c


[MESOS] expand coarse-grained mode docs

## What changes were proposed in this pull request?

docs

## How was this patch tested?

viewed the docs in github

Author: Michael Gummelt 

Closes #14059 from mgummelt/coarse-grained.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c041990
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c041990
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c041990

Branch: refs/heads/master
Commit: 9c041990cf4d0138d9104207b5c2e7a319b42615
Parents: a8f89df
Author: Michael Gummelt 
Authored: Wed Jul 6 15:02:45 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 15:02:45 2016 -0700

--
 docs/running-on-mesos.md | 77 ---
 1 file changed, 51 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9c041990/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 4a0ab62..8ab5f30 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -180,30 +180,53 @@ Note that jars or python files that are passed to 
spark-submit should be URIs re
 
 # Mesos Run Modes
 
-Spark can run over Mesos in two modes: "coarse-grained" (default) and 
"fine-grained".
-
-The "coarse-grained" mode will launch only *one* long-running Spark task on 
each Mesos
-machine, and dynamically schedule its own "mini-tasks" within it. The benefit 
is much lower startup
-overhead, but at the cost of reserving the Mesos resources for the complete 
duration of the
-application.
-
-Coarse-grained is the default mode. You can also set `spark.mesos.coarse` 
property to true
-to turn it on explicitly in [SparkConf](configuration.html#spark-properties):
-
-{% highlight scala %}
-conf.set("spark.mesos.coarse", "true")
-{% endhighlight %}
-
-In addition, for coarse-grained mode, you can control the maximum number of 
resources Spark will
-acquire. By default, it will acquire *all* cores in the cluster (that get 
offered by Mesos), which
-only makes sense if you run just one application at a time. You can cap the 
maximum number of cores
-using `conf.set("spark.cores.max", "10")` (for example).
-
-In "fine-grained" mode, each Spark task runs as a separate Mesos task. This 
allows
-multiple instances of Spark (and other frameworks) to share machines at a very 
fine granularity,
-where each application gets more or fewer machines as it ramps up and down, 
but it comes with an
-additional overhead in launching each task. This mode may be inappropriate for 
low-latency
-requirements like interactive queries or serving web requests.
+Spark can run over Mesos in two modes: "coarse-grained" (default) and
+"fine-grained".
+
+## Coarse-Grained
+
+In "coarse-grained" mode, each Spark executor runs as a single Mesos
+task.  Spark executors are sized according to the following
+configuration variables:
+
+* Executor memory: `spark.executor.memory`
+* Executor cores: `spark.executor.cores`
+* Number of executors: `spark.cores.max`/`spark.executor.cores`
+
+Please see the [Spark Configuration](configuration.html) page for
+details and default values.
+
+Executors are brought up eagerly when the application starts, until
+`spark.cores.max` is reached.  If you don't set `spark.cores.max`, the
+Spark application will reserve all resources offered to it by Mesos,
+so we of course urge you to set this variable in any sort of
+multi-tenant cluster, including one which runs multiple concurrent
+Spark applications.
+
+The scheduler will start executors round-robin on the offers Mesos
+gives it, but there are no spread guarantees, as Mesos does not
+provide such guarantees on the offer stream.
+
+The benefit of coarse-grained mode is much lower startup overhead, but
+at the cost of reserving Mesos resources for the complete duration of
+the application.  To configure your job to dynamically adjust to its
+resource requirements, look into
+[Dynamic Allocation](#dynamic-resource-allocation-with-mesos).
+
+## Fine-Grained
+
+In "fine-grained" mode, each Spark task inside the Spark executor runs
+as a separate Mesos task. This allows multiple instances of Spark (and
+other frameworks) to share cores at a very fine granularity, where
+each application gets more or fewer cores as it ramps up and down, but
+it comes with an additional overhead in launching each task. This mode
+may be inappropriate for low-latency requirements like interactive
+queries or serving web requests.
+
+Note that while Spark tasks in fine-grained will relinquish cores as
+they terminate, they will not relinquish 

spark git commit: [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 040f6f9f4 -> a8f89df3b


[SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in 
Logging

## What changes were proposed in this pull request?

The commit 
https://github.com/apache/spark/commit/044971eca0ff3c2ce62afa665dbd3072d52cbbec 
introduced a lazy val to simplify code in Logging. Simple enough, though one 
side effect is that accessing log now means grabbing the instance's lock. This 
in turn turned up a form of deadlock in the Mesos code. It was arguably a bit 
of a problem in how this code is structured, but, in any event the safest thing 
to do seems to be to revert the commit, and that's 90% of the change here; it's 
just not worth the risk of similar more subtle issues.
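
To make the locking concrete, here is roughly what the compiler generates for a `lazy val` (a hedged sketch, not Spark code), which is where the instance lock comes from; the getter pattern restored in the diff below never takes that monitor:

```scala
// `@transient lazy val log: Logger = ...` desugars to approximately this
// double-checked initializer, synchronized on the owning instance:
class Component {
  private var bitmap0 = false
  private var logField: String = _

  def log: String = {
    if (!bitmap0) this.synchronized {       // <- the instance monitor
      if (!bitmap0) {
        logField = "logger-for-" + getClass.getName
        bitmap0 = true
      }
    }
    logField
  }
}
```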

What I didn't revert here was the removal of this odd override of log in the 
Mesos code. In retrospect it might have been put in place at some stage as a 
defense against this type of problem. After all the Logging code still involved 
a lock at initialization before the change in question.

Even after the revert, it doesn't seem like it does anything, given how Logging 
works now, so I left it removed. However, I also removed the particular log 
message that ended up playing a part in this problem anyway, maybe being 
paranoid, to make sure this type of problem can't happen even with how the 
current locking works in logging initialization.

## How was this patch tested?

Jenkins tests

Author: Sean Owen 

Closes #14069 from srowen/SPARK-16379.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8f89df3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8f89df3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8f89df3

Branch: refs/heads/master
Commit: a8f89df3b391e7a3fa9f73d9ec730d6eaa95bb09
Parents: 040f6f9
Author: Sean Owen 
Authored: Wed Jul 6 13:36:07 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 13:36:07 2016 -0700

--
 .../scala/org/apache/spark/internal/Logging.scala | 14 ++
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala|  1 -
 2 files changed, 10 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8f89df3/core/src/main/scala/org/apache/spark/internal/Logging.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala 
b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index c51050c..66a0cfe 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -32,10 +32,7 @@ private[spark] trait Logging {
 
   // Make the log field transient so that objects with Logging can
   // be serialized and used on another machine
-  @transient lazy val log: Logger = {
-initializeLogIfNecessary(false)
-LoggerFactory.getLogger(logName)
-  }
+  @transient private var log_ : Logger = null
 
   // Method to get the logger name for this object
   protected def logName = {
@@ -43,6 +40,15 @@ private[spark] trait Logging {
 this.getClass.getName.stripSuffix("$")
   }
 
+  // Method to get or create the logger for this object
+  protected def log: Logger = {
+if (log_ == null) {
+  initializeLogIfNecessary(false)
+  log_ = LoggerFactory.getLogger(logName)
+}
+log_
+  }
+
   // Log methods that take only a String
   protected def logInfo(msg: => String) {
 if (log.isInfoEnabled) log.info(msg)

http://git-wip-us.apache.org/repos/asf/spark/blob/a8f89df3/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e88e4ad..99e6d39 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -244,7 +244,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, 
masterInfo: MasterInfo) {
 appId = frameworkId.getValue
 mesosExternalShuffleClient.foreach(_.init(appId))
-logInfo("Registered as framework ID " + appId)
 markRegistered()
   }
 




spark git commit: [SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in Logging

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d7926da5e -> 88be66b93


[SPARK-16379][CORE][MESOS] Spark on mesos is broken due to race condition in 
Logging

## What changes were proposed in this pull request?

The commit 
https://github.com/apache/spark/commit/044971eca0ff3c2ce62afa665dbd3072d52cbbec 
introduced a lazy val to simplify code in Logging. Simple enough, though one 
side effect is that accessing log now means grabbing the instance's lock. This 
in turn turned up a form of deadlock in the Mesos code. It was arguably a bit 
of a problem in how this code is structured, but, in any event the safest thing 
to do seems to be to revert the commit, and that's 90% of the change here; it's 
just not worth the risk of similar more subtle issues.

What I didn't revert here was the removal of this odd override of log in the 
Mesos code. In retrospect it might have been put in place at some stage as a 
defense against this type of problem. After all the Logging code still involved 
a lock at initialization before the change in question.

Even after the revert, it doesn't seem like it does anything, given how Logging 
works now, so I left it removed. However, I also removed the particular log 
message that ended up playing a part in this problem anyway, maybe being 
paranoid, to make sure this type of problem can't happen even with how the 
current locking works in logging initialization.

## How was this patch tested?

Jenkins tests

Author: Sean Owen 

Closes #14069 from srowen/SPARK-16379.

(cherry picked from commit a8f89df3b391e7a3fa9f73d9ec730d6eaa95bb09)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88be66b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88be66b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88be66b9

Branch: refs/heads/branch-2.0
Commit: 88be66b933a7b1f0f71b1eb6c88bf01ecbf1923c
Parents: d7926da
Author: Sean Owen 
Authored: Wed Jul 6 13:36:07 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 13:36:15 2016 -0700

--
 .../scala/org/apache/spark/internal/Logging.scala | 14 ++
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala|  1 -
 2 files changed, 10 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88be66b9/core/src/main/scala/org/apache/spark/internal/Logging.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala 
b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index c51050c..66a0cfe 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -32,10 +32,7 @@ private[spark] trait Logging {
 
   // Make the log field transient so that objects with Logging can
   // be serialized and used on another machine
-  @transient lazy val log: Logger = {
-initializeLogIfNecessary(false)
-LoggerFactory.getLogger(logName)
-  }
+  @transient private var log_ : Logger = null
 
   // Method to get the logger name for this object
   protected def logName = {
@@ -43,6 +40,15 @@ private[spark] trait Logging {
 this.getClass.getName.stripSuffix("$")
   }
 
+  // Method to get or create the logger for this object
+  protected def log: Logger = {
+if (log_ == null) {
+  initializeLogIfNecessary(false)
+  log_ = LoggerFactory.getLogger(logName)
+}
+log_
+  }
+
   // Log methods that take only a String
   protected def logInfo(msg: => String) {
 if (log.isInfoEnabled) log.info(msg)

http://git-wip-us.apache.org/repos/asf/spark/blob/88be66b9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e88e4ad..99e6d39 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -244,7 +244,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, 
masterInfo: MasterInfo) {
 appId = frameworkId.getValue
 mesosExternalShuffleClient.foreach(_.init(appId))
-logInfo("Registered as framework ID " + appId)
 markRegistered()
   }
 



spark git commit: [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds

2016-07-06 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2465f0728 -> d7926da5e


[SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven 
jenkins builds

## What changes were proposed in this pull request?
"test big model load / save" in Word2VecSuite, lately resulted into OOM.
Therefore we decided to make the partitioning adaptive (not based on spark 
default "spark.kryoserializer.buffer.max" conf) and then testing it using a 
small buffer size in order to trigger partitioning without allocating too much 
memory for the test.

## How was this patch tested?
It was tested running the following unit test:
org.apache.spark.mllib.feature.Word2VecSuite

Author: tmnd1991 

Closes #13509 from tmnd1991/SPARK-15740.

(cherry picked from commit 040f6f9f468f153e4c4db78c26ced0299245fb6f)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7926da5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7926da5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7926da5

Branch: refs/heads/branch-2.0
Commit: d7926da5e72ee2015e3ebe39a5fd0b322e9d1334
Parents: 2465f07
Author: tmnd1991 
Authored: Wed Jul 6 12:56:26 2016 -0700
Committer: Joseph K. Bradley 
Committed: Wed Jul 6 12:56:51 2016 -0700

--
 .../apache/spark/mllib/feature/Word2Vec.scala   | 16 +++--
 .../spark/mllib/feature/Word2VecSuite.scala | 25 +---
 2 files changed, 31 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 2f52825..f2211df 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] {
 ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
   sc.parallelize(Seq(metadata), 
1).saveAsTextFile(Loader.metadataPath(path))
 
-  // We want to partition the model in partitions of size 32MB
-  val partitionSize = (1L << 25)
+  // We want to partition the model in partitions smaller than
+  // spark.kryoserializer.buffer.max
+  val bufferSize = Utils.byteStringAsBytes(
+spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
   // We calculate the approximate size of the model
-  // We only calculate the array size, not considering
-  // the string size, the formula is:
-  // floatSize * numWords * vectorSize
-  val approxSize = 4L * numWords * vectorSize
-  val nPartitions = ((approxSize / partitionSize) + 1).toInt
+  // We only calculate the array size, considering an
+  // average string size of 15 bytes, the formula is:
+  // (floatSize * vectorSize + 15) * numWords
+  val approxSize = (4L * vectorSize + 15) * numWords
+  val nPartitions = ((approxSize / bufferSize) + 1).toInt
   val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
   
spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7926da5/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index c9fb976..22de4c4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 
   }
 
-  ignore("big model load / save") {
-// create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25
-val word2VecMap = Map((0 to 9000).map(i => s"$i" -> 
Array.fill(1000)(0.1f)): _*)
+  test("big model load / save") {
+// backupping old values
+val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", 
"64m")
+val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", 
"64k")
+
+// setting test values to trigger partitioning
+spark.conf.set("spark.kryoserializer.buffer", "50b")
+spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+
+// create a model bigger than 50 Bytes
+val word2VecMap = Map((0 to 

spark git commit: [SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven jenkins builds

2016-07-06 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4f8ceed59 -> 040f6f9f4


[SPARK-15740][MLLIB] Word2VecSuite "big model load / save" caused OOM in maven 
jenkins builds

## What changes were proposed in this pull request?
"test big model load / save" in Word2VecSuite, lately resulted into OOM.
Therefore we decided to make the partitioning adaptive (not based on spark 
default "spark.kryoserializer.buffer.max" conf) and then testing it using a 
small buffer size in order to trigger partitioning without allocating too much 
memory for the test.
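
As a rough worked example of the sizing formula introduced in the diff below (the word count, vector size, and buffer value here are made-up illustrative numbers, not taken from the patch):

```scala
// Approximate serialized size: 4 bytes per float component plus ~15 bytes per word string.
val numWords = 1000000L
val vectorSize = 100L
val bufferSize = 64L * 1024 * 1024                       // spark.kryoserializer.buffer.max = 64m
val approxSize = (4L * vectorSize + 15) * numWords       // ~415 MB for this model
val nPartitions = ((approxSize / bufferSize) + 1).toInt  // 7 partitions, each under the buffer limit
println(nPartitions)
```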

## How was this patch tested?
It was tested by running the following unit test:
org.apache.spark.mllib.feature.Word2VecSuite

Author: tmnd1991 

Closes #13509 from tmnd1991/SPARK-15740.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/040f6f9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/040f6f9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/040f6f9f

Branch: refs/heads/master
Commit: 040f6f9f468f153e4c4db78c26ced0299245fb6f
Parents: 4f8ceed
Author: tmnd1991 
Authored: Wed Jul 6 12:56:26 2016 -0700
Committer: Joseph K. Bradley 
Committed: Wed Jul 6 12:56:26 2016 -0700

--
 .../apache/spark/mllib/feature/Word2Vec.scala   | 16 +++--
 .../spark/mllib/feature/Word2VecSuite.scala | 25 +---
 2 files changed, 31 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/040f6f9f/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 2f52825..f2211df 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -629,14 +629,16 @@ object Word2VecModel extends Loader[Word2VecModel] {
 ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
   sc.parallelize(Seq(metadata), 
1).saveAsTextFile(Loader.metadataPath(path))
 
-  // We want to partition the model in partitions of size 32MB
-  val partitionSize = (1L << 25)
+  // We want to partition the model in partitions smaller than
+  // spark.kryoserializer.buffer.max
+  val bufferSize = Utils.byteStringAsBytes(
+spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
   // We calculate the approximate size of the model
-  // We only calculate the array size, not considering
-  // the string size, the formula is:
-  // floatSize * numWords * vectorSize
-  val approxSize = 4L * numWords * vectorSize
-  val nPartitions = ((approxSize / partitionSize) + 1).toInt
+  // We only calculate the array size, considering an
+  // average string size of 15 bytes, the formula is:
+  // (floatSize * vectorSize + 15) * numWords
+  val approxSize = (4L * vectorSize + 15) * numWords
+  val nPartitions = ((approxSize / bufferSize) + 1).toInt
   val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
   
spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/040f6f9f/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index c9fb976..22de4c4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -91,11 +91,23 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 
   }
 
-  ignore("big model load / save") {
-// create a model bigger than 32MB since 9000 * 1000 * 4 > 2^25
-val word2VecMap = Map((0 to 9000).map(i => s"$i" -> 
Array.fill(1000)(0.1f)): _*)
+  test("big model load / save") {
+// backupping old values
+val oldBufferConfValue = spark.conf.get("spark.kryoserializer.buffer.max", 
"64m")
+val oldBufferMaxConfValue = spark.conf.get("spark.kryoserializer.buffer", 
"64k")
+
+// setting test values to trigger partitioning
+spark.conf.set("spark.kryoserializer.buffer", "50b")
+spark.conf.set("spark.kryoserializer.buffer.max", "50b")
+
+// create a model bigger than 50 Bytes
+val word2VecMap = Map((0 to 10).map(i => s"$i" -> Array.fill(10)(0.1f)): 
_*)
 val model = new Word2VecModel(word2VecMap)
 
+// est. size of this model, given 

spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03f336d89 -> 2465f0728


[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and 
outer name are the same in Parquet

## What changes were proposed in this pull request?

Currently, if there is a schema as below:

```
root
  |-- _1: struct (nullable = true)
  |    |-- _1: integer (nullable = true)
```

and if we execute the codes below:

```scala
df.filter("_1 IS NOT NULL").count()
```

This pushes down a filter even though the filter is being applied to a
`StructType`. (If my understanding is correct, Spark does not push down
filters for those.)

The reason is that `ParquetFilters.getFieldMap` produces the results below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

which then collapse into a `Map`

```
(_1,IntegerType)
```

Now, because of `lift(dataTypeOf(name)).map(_(name, value))`, this pushes
down filters for `_1`, which Parquet thinks is `IntegerType` even though it is
actually `StructType`.

So, the Parquet filter2 API produces incorrect results; for example, the code below:

```
df.filter("_1 IS NOT NULL").count()
```

always produces 0.

This PR prevents this by not collecting nested fields into the field map.
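
A minimal reproduction along the lines described above might look like the following sketch; the temporary path and the exact way the nested column is built are illustrative assumptions, not taken from the PR (it assumes a `SparkSession` named `spark`, e.g. in spark-shell):

```scala
import spark.implicits._

// Schema becomes `_1: struct<_1: int>`: the outer and inner fields share the name "_1".
val df = Seq(Tuple1(Tuple1(1)), Tuple1(Tuple1(2))).toDF()
df.write.mode("overwrite").parquet("/tmp/spark-16371-repro")

val readBack = spark.read.parquet("/tmp/spark-16371-repro")
// Before the fix, the IS NOT NULL predicate on the struct column was pushed down as an
// IntegerType filter, so Parquet dropped every record and this printed 0.
// With the fix, no filter is pushed down for the struct column and it prints 2.
println(readBack.filter("_1 IS NOT NULL").count())
```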

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon 

Closes #14067 from HyukjinKwon/SPARK-16371.

(cherry picked from commit 4f8ceed59367319300e4bfa5b957c387be81ffa3)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2465f072
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2465f072
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2465f072

Branch: refs/heads/branch-2.0
Commit: 2465f0728e95109ab851ab09b5badd697928ba2b
Parents: 03f336d
Author: hyukjinkwon 
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 12:42:37 2016 -0700

--
 .../datasources/parquet/ParquetFileFormat.scala   |  2 +-
 .../datasources/parquet/ParquetFilters.scala  |  5 -
 .../datasources/parquet/ParquetFilterSuite.scala  | 14 ++
 3 files changed, 19 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f38bf81..8cbdaeb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -436,7 +436,7 @@ private[sql] class ParquetOutputWriterFactory(
 ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
 // We want to clear this temporary metadata from saving into Parquet file.
-// This metadata is only useful for detecting optional columns when 
pushdowning filters.
+// This metadata is only useful for detecting optional columns when 
pushing down filters.
 val dataSchemaToWrite = StructType.removeMetadata(
   StructType.metadataKeyForOptionalField,
   dataSchema).asInstanceOf[StructType]

http://git-wip-us.apache.org/repos/asf/spark/blob/2465f072/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 95afdc7..70ae829 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -215,10 +215,13 @@ private[sql] object ParquetFilters {
*/
   private def getFieldMap(dataType: DataType): Array[(String, DataType)] = 
dataType match {
 case StructType(fields) =>
+  // Here we don't flatten the fields in the nested schema but just look 
up through
+  // root fields. Currently, accessing to nested fields does not push down 
filters
+  // and it does not support to create filters for them.
   fields.filter { f =>
 !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
   !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-  }.map(f => f.name -> f.dataType) ++ 

spark git commit: [SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 480357cc6 -> 4f8ceed59


[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and 
outer name are the same in Parquet

## What changes were proposed in this pull request?

Currently, if there is a schema as below:

```
root
  |-- _1: struct (nullable = true)
  |    |-- _1: integer (nullable = true)
```

and if we execute the codes below:

```scala
df.filter("_1 IS NOT NULL").count()
```

This pushes down a filter even though the filter is being applied to a
`StructType`. (If my understanding is correct, Spark does not push down
filters for those.)

The reason is that `ParquetFilters.getFieldMap` produces the results below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

which then collapse into a `Map`

```
(_1,IntegerType)
```

Now, because of `lift(dataTypeOf(name)).map(_(name, value))`, this pushes
down filters for `_1`, which Parquet thinks is `IntegerType` even though it is
actually `StructType`.

So, the Parquet filter2 API produces incorrect results; for example, the code below:

```
df.filter("_1 IS NOT NULL").count()
```

always produces 0.

This PR prevents this by not collecting nested fields into the field map.

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon 

Closes #14067 from HyukjinKwon/SPARK-16371.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8ceed5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8ceed5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8ceed5

Branch: refs/heads/master
Commit: 4f8ceed59367319300e4bfa5b957c387be81ffa3
Parents: 480357c
Author: hyukjinkwon 
Authored: Wed Jul 6 12:42:16 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 12:42:16 2016 -0700

--
 .../datasources/parquet/ParquetFileFormat.scala   |  2 +-
 .../datasources/parquet/ParquetFilters.scala  |  5 -
 .../datasources/parquet/ParquetFilterSuite.scala  | 14 ++
 3 files changed, 19 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9833620..76d7f5c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
 ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
 
 // We want to clear this temporary metadata from saving into Parquet file.
-// This metadata is only useful for detecting optional columns when 
pushdowning filters.
+// This metadata is only useful for detecting optional columns when 
pushing down filters.
 val dataSchemaToWrite = StructType.removeMetadata(
   StructType.metadataKeyForOptionalField,
   dataSchema).asInstanceOf[StructType]

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8ceed5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7213a38..e0a113a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
*/
   private def getFieldMap(dataType: DataType): Array[(String, DataType)] = 
dataType match {
 case StructType(fields) =>
+  // Here we don't flatten the fields in the nested schema but just look 
up through
+  // root fields. Currently, accessing to nested fields does not push down 
filters
+  // and it does not support to create filters for them.
   fields.filter { f =>
 !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
   !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-  }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => 
getFieldMap(f.dataType) }
+  }.map(f => f.name -> f.dataType)
 case _ => Array.empty[(String, DataType)]
  

spark git commit: [SPARK-16304] LinkageError should not crash Spark executor

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4e14199ff -> 480357cc6


[SPARK-16304] LinkageError should not crash Spark executor

## What changes were proposed in this pull request?
This patch updates the failure-handling logic so that the Spark executor does not crash
when it encounters a LinkageError.
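
For readers skimming the diff, this is a compact sketch of the classification `Utils.isFatalError` ends up with after the patch (it mirrors the hunk below):

```scala
import scala.util.control.{ControlThrowable, NonFatal}

// LinkageError now joins the set of throwables treated as non-fatal, so a task that
// hits one fails the task instead of killing the executor JVM.
def isFatalError(e: Throwable): Boolean = e match {
  case NonFatal(_) |
       _: InterruptedException |
       _: NotImplementedError |
       _: ControlThrowable |
       _: LinkageError =>
    false
  case _ =>
    true // e.g. OutOfMemoryError remains fatal
}
```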

## How was this patch tested?
Added an end-to-end test in FailureSuite.

Author: petermaxlee 

Closes #13982 from petermaxlee/SPARK-16304.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/480357cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/480357cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/480357cc

Branch: refs/heads/master
Commit: 480357cc6d71c682fe703611c71c1e6a36e6ce9a
Parents: 4e14199
Author: petermaxlee 
Authored: Wed Jul 6 10:46:22 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:46:22 2016 -0700

--
 core/src/main/scala/org/apache/spark/util/Utils.scala   | 6 +-
 core/src/test/scala/org/apache/spark/FailureSuite.scala | 9 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/480357cc/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 156cf17..298e624 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1881,7 +1881,11 @@ private[spark] object Utils extends Logging {
   /** Returns true if the given exception was fatal. See docs for 
scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
 e match {
-  case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: 
ControlThrowable =>
+  case NonFatal(_) |
+   _: InterruptedException |
+   _: NotImplementedError |
+   _: ControlThrowable |
+   _: LinkageError =>
 false
   case _ =>
 true

http://git-wip-us.apache.org/repos/asf/spark/blob/480357cc/core/src/test/scala/org/apache/spark/FailureSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala 
b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 132f636..d805c67 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -253,6 +253,15 @@ class FailureSuite extends SparkFunSuite with 
LocalSparkContext {
 rdd.count()
   }
 
+  test("SPARK-16304: Link error should not crash executor") {
+sc = new SparkContext("local[1,2]", "test")
+intercept[SparkException] {
+  sc.parallelize(1 to 2).foreach { i =>
+throw new LinkageError()
+  }
+}
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
 





spark git commit: [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master b1310425b -> 4e14199ff


[MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation

## What changes were proposed in this pull request?

This PR fixes incorrectly formatted examples in the PySpark documentation, as shown below:

- **`SparkSession`**

  - **Before**

![2016-07-06 11 34 
41](https://cloud.githubusercontent.com/assets/6477701/16605847/ae939526-436d-11e6-8ab8-6ad578362425.png)

  - **After**

![2016-07-06 11 33 
56](https://cloud.githubusercontent.com/assets/6477701/16605845/ace9ee78-436d-11e6-8923-b76d4fc3e7c3.png)

- **`Builder`**

  - **Before**
![2016-07-06 11 34 
44](https://cloud.githubusercontent.com/assets/6477701/16605844/aba60dbc-436d-11e6-990a-c87bc0281c6b.png)

  - **After**
![2016-07-06 1 26 
37](https://cloud.githubusercontent.com/assets/6477701/16607562/586704c0-437d-11e6-9483-e0af93d8f74e.png)

This PR also fixes several similar instances across the documentation in the `sql`
PySpark module.

## How was this patch tested?

N/A

Author: hyukjinkwon 

Closes #14063 from HyukjinKwon/minor-pyspark-builder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e14199f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e14199f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e14199f

Branch: refs/heads/master
Commit: 4e14199ff740ea186eb2cec2e5cf901b58c5f90e
Parents: b131042
Author: hyukjinkwon 
Authored: Wed Jul 6 10:45:51 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:45:51 2016 -0700

--
 python/pyspark/mllib/clustering.py | 14 +++---
 python/pyspark/sql/dataframe.py|  8 
 python/pyspark/sql/functions.py|  8 
 python/pyspark/sql/group.py|  2 ++
 python/pyspark/sql/session.py  | 13 +++--
 python/pyspark/sql/types.py|  4 ++--
 6 files changed, 26 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/mllib/clustering.py
--
diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py
index 93a0b64..c38c543 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -571,14 +571,14 @@ class PowerIterationClusteringModel(JavaModelWrapper, 
JavaSaveable, JavaLoader):
 
 >>> import math
 >>> def genCircle(r, n):
-...   points = []
-...   for i in range(0, n):
-... theta = 2.0 * math.pi * i / n
-... points.append((r * math.cos(theta), r * math.sin(theta)))
-...   return points
+... points = []
+... for i in range(0, n):
+... theta = 2.0 * math.pi * i / n
+... points.append((r * math.cos(theta), r * math.sin(theta)))
+... return points
 >>> def sim(x, y):
-...   dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
-...   return math.exp(-dist2 / 2.0)
+... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - 
y[1])
+... return math.exp(-dist2 / 2.0)
 >>> r1 = 1.0
 >>> n1 = 10
 >>> r2 = 4.0

http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e44b01b..a0ac7a9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1045,10 +1045,10 @@ class DataFrame(object):
 :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
 
 >>> from pyspark.sql import Row
->>> df = sc.parallelize([ \
-Row(name='Alice', age=5, height=80), \
-Row(name='Alice', age=5, height=80), \
-Row(name='Alice', age=10, height=80)]).toDF()
+>>> df = sc.parallelize([ \\
+... Row(name='Alice', age=5, height=80), \\
+... Row(name='Alice', age=5, height=80), \\
+... Row(name='Alice', age=10, height=80)]).toDF()
 >>> df.dropDuplicates().show()
 +---+--+-+
 |age|height| name|

http://git-wip-us.apache.org/repos/asf/spark/blob/4e14199f/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 7a73451..92d709e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1550,8 +1550,8 @@ def translate(srcCol, matching, replace):
 The translate will happen when any character in the string matching with 
the character
 in the `matching`.
 
->>> 

spark git commit: [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 091cd5f26 -> 03f336d89


[MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation

## What changes were proposed in this pull request?

This PR fixes incorrectly formatted examples in the PySpark documentation, as shown below:

- **`SparkSession`**

  - **Before**

![2016-07-06 11 34 
41](https://cloud.githubusercontent.com/assets/6477701/16605847/ae939526-436d-11e6-8ab8-6ad578362425.png)

  - **After**

![2016-07-06 11 33 
56](https://cloud.githubusercontent.com/assets/6477701/16605845/ace9ee78-436d-11e6-8923-b76d4fc3e7c3.png)

- **`Builder`**

  - **Before**
![2016-07-06 11 34 
44](https://cloud.githubusercontent.com/assets/6477701/16605844/aba60dbc-436d-11e6-990a-c87bc0281c6b.png)

  - **After**
![2016-07-06 1 26 
37](https://cloud.githubusercontent.com/assets/6477701/16607562/586704c0-437d-11e6-9483-e0af93d8f74e.png)

This PR also fixes several similar instances across the documentation in the `sql`
PySpark module.

## How was this patch tested?

N/A

Author: hyukjinkwon 

Closes #14063 from HyukjinKwon/minor-pyspark-builder.

(cherry picked from commit 4e14199ff740ea186eb2cec2e5cf901b58c5f90e)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03f336d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03f336d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03f336d8

Branch: refs/heads/branch-2.0
Commit: 03f336d8921e1f22ee4d1f6fa8869163b1f29ea9
Parents: 091cd5f
Author: hyukjinkwon 
Authored: Wed Jul 6 10:45:51 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:45:56 2016 -0700

--
 python/pyspark/mllib/clustering.py | 14 +++---
 python/pyspark/sql/dataframe.py|  8 
 python/pyspark/sql/functions.py|  8 
 python/pyspark/sql/group.py|  2 ++
 python/pyspark/sql/session.py  | 13 +++--
 python/pyspark/sql/types.py|  4 ++--
 6 files changed, 26 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/mllib/clustering.py
--
diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py
index 93a0b64..c38c543 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -571,14 +571,14 @@ class PowerIterationClusteringModel(JavaModelWrapper, 
JavaSaveable, JavaLoader):
 
 >>> import math
 >>> def genCircle(r, n):
-...   points = []
-...   for i in range(0, n):
-... theta = 2.0 * math.pi * i / n
-... points.append((r * math.cos(theta), r * math.sin(theta)))
-...   return points
+... points = []
+... for i in range(0, n):
+... theta = 2.0 * math.pi * i / n
+... points.append((r * math.cos(theta), r * math.sin(theta)))
+... return points
 >>> def sim(x, y):
-...   dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
-...   return math.exp(-dist2 / 2.0)
+... dist2 = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - 
y[1])
+... return math.exp(-dist2 / 2.0)
 >>> r1 = 1.0
 >>> n1 = 10
 >>> r2 = 4.0

http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e6e7029..c7d704a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1033,10 +1033,10 @@ class DataFrame(object):
 :func:`drop_duplicates` is an alias for :func:`dropDuplicates`.
 
 >>> from pyspark.sql import Row
->>> df = sc.parallelize([ \
-Row(name='Alice', age=5, height=80), \
-Row(name='Alice', age=5, height=80), \
-Row(name='Alice', age=10, height=80)]).toDF()
+>>> df = sc.parallelize([ \\
+... Row(name='Alice', age=5, height=80), \\
+... Row(name='Alice', age=5, height=80), \\
+... Row(name='Alice', age=10, height=80)]).toDF()
 >>> df.dropDuplicates().show()
 +---+--+-+
 |age|height| name|

http://git-wip-us.apache.org/repos/asf/spark/blob/03f336d8/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 15cefc8..1feca6e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1550,8 +1550,8 @@ def translate(srcCol, matching, replace):
 The translate will 

spark git commit: [DOC][SQL] update out-of-date code snippets using SQLContext in all documents.

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 23eff5e51 -> b1310425b


[DOC][SQL] update out-of-date code snippets using SQLContext in all documents.

## What changes were proposed in this pull request?

I searched the whole docs directory for SQLContext and updated the following places:

- docs/configuration.md: SparkR code snippets.
- docs/streaming-programming-guide.md: several code examples (see the sketch after this list).
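
For readers who only skim the diff, the Scala pattern the streaming guide now shows looks roughly like this; `createOrReplaceTempView` is my assumption for the registration step, which the quoted hunks truncate:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

def printWordCounts(words: DStream[String]): Unit = words.foreachRDD { rdd =>
  // Singleton SparkSession built from the RDD's SparkConf, so it can be re-created
  // after a driver failure; this replaces SQLContext.getOrCreate in the old guide.
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.createOrReplaceTempView("words")

  spark.sql("select word, count(*) as total from words group by word").show()
}
```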

## How was this patch tested?

N/A

Author: WeichenXu 

Closes #14025 from WeichenXu123/WIP_SQLContext_update.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1310425
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1310425
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1310425

Branch: refs/heads/master
Commit: b1310425b30cbd711e4834d65a0accb3c5a8403a
Parents: 23eff5e
Author: WeichenXu 
Authored: Wed Jul 6 10:41:48 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:41:48 2016 -0700

--
 docs/configuration.md   |  4 ++--
 docs/streaming-programming-guide.md | 39 +---
 2 files changed, 23 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1310425/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index cee59cf..1e95b86 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1564,8 +1564,8 @@ spark.sql("SET -v").show(n=200, truncate=False)
 
 
 {% highlight r %}
-# sqlContext is an existing sqlContext.
-properties <- sql(sqlContext, "SET -v")
+sparkR.session()
+properties <- sql("SET -v")
 showDF(properties, numRows = 200, truncate = FALSE)
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b1310425/docs/streaming-programming-guide.md
--
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index db06a65..2ee3b80 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1534,7 +1534,7 @@ See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 ***
 
 ## DataFrame and SQL Operations
-You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SQLContext using the SparkContext that 
the StreamingContext is using. Furthermore this has to done such that it can be 
restarted on driver failures. This is done by creating a lazily instantiated 
singleton instance of SQLContext. This is shown in the following example. It 
modifies the earlier [word count example](#a-quick-example) to generate word 
counts using DataFrames and SQL. Each RDD is converted to a DataFrame, 
registered as a temporary table and then queried using SQL.
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SparkSession using the SparkContext 
that the StreamingContext is using. Furthermore this has to done such that it 
can be restarted on driver failures. This is done by creating a lazily 
instantiated singleton instance of SparkSession. This is shown in the following 
example. It modifies the earlier [word count example](#a-quick-example) to 
generate word counts using DataFrames and SQL. Each RDD is converted to a 
DataFrame, registered as a temporary table and then queried using SQL.
 
 
 
@@ -1546,9 +1546,9 @@ val words: DStream[String] = ...
 
 words.foreachRDD { rdd =>
 
-  // Get the singleton instance of SQLContext
-  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
-  import sqlContext.implicits._
+  // Get the singleton instance of SparkSession
+  val spark = 
SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
+  import spark.implicits._
 
   // Convert RDD[String] to DataFrame
   val wordsDataFrame = rdd.toDF("word")
@@ -1558,7 +1558,7 @@ words.foreachRDD { rdd =>
 
   // Do word count on DataFrame using SQL and print it
   val wordCountsDataFrame = 
-sqlContext.sql("select word, count(*) as total from words group by word")
+spark.sql("select word, count(*) as total from words group by word")
   wordCountsDataFrame.show()
 }
 
@@ -1593,8 +1593,8 @@ words.foreachRDD(
 @Override
 public Void call(JavaRDD rdd, Time time) {
 
-  // Get the singleton instance of SQLContext
-  SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
+  // Get the singleton instance of SparkSession
+  SparkSession spark = 
SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
 
   // Convert RDD[String] to RDD[case class] to DataFrame
   JavaRDD rowRDD = 

spark git commit: [DOC][SQL] update out-of-date code snippets using SQLContext in all documents.

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e956bd775 -> 091cd5f26


[DOC][SQL] update out-of-date code snippets using SQLContext in all documents.

## What changes were proposed in this pull request?

I searched the whole docs directory for SQLContext and updated the following places:

- docs/configuration.md: SparkR code snippets.
- docs/streaming-programming-guide.md: several code examples.

## How was this patch tested?

N/A

Author: WeichenXu 

Closes #14025 from WeichenXu123/WIP_SQLContext_update.

(cherry picked from commit b1310425b30cbd711e4834d65a0accb3c5a8403a)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/091cd5f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/091cd5f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/091cd5f2

Branch: refs/heads/branch-2.0
Commit: 091cd5f265166512a450333946c62c3eb3440e79
Parents: e956bd7
Author: WeichenXu 
Authored: Wed Jul 6 10:41:48 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:41:54 2016 -0700

--
 docs/configuration.md   |  4 ++--
 docs/streaming-programming-guide.md | 39 +---
 2 files changed, 23 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/091cd5f2/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index cee59cf..1e95b86 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1564,8 +1564,8 @@ spark.sql("SET -v").show(n=200, truncate=False)
 
 
 {% highlight r %}
-# sqlContext is an existing sqlContext.
-properties <- sql(sqlContext, "SET -v")
+sparkR.session()
+properties <- sql("SET -v")
 showDF(properties, numRows = 200, truncate = FALSE)
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/091cd5f2/docs/streaming-programming-guide.md
--
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index db06a65..2ee3b80 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1534,7 +1534,7 @@ See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 ***
 
 ## DataFrame and SQL Operations
-You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SQLContext using the SparkContext that 
the StreamingContext is using. Furthermore this has to done such that it can be 
restarted on driver failures. This is done by creating a lazily instantiated 
singleton instance of SQLContext. This is shown in the following example. It 
modifies the earlier [word count example](#a-quick-example) to generate word 
counts using DataFrames and SQL. Each RDD is converted to a DataFrame, 
registered as a temporary table and then queried using SQL.
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SparkSession using the SparkContext 
that the StreamingContext is using. Furthermore this has to done such that it 
can be restarted on driver failures. This is done by creating a lazily 
instantiated singleton instance of SparkSession. This is shown in the following 
example. It modifies the earlier [word count example](#a-quick-example) to 
generate word counts using DataFrames and SQL. Each RDD is converted to a 
DataFrame, registered as a temporary table and then queried using SQL.
 
 
 
@@ -1546,9 +1546,9 @@ val words: DStream[String] = ...
 
 words.foreachRDD { rdd =>
 
-  // Get the singleton instance of SQLContext
-  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
-  import sqlContext.implicits._
+  // Get the singleton instance of SparkSession
+  val spark = 
SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
+  import spark.implicits._
 
   // Convert RDD[String] to DataFrame
   val wordsDataFrame = rdd.toDF("word")
@@ -1558,7 +1558,7 @@ words.foreachRDD { rdd =>
 
   // Do word count on DataFrame using SQL and print it
   val wordCountsDataFrame = 
-sqlContext.sql("select word, count(*) as total from words group by word")
+spark.sql("select word, count(*) as total from words group by word")
   wordCountsDataFrame.show()
 }
 
@@ -1593,8 +1593,8 @@ words.foreachRDD(
 @Override
 public Void call(JavaRDD rdd, Time time) {
 
-  // Get the singleton instance of SQLContext
-  SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
+  // Get the singleton instance of SparkSession
+  SparkSession spark = 

spark git commit: [SPARK-15979][SQL] Renames CatalystWriteSupport to ParquetWriteSupport

2016-07-06 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 478b71d02 -> 23eff5e51


[SPARK-15979][SQL] Renames CatalystWriteSupport to ParquetWriteSupport

## What changes were proposed in this pull request?

PR #13696 renamed various Parquet support classes but left
`CatalystWriteSupport` behind. This PR renames it as a follow-up.

## How was this patch tested?

N/A.

Author: Cheng Lian 

Closes #14070 from liancheng/spark-15979-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23eff5e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23eff5e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23eff5e5

Branch: refs/heads/master
Commit: 23eff5e512df5710ea6591a3fce321b53eb3fb0b
Parents: 478b71d
Author: Cheng Lian 
Authored: Wed Jul 6 10:36:45 2016 -0700
Committer: Reynold Xin 
Committed: Wed Jul 6 10:36:45 2016 -0700

--
 .../parquet/CatalystWriteSupport.scala  | 437 ---
 .../datasources/parquet/ParquetFileFormat.scala |  14 +-
 .../parquet/ParquetWriteSupport.scala   | 437 +++
 3 files changed, 444 insertions(+), 444 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23eff5e5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
deleted file mode 100644
index 00e1bca..000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util
-
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.io.api.{Binary, RecordConsumer}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter.minBytesForPrecision
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
-
-/**
- * A Parquet [[WriteSupport]] implementation that writes Catalyst 
[[InternalRow]]s as Parquet
- * messages.  This class can write Parquet data in two modes:
- *
- *  - Standard mode: Parquet data are written in standard format defined in 
parquet-format spec.
- *  - Legacy mode: Parquet data are written in legacy format compatible with 
Spark 1.4 and prior.
- *
- * This behavior can be controlled by SQL option 
`spark.sql.parquet.writeLegacyFormat`.  The value
- * of this option is propagated to this class by the `init()` method and its 
Hadoop configuration
- * argument.
- */
-private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] 
with Logging {
-  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to 
the record consumer.
-  // Here we are using `SpecializedGetters` rather than `InternalRow` so that 
we can directly access
-  // data in `ArrayData` without the help of `SpecificMutableRow`.
-  private type ValueWriter = (SpecializedGetters, Int) => Unit
-
-  // Schema of the `InternalRow`s to be written
-  private var schema: StructType = _
-
-  // `ValueWriter`s for all fields of the schema
-  private var rootFieldWriters: Seq[ValueWriter] = _
-
- 

spark git commit: [SPARK-15591][WEBUI] Paginate Stage Table in Stages tab

2016-07-06 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 21eadd1d8 -> 478b71d02


[SPARK-15591][WEBUI] Paginate Stage Table in Stages tab

## What changes were proposed in this pull request?

This patch adds pagination support for the stage tables in the Stages tab.
Pagination is provided for all four stage tables (active, pending,
completed, and failed). In addition, the paged stage tables are also used in
JobPage (the detail page for one job) and PoolPage.

Interactions (jumping, sorting, and setting page size) for paged tables are 
also included.

## How was this patch tested?

Tested manually by checking the Web UI after completing and failing hundreds of jobs,
same as the testing for [Paginate Job Table in Jobs
tab](https://github.com/apache/spark/pull/13620).

This shows the pagination for completed stages:
![paged stage 
table](https://cloud.githubusercontent.com/assets/5558370/16125696/5804e35e-3427-11e6-8923-5c5948982648.png)

Author: Tao Lin 

Closes #13708 from nblintao/stageTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/478b71d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/478b71d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/478b71d0

Branch: refs/heads/master
Commit: 478b71d028107d42fbb6d1bd300b86efbe0dcf7d
Parents: 21eadd1
Author: Tao Lin 
Authored: Wed Jul 6 10:28:05 2016 -0700
Committer: Shixiong Zhu 
Committed: Wed Jul 6 10:28:05 2016 -0700

--
 .../scala/org/apache/spark/ui/PagedTable.scala  |   1 +
 .../apache/spark/ui/jobs/AllStagesPage.scala|  25 +-
 .../org/apache/spark/ui/jobs/JobPage.scala  |  24 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala |  15 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   | 517 +++
 5 files changed, 441 insertions(+), 141 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala 
b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 9b6ed8c..2a7c16b 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -179,6 +179,7 @@ private[ui] trait PagedTable[T] {
   Splitter
 .on('&')
 .trimResults()
+.omitEmptyStrings()
 .withKeyValueSeparator("=")
 .split(querystring)
 .asScala

http://git-wip-us.apache.org/repos/asf/spark/blob/478b71d0/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
index e75f1c5..cba8f82 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -38,22 +38,24 @@ private[ui] class AllStagesPage(parent: StagesTab) extends 
WebUIPage("") {
   val numCompletedStages = listener.numCompletedStages
   val failedStages = listener.failedStages.reverse.toSeq
   val numFailedStages = listener.numFailedStages
-  val now = System.currentTimeMillis
+  val subPath = "stages"
 
   val activeStagesTable =
-new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-  parent.basePath, parent.progressListener, isFairScheduler = 
parent.isFairScheduler,
-  killEnabled = parent.killEnabled)
+new StageTableBase(request, activeStages, "activeStage", 
parent.basePath, subPath,
+  parent.progressListener, parent.isFairScheduler,
+  killEnabled = parent.killEnabled, isFailedStage = false)
   val pendingStagesTable =
-new StageTableBase(pendingStages.sortBy(_.submissionTime).reverse,
-  parent.basePath, parent.progressListener, isFairScheduler = 
parent.isFairScheduler,
-  killEnabled = false)
+new StageTableBase(request, pendingStages, "pendingStage", 
parent.basePath, subPath,
+  parent.progressListener, parent.isFairScheduler,
+  killEnabled = false, isFailedStage = false)
   val completedStagesTable =
-new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, 
parent.basePath,
-  parent.progressListener, isFairScheduler = parent.isFairScheduler, 
killEnabled = false)
+new StageTableBase(request, completedStages, "completedStage", 
parent.basePath, subPath,
+  parent.progressListener, parent.isFairScheduler,
+  killEnabled = false, 

spark git commit: [MINOR][CORE][1.6-BACKPORT] Fix display wrong free memory size in the log

2016-07-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 76781950f -> 2588776ad


[MINOR][CORE][1.6-BACKPORT] Fix display wrong free memory size in the log

## What changes were proposed in this pull request?

The free memory size displayed in the log is wrong (it actually shows the used memory);
fix it so the correct value is logged. Backported to 1.6.

## How was this patch tested?

N/A

Author: jerryshao 

Closes #14043 from jerryshao/memory-log-fix-1.6-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2588776a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2588776a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2588776a

Branch: refs/heads/branch-1.6
Commit: 2588776ad3d91e39300d61c8a12dc0803c28e866
Parents: 7678195
Author: jerryshao 
Authored: Wed Jul 6 14:49:21 2016 +0100
Committer: Sean Owen 
Committed: Wed Jul 6 14:49:21 2016 +0100

--
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2588776a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
--
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 17aae6e..aed0da9 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -393,7 +393,8 @@ private[spark] class MemoryStore(blockManager: 
BlockManager, memoryManager: Memo
 }
 val valuesOrBytes = if (deserialized) "values" else "bytes"
 logInfo("Block %s stored as %s in memory (estimated size %s, free 
%s)".format(
-  blockId, valuesOrBytes, Utils.bytesToString(size), 
Utils.bytesToString(blocksMemoryUsed)))
+  blockId, valuesOrBytes, Utils.bytesToString(size),
+  Utils.bytesToString(maxMemory - blocksMemoryUsed)))
   } else {
 // Tell the block manager that we couldn't put it in memory so that it 
can drop it to
 // disk if the block allows disk storage.





spark git commit: [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails

2016-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d5d2457e4 -> e956bd775


[SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails

#### What changes were proposed in this pull request?
In `CREATE TABLE AS SELECT`, if the `SELECT` query fails, the table should not
exist. For example,

```SQL
CREATE TABLE tab
STORED AS TEXTFILE
SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b
```
The above query fails as expected, but an empty table `tab` is created.

This PR drops the created table when hitting any non-fatal exception.
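
A toy sketch of the rollback pattern this PR introduces (the function and its parameters are illustrative placeholders, not Spark's actual API):

```scala
import scala.util.control.NonFatal

// Create the table first, run the SELECT, and drop the table again if the query
// throws a non-fatal error, so a failed CTAS no longer leaves an empty table behind.
def createTableAsSelect(createTable: () => Unit,
                        runQuery: () => Unit,
                        dropTable: () => Unit): Unit = {
  createTable()
  try {
    runQuery()
  } catch {
    case NonFatal(e) =>
      dropTable()
      throw e
  }
}
```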

#### How was this patch tested?
Added a test case to verify the behavior.

Author: gatorsmile 

Closes #13926 from gatorsmile/dropTableAfterException.

(cherry picked from commit 21eadd1d8cbf029197e73ffca1cba54d5a890c01)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e956bd77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e956bd77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e956bd77

Branch: refs/heads/branch-2.0
Commit: e956bd7750882ce259a278e9eac7f64b4fb76286
Parents: d5d2457
Author: gatorsmile 
Authored: Wed Jul 6 21:43:55 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Jul 6 21:45:30 2016 +0800

--
 .../execution/CreateHiveTableAsSelectCommand.scala   | 13 +++--
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 15 +++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e956bd77/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index b809938..15a5d79 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
@@ -87,8 +89,15 @@ case class CreateHiveTableAsSelectCommand(
 throw new AnalysisException(s"$tableIdentifier already exists.")
   }
 } else {
-  sparkSession.sessionState.executePlan(InsertIntoTable(
-metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+  try {
+sparkSession.sessionState.executePlan(InsertIntoTable(
+  metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+  } catch {
+case NonFatal(e) =>
+  // drop the created table.
+  sparkSession.sessionState.catalog.dropTable(tableIdentifier, 
ignoreIfNotExists = true)
+  throw e
+  }
 }
 
 Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/e956bd77/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 89f69c8..9d3c4cd 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -554,6 +554,21 @@ class HiveDDLSuite
 }
   }
 
+  test("Create Cataloged Table As Select - Drop Table After Runtime 
Exception") {
+withTable("tab") {
+  intercept[RuntimeException] {
+sql(
+  """
+|CREATE TABLE tab
+|STORED AS TEXTFILE
+|SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 
AS a) t) AS b
+  """.stripMargin)
+  }
+  // After hitting runtime exception, we should drop the created table.
+  assert(!spark.sessionState.catalog.tableExists(TableIdentifier("tab")))
+}
+  }
+
   test("desc table for data source table") {
 withTable("tab1") {
   val tabName = "tab1"





spark git commit: [SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails

2016-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 909c6d812 -> 21eadd1d8


[SPARK-16229][SQL] Drop Empty Table After CREATE TABLE AS SELECT fails

#### What changes were proposed in this pull request?
In `CREATE TABLE AS SELECT`, if the `SELECT` query fails, the table should not
exist. For example,

```SQL
CREATE TABLE tab
STORED AS TEXTFILE
SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b
```
The above query fails as expected, but an empty table `tab` is created.

This PR drops the created table when hitting any non-fatal exception.

#### How was this patch tested?
Added a test case to verify the behavior.

Author: gatorsmile 

Closes #13926 from gatorsmile/dropTableAfterException.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21eadd1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21eadd1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21eadd1d

Branch: refs/heads/master
Commit: 21eadd1d8cbf029197e73ffca1cba54d5a890c01
Parents: 909c6d8
Author: gatorsmile 
Authored: Wed Jul 6 21:43:55 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Jul 6 21:43:55 2016 +0800

--
 .../execution/CreateHiveTableAsSelectCommand.scala   | 13 +++--
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 15 +++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21eadd1d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index b809938..15a5d79 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
@@ -87,8 +89,15 @@ case class CreateHiveTableAsSelectCommand(
 throw new AnalysisException(s"$tableIdentifier already exists.")
   }
 } else {
-  sparkSession.sessionState.executePlan(InsertIntoTable(
-metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+  try {
+sparkSession.sessionState.executePlan(InsertIntoTable(
+  metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+  } catch {
+case NonFatal(e) =>
+  // drop the created table.
+  sparkSession.sessionState.catalog.dropTable(tableIdentifier, 
ignoreIfNotExists = true)
+  throw e
+  }
 }
 
 Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/21eadd1d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 89f69c8..9d3c4cd 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -554,6 +554,21 @@ class HiveDDLSuite
 }
   }
 
+  test("Create Cataloged Table As Select - Drop Table After Runtime 
Exception") {
+withTable("tab") {
+  intercept[RuntimeException] {
+sql(
+  """
+|CREATE TABLE tab
+|STORED AS TEXTFILE
+|SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 
AS a) t) AS b
+  """.stripMargin)
+  }
+  // After hitting runtime exception, we should drop the created table.
+  assert(!spark.sessionState.catalog.tableExists(TableIdentifier("tab")))
+}
+  }
+
   test("desc table for data source table") {
 withTable("tab1") {
   val tabName = "tab1"





spark git commit: [SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached

2016-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 25006c8bc -> d5d2457e4


[SPARK-15968][SQL] Nonempty partitioned metastore tables are not cached

This PR backports your fix (https://github.com/apache/spark/pull/13818) to 
branch 2.0.

This PR addresses 
[SPARK-15968](https://issues.apache.org/jira/browse/SPARK-15968).

## What changes were proposed in this pull request?

The `getCached` method of 
[HiveMetastoreCatalog](https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala)
 computes `pathsInMetastore` from the metastore relation's catalog table. This 
only returns the table base path, which is incomplete/inaccurate for a nonempty 
partitioned table. As a result, cached lookups on nonempty partitioned tables 
always miss.

Rather than get `pathsInMetastore` from

metastoreRelation.catalogTable.storage.locationUri.toSeq

I modified the `getCached` method to take a `pathsInMetastore` argument. Calls 
to this method pass in the paths computed from calls to the Hive metastore. 
This is how `getCached` was implemented in Spark 1.5:

https://github.com/apache/spark/blob/e0c3212a9b42e3e704b070da4ac25b68c584427f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L444.

I also added a call in `InsertIntoHiveTable.scala` to invalidate the table from 
the SQL session catalog.
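
To make the caching fix concrete, here is a small, self-contained sketch of the
idea; every name in it is illustrative and none of it is the actual
`HiveMetastoreCatalog` code:

import scala.collection.mutable

object CachedRelationSketch {
  final case class CachedRelation(paths: Set[String])

  private val cache = mutable.Map.empty[String, CachedRelation]

  // The cached entry is only reused when the paths the caller expects match the
  // paths recorded at caching time. Before the fix the caller derived the paths
  // from the table's base location only, so a nonempty partitioned table never
  // matched; after the fix the caller passes in the paths computed from the
  // Hive metastore (base path plus one path per partition).
  def getCached(table: String, pathsInMetastore: Seq[String]): Option[CachedRelation] =
    cache.get(table).filter(_.paths == pathsInMetastore.toSet)

  def putCached(table: String, pathsInMetastore: Seq[String]): Unit =
    cache(table) = CachedRelation(pathsInMetastore.toSet)

  def main(args: Array[String]): Unit = {
    val partitionPaths = Seq("/warehouse/t", "/warehouse/t/part=1", "/warehouse/t/part=2")
    putCached("t", partitionPaths)
    assert(getCached("t", Seq("/warehouse/t")).isEmpty)   // base path only: always a miss
    assert(getCached("t", partitionPaths).isDefined)      // metastore-derived paths: a hit
  }
}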

## How was this patch tested?

I've added a new unit test to `parquetSuites.scala`:

SPARK-15968: nonempty partitioned metastore Parquet table lookup should use 
cached relation

Note that the only difference between this new test and the one above it in the 
file is that the new test populates its partitioned table with a single value, 
while the existing test leaves the table empty. This reveals a subtle, 
unexpected hole in test coverage present before this patch.

Note I also modified a different but related unit test in `parquetSuites.scala`:

SPARK-15248: explicitly added partitions should be readable

This unit test asserts that Spark SQL returns data from a table partition
immediately after it is added, even when the data was placed in the partition's
location outside of a metastore query. I changed the test so that, instead of
adding the data as a Parquet file saved directly in the partition's location,
the data is added through a SQL `INSERT` query. I made this change because I
could find no way to efficiently support partitioned table caching without
failing that test.
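
Roughly, the test's data setup changed from a direct filesystem write to a SQL
`INSERT`, along these lines (a sketch only; the table and column names are
placeholders, not the ones used in `parquetSuites.scala`):

import org.apache.spark.sql.SparkSession

object PartitionInsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-insert-sketch")
      .enableHiveSupport()
      .getOrCreate()
    // Placeholder table and column names, used for illustration only.
    spark.sql(
      "CREATE TABLE IF NOT EXISTS test_added_partitions (a STRING) " +
        "PARTITIONED BY (b INT) STORED AS PARQUET")
    // Old style (roughly): write a Parquet file straight into the partition's
    // directory, bypassing the metastore.
    // New style: add the partition and its data through SQL, so the metastore
    // and any cached relation both see it.
    spark.sql("ALTER TABLE test_added_partitions ADD IF NOT EXISTS PARTITION (b = 1)")
    spark.sql("INSERT INTO TABLE test_added_partitions PARTITION (b = 1) SELECT 'x'")
    spark.sql("SELECT a, b FROM test_added_partitions").show()
  }
}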

In addition to my primary motivation, I can offer a few reasons I believe this 
is an acceptable weakening of that test. First, it still validates a fix for 
[SPARK-15248](https://issues.apache.org/jira/browse/SPARK-15248), the issue for 
which it was written. Second, the assertion made is stronger than that required 
for non-partitioned tables. If you write data to the storage location of a 
non-partitioned metastore table without using a proper SQL DML query, a 
subsequent call to show that data will not return it. I believe this is an 
intentional limitation put in place to make table caching feasible, but I'm 
only speculating.

Building a large `HadoopFsRelation` requires `stat`-ing all of its data files. 
In our environment, where we have tables with tens of thousands of partitions, 
the difference between using a cached relation versus a new one is a matter of 
seconds versus minutes. Caching partitioned table metadata vastly improves the 
usability of Spark SQL for these cases.

Author: Reynold Xin 
Author: Michael Allman 

Closes #14064 from yhuai/spark-15968-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5d2457e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5d2457e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5d2457e

Branch: refs/heads/branch-2.0
Commit: d5d2457e41661a3402a8258026856454222a2f54
Parents: 25006c8
Author: Reynold Xin 
Authored: Wed Jul 6 21:35:56 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Jul 6 21:35:56 2016 +0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 16 -
 .../hive/execution/InsertIntoHiveTable.scala|  1 +
 .../apache/spark/sql/hive/parquetSuites.scala   | 61 ++--
 3 files changed, 59 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5d2457e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d6c5325..789f94a 100644
--- 

spark git commit: [MINOR][BUILD] Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published on Apache mirrors

2016-07-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4fcb88843 -> 76781950f


[MINOR][BUILD] Download Maven 3.3.9 instead of 3.3.3 because the latter is no 
longer published on Apache mirrors

## What changes were proposed in this pull request?

Download Maven 3.3.9 instead of 3.3.3 because the latter is no longer published 
on Apache mirrors

## How was this patch tested?

Jenkins

Author: Sean Owen 

Closes #14066 from srowen/Maven339Branch16.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76781950
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76781950
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76781950

Branch: refs/heads/branch-1.6
Commit: 76781950fd500ace0f939951fc7a94a58aca87c4
Parents: 4fcb888
Author: Sean Owen 
Authored: Wed Jul 6 12:27:17 2016 +0100
Committer: Sean Owen 
Committed: Wed Jul 6 12:27:17 2016 +0100

--
 build/mvn | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/76781950/build/mvn
--
diff --git a/build/mvn b/build/mvn
index c2b1427..8598386 100755
--- a/build/mvn
+++ b/build/mvn
@@ -69,7 +69,7 @@ install_app() {
 
 # Install maven under the build/ folder
 install_mvn() {
-  local MVN_VERSION="3.3.3"
+  local MVN_VERSION="3.3.9"
 
   install_app \
 
"https://www.apache.org/dyn/closer.lua?action=download=/maven/maven-3/${MVN_VERSION}/binaries;
 \


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16307][ML] Add test to verify the predicted variances of a DT on toy data

2016-07-06 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 7e28fabdf -> 909c6d812


[SPARK-16307][ML] Add test to verify the predicted variances of a DT on toy data

## What changes were proposed in this pull request?

The current tests assume that `impurity.calculate()` returns the variance
correctly. It would be better to make the tests independent of this assumption;
in other words, verify that the computed variance equals the variance calculated
manually on a small tree.
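
For example, with the toy data added in this patch, each node's predicted
variance can be checked by hand. A quick standalone sketch (not the test code
itself, and assuming the population form of variance used by the regression
impurity):

object VarianceByHand {
  // Population (biased) variance: mean of squared deviations from the mean.
  def variance(xs: Seq[Double]): Double = {
    val mean = xs.sum / xs.length
    xs.map(x => (x - mean) * (x - mean)).sum / xs.length
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq(0.0, 1.0, 2.0)     // values falling in the left child of the depth-1 tree
    val right = Seq(10.0, 12.0, 14.0)  // values falling in the right child
    println(variance(left))   // 0.666..., matching the expected 0.667
    println(variance(right))  // 2.666..., matching the expected 2.667
  }
}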

## How was this patch tested?

The patch is a test

Author: MechCoder 

Closes #13981 from MechCoder/dt_variance.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/909c6d81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/909c6d81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/909c6d81

Branch: refs/heads/master
Commit: 909c6d812f6ca3a3305e4611a700c8c17905b953
Parents: 7e28fab
Author: MechCoder 
Authored: Wed Jul 6 02:54:44 2016 -0700
Committer: Yanbo Liang 
Committed: Wed Jul 6 02:54:44 2016 -0700

--
 .../regression/DecisionTreeRegressorSuite.scala | 20 
 .../apache/spark/ml/tree/impl/TreeTests.scala   | 12 
 2 files changed, 32 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/909c6d81/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
index 9afb742..15fa26e 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.tree.impl.TreeTests
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree,
   DecisionTreeSuite => OldDecisionTreeSuite}
@@ -96,6 +97,25 @@ class DecisionTreeRegressorSuite
   assert(variance === expectedVariance,
 s"Expected variance $expectedVariance but got $variance.")
 }
+
+val varianceData: RDD[LabeledPoint] = TreeTests.varianceData(sc)
+val varianceDF = TreeTests.setMetadata(varianceData, Map.empty[Int, Int], 
0)
+dt.setMaxDepth(1)
+  .setMaxBins(6)
+  .setSeed(0)
+val transformVarDF = dt.fit(varianceDF).transform(varianceDF)
+val calculatedVariances = 
transformVarDF.select(dt.getVarianceCol).collect().map {
+  case Row(variance: Double) => variance
+}
+
+// Since max depth is set to 1, the best split point is that which splits 
the data
+// into (0.0, 1.0, 2.0) and (10.0, 12.0, 14.0). The predicted variance for 
each
+// data point in the left node is 0.667 and for each data point in the 
right node
+// is 2.667
+val expectedVariances = Array(0.667, 0.667, 0.667, 2.667, 2.667, 2.667)
+calculatedVariances.zip(expectedVariances).foreach { case (actual, 
expected) =>
+  assert(actual ~== expected absTol 1e-3)
+}
   }
 
   test("Feature importance with toy data") {

http://git-wip-us.apache.org/repos/asf/spark/blob/909c6d81/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala 
b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
index d2fa8d0..c90cb8c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/TreeTests.scala
@@ -183,6 +183,18 @@ private[ml] object TreeTests extends SparkFunSuite {
   ))
 
   /**
+   * Create some toy data for testing correctness of variance.
+   */
+  def varianceData(sc: SparkContext): RDD[LabeledPoint] = sc.parallelize(Seq(
+new LabeledPoint(1.0, Vectors.dense(Array(0.0))),
+new LabeledPoint(2.0, Vectors.dense(Array(1.0))),
+new LabeledPoint(3.0, Vectors.dense(Array(2.0))),
+new LabeledPoint(10.0, Vectors.dense(Array(3.0))),
+new LabeledPoint(12.0, Vectors.dense(Array(4.0))),
+new LabeledPoint(14.0, Vectors.dense(Array(5.0)))
+  ))
+
+  /**
* Mapping from all Params to valid settings which differ from the defaults.
* This is useful for tests which need to exercise all Params, such as 
save/load.
* This excludes input columns to 

spark git commit: [SPARK-16388][SQL] Remove spark.sql.nativeView and spark.sql.nativeView.canonical config

2016-07-06 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 5497242c7 -> 7e28fabdf


[SPARK-16388][SQL] Remove spark.sql.nativeView and 
spark.sql.nativeView.canonical config

## What changes were proposed in this pull request?
These two configs should always be true after Spark 2.0. This patch removes 
them from the config list. Note that ideally this should've gone into 
branch-2.0, but due to the timing of the release we should only merge this in 
master for Spark 2.1.

## How was this patch tested?
Updated test cases.

Author: Reynold Xin 

Closes #14061 from rxin/SPARK-16388.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e28fabd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e28fabd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e28fabd

Branch: refs/heads/master
Commit: 7e28fabdff2da1cc374efbf43372d92ae0cd07aa
Parents: 5497242
Author: Reynold Xin 
Authored: Wed Jul 6 17:40:55 2016 +0800
Committer: Cheng Lian 
Committed: Wed Jul 6 17:40:55 2016 +0800

--
 .../spark/sql/execution/command/views.scala |  39 +---
 .../org/apache/spark/sql/internal/SQLConf.scala |  23 ---
 .../spark/sql/internal/SQLConfSuite.scala   |  16 +-
 .../spark/sql/hive/execution/SQLViewSuite.scala | 206 ---
 4 files changed, 106 insertions(+), 178 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e28fabd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 088f684..007fa46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -149,37 +149,18 @@ case class CreateViewCommand(
* SQL based on the analyzed plan, and also creates the proper schema for 
the view.
*/
   private def prepareTable(sparkSession: SparkSession, analyzedPlan: 
LogicalPlan): CatalogTable = {
-val viewSQL: String =
-  if (sparkSession.sessionState.conf.canonicalView) {
-val logicalPlan =
-  if (tableDesc.schema.isEmpty) {
-analyzedPlan
-  } else {
-val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
-  case (attr, col) => Alias(attr, col.name)()
-}
-sparkSession.sessionState.executePlan(Project(projectList, 
analyzedPlan)).analyzed
-  }
-new SQLBuilder(logicalPlan).toSQL
-  } else {
-// When user specified column names for view, we should create a 
project to do the renaming.
-// When no column name specified, we still need to create a project to 
declare the columns
-// we need, to make us more robust to top level `*`s.
-val viewOutput = {
-  val columnNames = analyzedPlan.output.map(f => quote(f.name))
-  if (tableDesc.schema.isEmpty) {
-columnNames.mkString(", ")
-  } else {
-columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-  case (name, alias) => s"$name AS $alias"
-}.mkString(", ")
+val viewSQL: String = {
+  val logicalPlan =
+if (tableDesc.schema.isEmpty) {
+  analyzedPlan
+} else {
+  val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+case (attr, col) => Alias(attr, col.name)()
   }
+  sparkSession.sessionState.executePlan(Project(projectList, 
analyzedPlan)).analyzed
 }
-
-val viewText = tableDesc.viewText.get
-val viewName = quote(tableDesc.identifier.table)
-s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
+  new SQLBuilder(logicalPlan).toSQL
+}
 
 // Validate the view SQL - make sure we can parse it and analyze it.
 // If we cannot analyze the generated query, there is probably a bug in 
SQL generation.

http://git-wip-us.apache.org/repos/asf/spark/blob/7e28fabd/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a9bb6a..5ab0c1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -258,25 +258,6 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
-  val NATIVE_VIEW = 

spark git commit: [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading

2016-07-06 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 521fc7186 -> 25006c8bc


[SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for 
loading

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-16249
Change the visibility of object ml.clustering.LDA to public for loading, so that
users can invoke LDA.load("path").
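
For example, user code can now do the following directly (the save path here is
just a placeholder):

import org.apache.spark.ml.clustering.LDA
import org.apache.spark.sql.SparkSession

object LdaLoadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lda-load-sketch").getOrCreate()
    // Save an LDA estimator and load it back through the now-public companion object.
    // "/tmp/lda-estimator" is a placeholder path for this sketch.
    val lda = new LDA().setK(5).setMaxIter(10)
    lda.save("/tmp/lda-estimator")
    val reloaded = LDA.load("/tmp/lda-estimator")
    println(reloaded.explainParams())
  }
}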

## How was this patch tested?

Existing unit tests, plus a manual test of load (saved with the current code).

Author: Yuhao Yang 
Author: Yuhao Yang 

Closes #13941 from hhbyyh/ldapublic.

(cherry picked from commit 5497242c769b40338bfa57d64f2c64996dfa57e8)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25006c8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25006c8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25006c8b

Branch: refs/heads/branch-2.0
Commit: 25006c8bcc397c9f070cc5d685ffeb5b8fb0a341
Parents: 521fc71
Author: Yuhao Yang 
Authored: Wed Jul 6 01:30:47 2016 -0700
Committer: Yanbo Liang 
Committed: Wed Jul 6 01:31:07 2016 -0700

--
 .../main/scala/org/apache/spark/ml/clustering/LDA.scala   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/25006c8b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index b333d59..778cd0f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -880,11 +880,13 @@ class LDA @Since("1.6.0") (
   }
 }
 
-
-private[clustering] object LDA extends DefaultParamsReadable[LDA] {
+@Since("2.0.0")
+object LDA extends DefaultParamsReadable[LDA] {
 
   /** Get dataset for spark.mllib LDA */
-  def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, 
OldVector)] = {
+  private[clustering] def getOldDataset(
+   dataset: Dataset[_],
+   featuresCol: String): RDD[(Long, OldVector)] = {
 dataset
   .withColumn("docId", monotonicallyIncreasingId())
   .select("docId", featuresCol)
@@ -894,6 +896,6 @@ private[clustering] object LDA extends 
DefaultParamsReadable[LDA] {
   }
   }
 
-  @Since("1.6.0")
+  @Since("2.0.0")
   override def load(path: String): LDA = super.load(path)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for loading

2016-07-06 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 5f342049c -> 5497242c7


[SPARK-16249][ML] Change visibility of Object ml.clustering.LDA to public for 
loading

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-16249
Change the visibility of object ml.clustering.LDA to public for loading, so that
users can invoke LDA.load("path").

## How was this patch tested?

Existing unit tests, plus a manual test of load (saved with the current code).

Author: Yuhao Yang 
Author: Yuhao Yang 

Closes #13941 from hhbyyh/ldapublic.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5497242c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5497242c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5497242c

Branch: refs/heads/master
Commit: 5497242c769b40338bfa57d64f2c64996dfa57e8
Parents: 5f34204
Author: Yuhao Yang 
Authored: Wed Jul 6 01:30:47 2016 -0700
Committer: Yanbo Liang 
Committed: Wed Jul 6 01:30:47 2016 -0700

--
 .../main/scala/org/apache/spark/ml/clustering/LDA.scala   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5497242c/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index b333d59..778cd0f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -880,11 +880,13 @@ class LDA @Since("1.6.0") (
   }
 }
 
-
-private[clustering] object LDA extends DefaultParamsReadable[LDA] {
+@Since("2.0.0")
+object LDA extends DefaultParamsReadable[LDA] {
 
   /** Get dataset for spark.mllib LDA */
-  def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, 
OldVector)] = {
+  private[clustering] def getOldDataset(
+   dataset: Dataset[_],
+   featuresCol: String): RDD[(Long, OldVector)] = {
 dataset
   .withColumn("docId", monotonicallyIncreasingId())
   .select("docId", featuresCol)
@@ -894,6 +896,6 @@ private[clustering] object LDA extends 
DefaultParamsReadable[LDA] {
   }
   }
 
-  @Since("1.6.0")
+  @Since("2.0.0")
   override def load(path: String): LDA = super.load(path)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost

2016-07-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6e8fa86eb -> 521fc7186


[SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost

## What changes were proposed in this pull request?

Currently, if the output stream gets destroyed or closed due to some failure, a
later call to `outstream.close()` throws an IOException. Because of this, the
`stderrBuffer` does not get logged and there is no way for users to see why the
job failed.

The change is to treat a failure while closing the output stream as non-fatal
(logging it instead), so that the stderr buffer still gets displayed.
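
Conceptually, the fix looks like the following self-contained sketch (names only
loosely mirror the real `ScriptTransformationWriterThread`; this is not the
actual patch code):

import java.io.{IOException, OutputStream}
import scala.util.control.NonFatal

object NonFatalCloseSketch {
  // Run a block, logging (rather than rethrowing) any non-fatal error.
  private def tryLogNonFatalError(block: => Unit): Unit =
    try block catch {
      case NonFatal(e) => Console.err.println(s"Ignoring non-fatal error: $e")
    }

  def main(args: Array[String]): Unit = {
    val brokenStream: OutputStream = new OutputStream {
      def write(b: Int): Unit = throw new IOException("stream already destroyed")
      override def close(): Unit = throw new IOException("stream already destroyed")
    }
    val stderrBuffer = new StringBuilder("script error: something went wrong")

    // Before the fix, an exception thrown by close() would propagate here and
    // the stderr buffer below would never be logged; wrapping the close makes
    // the logging unconditional.
    tryLogNonFatalError(brokenStream.close())
    Console.err.println(stderrBuffer.toString)
  }
}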

## How was this patch tested?

The correct way to test this fix would be to grep the log to see whether the
`stderrBuffer` gets logged, but I don't think having test cases that do that is
a good idea.


Author: Tejas Patil 

Closes #13834 from tejasapatil/script_transform.

(cherry picked from commit 5f342049cce9102fb62b4de2d8d8fa691c2e8ac4)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/521fc718
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/521fc718
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/521fc718

Branch: refs/heads/branch-2.0
Commit: 521fc7186a2637321f7a7cfac713537de73ae66f
Parents: 6e8fa86
Author: Tejas Patil 
Authored: Wed Jul 6 09:18:04 2016 +0100
Committer: Sean Owen 
Committed: Wed Jul 6 09:18:13 2016 +0100

--
 .../spark/sql/hive/execution/ScriptTransformation.scala  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/521fc718/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 9e25e1d..dfb1251 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -312,15 +312,15 @@ private class ScriptTransformationWriterThread(
   }
   threwException = false
 } catch {
-  case NonFatal(e) =>
+  case t: Throwable =>
 // An error occurred while writing input, so kill the child process. 
According to the
 // Javadoc this call will not throw an exception:
-_exception = e
+_exception = t
 proc.destroy()
-throw e
+throw t
 } finally {
   try {
-outputStream.close()
+Utils.tryLogNonFatalError(outputStream.close())
 if (proc.waitFor() != 0) {
   logError(stderrBuffer.toString) // log the stderr circular buffer
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost

2016-07-06 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ec79183ac -> 5f342049c


[SPARK-16339][CORE] ScriptTransform does not print stderr when outstream is lost

## What changes were proposed in this pull request?

Currently, if the output stream gets destroyed or closed due to some failure, a
later call to `outstream.close()` throws an IOException. Because of this, the
`stderrBuffer` does not get logged and there is no way for users to see why the
job failed.

The change is to treat a failure while closing the output stream as non-fatal
(logging it instead), so that the stderr buffer still gets displayed.

## How was this patch tested?

The correct way to test this fix would be to grep the log to see whether the
`stderrBuffer` gets logged, but I don't think having test cases that do that is
a good idea.


Author: Tejas Patil 

Closes #13834 from tejasapatil/script_transform.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f342049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f342049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f342049

Branch: refs/heads/master
Commit: 5f342049cce9102fb62b4de2d8d8fa691c2e8ac4
Parents: ec79183
Author: Tejas Patil 
Authored: Wed Jul 6 09:18:04 2016 +0100
Committer: Sean Owen 
Committed: Wed Jul 6 09:18:04 2016 +0100

--
 .../spark/sql/hive/execution/ScriptTransformation.scala  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f342049/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 84990d3..d063dd6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -314,15 +314,15 @@ private class ScriptTransformationWriterThread(
   }
   threwException = false
 } catch {
-  case NonFatal(e) =>
+  case t: Throwable =>
 // An error occurred while writing input, so kill the child process. 
According to the
 // Javadoc this call will not throw an exception:
-_exception = e
+_exception = t
 proc.destroy()
-throw e
+throw t
 } finally {
   try {
-outputStream.close()
+Utils.tryLogNonFatalError(outputStream.close())
 if (proc.waitFor() != 0) {
   logError(stderrBuffer.toString) // log the stderr circular buffer
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org