spark git commit: [SPARK-19734][PYTHON][ML] Correct OneHotEncoder doc string to say dropLast

2017-03-01 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 3bd8ddf7c -> d2a879762


[SPARK-19734][PYTHON][ML] Correct OneHotEncoder doc string to say dropLast

## What changes were proposed in this pull request?
Updates the doc string to match the code, i.e. say dropLast instead of includeFirst.
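
For illustration, a minimal Scala sketch of the same `dropLast` parameter on the JVM side of the API (the DataFrame and column names are assumptions made for this example, not taken from the patch):

```
// A tiny sketch, assuming a SparkSession named `spark`.
// With dropLast = true (the default), 3 category indices are encoded into a
// 2-element vector and the last category maps to the all-zeros vector.
import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq((0, 0.0), (1, 1.0), (2, 2.0)))
  .toDF("id", "categoryIndex")

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
  .setDropLast(true)   // the parameter the corrected doc string now names; set false to keep all categories

encoder.transform(df).show(false)
```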

## How was this patch tested?
Not much, since it's a doc-like change. Will run unit tests via Jenkins job.

Author: Mark Grover 

Closes #17127 from markgrover/spark_19734.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2a87976
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2a87976
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2a87976

Branch: refs/heads/master
Commit: d2a879762a2b4f3c4d703cc183275af12b3c7de1
Parents: 3bd8ddf
Author: Mark Grover 
Authored: Wed Mar 1 22:57:34 2017 -0800
Committer: Yanbo Liang 
Committed: Wed Mar 1 22:57:34 2017 -0800

--
 python/pyspark/ml/feature.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2a87976/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 67c12d8..83cf763 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -1363,7 +1363,7 @@ class OneHotEncoder(JavaTransformer, HasInputCol, 
HasOutputCol, JavaMLReadable,
 @keyword_only
 def __init__(self, dropLast=True, inputCol=None, outputCol=None):
 """
-__init__(self, includeFirst=True, inputCol=None, outputCol=None)
+__init__(self, dropLast=True, inputCol=None, outputCol=None)
 """
 super(OneHotEncoder, self).__init__()
 self._java_obj = 
self._new_java_obj("org.apache.spark.ml.feature.OneHotEncoder", self.uid)





spark git commit: [MINOR][ML] Fix comments in LSH Examples and Python API

2017-03-01 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master de2b53df4 -> 3bd8ddf7c


[MINOR][ML] Fix comments in LSH Examples and Python API

## What changes were proposed in this pull request?
Remove the `org.apache.spark.examples.` package prefix from the `bin/run-example` commands in the LSH example comments.
Add a slash in one of the Python docs.
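
For context, a minimal Scala sketch of the BucketedRandomProjectionLSH API these examples demonstrate (the input data and parameter values are illustrative assumptions, not taken from the examples):

```
// A small sketch, assuming a SparkSession named `spark`.
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0))
)).toDF("id", "features")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(df)

// Approximate nearest-neighbor search around an arbitrary key vector.
model.approxNearestNeighbors(df, Vectors.dense(1.0, 0.0), 2).show(false)
```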

## How was this patch tested?
Run examples using the commands in the comments.

Author: Yun Ni 

Closes #17104 from Yunni/yunn_minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd8ddf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd8ddf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd8ddf7

Branch: refs/heads/master
Commit: 3bd8ddf7c34be35e5adeb802d6e63120f9f11713
Parents: de2b53d
Author: Yun Ni 
Authored: Wed Mar 1 22:55:13 2017 -0800
Committer: Yanbo Liang 
Committed: Wed Mar 1 22:55:13 2017 -0800

--
 .../spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java  | 2 +-
 .../java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java   | 2 +-
 .../spark/examples/ml/BucketedRandomProjectionLSHExample.scala | 2 +-
 .../scala/org/apache/spark/examples/ml/MinHashLSHExample.scala | 2 +-
 python/pyspark/ml/feature.py   | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
index 4594e34..ff917b7 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java
@@ -42,7 +42,7 @@ import static org.apache.spark.sql.functions.col;
 /**
  * An example demonstrating BucketedRandomProjectionLSH.
  * Run with:
- *   bin/run-example 
org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample
+ *   bin/run-example ml.JavaBucketedRandomProjectionLSHExample
  */
 public class JavaBucketedRandomProjectionLSHExample {
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
index 0aace46..e164598 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java
@@ -42,7 +42,7 @@ import static org.apache.spark.sql.functions.col;
 /**
  * An example demonstrating MinHashLSH.
  * Run with:
- *   bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample
+ *   bin/run-example ml.JavaMinHashLSHExample
  */
 public class JavaMinHashLSHExample {
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
index 654535c..16da4fa 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 /**
  * An example demonstrating BucketedRandomProjectionLSH.
  * Run with:
- *   bin/run-example 
org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample
+ *   bin/run-example ml.BucketedRandomProjectionLSHExample
  */
 object BucketedRandomProjectionLSHExample {
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala
index 6c1e222..b94ab9b 100644
--- 

spark git commit: [SPARK-19583][SQL] CTAS for data source table with a created location should succeed

2017-03-01 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 89990a010 -> de2b53df4


[SPARK-19583][SQL] CTAS for data source table with a created location should 
succeed

## What changes were proposed in this pull request?

```
  spark.sql(
  s"""
 |CREATE TABLE t
 |USING parquet
 |PARTITIONED BY(a, b)
 |LOCATION '$dir'
 |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
   """.stripMargin)
```

Failed with the error message:
```
path 
file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4cgn/T/spark-195cd513-428a-4df9-b196-87db0c73e772
 already exists.;
org.apache.spark.sql.AnalysisException: path 
file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4cgn/T/spark-195cd513-428a-4df9-b196-87db0c73e772
 already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102)
```
while the same DDL works for a Hive table, so we should fix it for data source tables.

The reason is that the SaveMode check is placed in `InsertIntoHadoopFsRelationCommand`, and that check acts on the `path`. This is fine when we use `DataFrameWriter.save()`, because in that situation SaveMode applies to the path.

But when we use `CreateDataSourceTableAsSelectCommand`, SaveMode applies to the table, and we have already done the SaveMode check for the table in `CreateDataSourceTableAsSelectCommand`, so we should not repeat the SaveMode check for the path in `InsertIntoHadoopFsRelationCommand`; that logic is redundant and wrong for `CreateDataSourceTableAsSelectCommand`.

After this PR, the following DDL will succeed; when the location already exists we will append to it or overwrite it.
```
CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ...
```
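
A hedged sketch of the resulting behavior, assuming a SparkSession `spark` and a fresh temporary path string `dir` (both names are assumptions for the example):

```
// First create the location and put some data in it.
spark.range(5).write.parquet(dir)

// Before this change, the CTAS below failed with "path ... already exists";
// after it, the table is created and the pre-existing location is overwritten.
spark.sql(
  s"""
     |CREATE TABLE t
     |USING parquet
     |LOCATION '$dir'
     |AS SELECT 1 AS a, 2 AS b
   """.stripMargin)

spark.table("t").show()   // contains only the row produced by the CTAS
```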

## How was this patch tested?
unit test added

Author: windpiger 

Closes #16938 from windpiger/CTASDataSourceWitLocation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de2b53df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de2b53df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de2b53df

Branch: refs/heads/master
Commit: de2b53df4c779b265ae038d88f298786a9236234
Parents: 89990a0
Author: windpiger 
Authored: Wed Mar 1 22:50:25 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 1 22:50:25 2017 -0800

--
 .../command/createDataSourceTables.scala|  4 +-
 .../spark/sql/execution/command/DDLSuite.scala  | 66 ++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 99 
 3 files changed, 156 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5abd579..d835b52 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand(
   }
 
   saveDataIntoTable(
-sparkSession, table, table.storage.locationUri, query, mode, 
tableExists = true)
+sparkSession, table, table.storage.locationUri, query, 
SaveMode.Append, tableExists = true)
 } else {
   assert(table.schema.isEmpty)
 
@@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand(
 table.storage.locationUri
   }
   val result = saveDataIntoTable(
-sparkSession, table, tableLocation, query, mode, tableExists = false)
+sparkSession, table, tableLocation, query, SaveMode.Overwrite, 
tableExists = false)
   val newTable = table.copy(
 storage = table.storage.copy(locationUri = tableLocation),
 // We will use the schema of resolved.relation as the schema of the 
table (instead of

http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b44f20e..8b8cd0f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1836,18 +1836,17 @@ 

spark git commit: [SPARK-13931] Stage can hang if an executor fails while speculated tasks are running

2017-03-01 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 51be63365 -> 89990a010


[SPARK-13931] Stage can hang if an executor fails while speculated tasks are 
running

## What changes were proposed in this pull request?
When the function 'executorLost' is invoked in class 'TaskSetManager', it is important to check whether the variable 'isZombie' is set to true.

This pull request fixes the following hang:

1. Enable speculation in the application.
2. Run the app and suppose the last task of shuffleMapStage 1 finishes. From the DAG's point of view this stage really finishes, and from the TaskSetManager's point of view the variable 'isZombie' is set to true, but runningTasksSet isn't empty because of speculation.
3. Suddenly, executor 3 is lost. The TaskScheduler, receiving this signal, invokes the executorLost functions of all of rootPool's taskSetManagers. The DAG, receiving this signal, removes all of this executor's outputLocs.
4. The TaskSetManager adds all of this executor's tasks to pendingTasks and tells the DAG they will be resubmitted (attention: possibly not on time).
5. The DAG starts to submit a new waiting stage, say shuffleMapStage 2, and finds that shuffleMapStage 1 is its missing parent because some outputLocs were removed due to the lost executor. The DAG then submits shuffleMapStage 1 again.
6. The DAG still receives Task 'Resubmitted' signals from the old TaskSetManager and increases the number of pendingTasks of shuffleMapStage 1 each time. However, the old TaskSetManager won't actually resubmit the tasks because its variable 'isZombie' is set to true.
7. Finally shuffleMapStage 1 never finishes in the DAG, together with all the stages depending on it.

## How was this patch tested?

It's quite difficult to construct test cases.

Author: GavinGavinNo1 
Author: 16092929 <16092...@cnsuning.com>

Closes #16855 from GavinGavinNo1/resolve-stage-blocked2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89990a01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89990a01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89990a01

Branch: refs/heads/master
Commit: 89990a01099b2d632b65112eb755de648aa54c16
Parents: 51be633
Author: GavinGavinNo1 
Authored: Wed Mar 1 21:40:41 2017 -0800
Committer: Kay Ousterhout 
Committed: Wed Mar 1 21:40:41 2017 -0800

--
 .../apache/spark/scheduler/TaskSetManager.scala |  3 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 64 +++-
 2 files changed, 65 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/89990a01/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e63feb8..19ebaf8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -874,7 +874,8 @@ private[spark] class TaskSetManager(
 // and we are not using an external shuffle server which could serve the 
shuffle outputs.
 // The reason is the next stage wouldn't be able to fetch the data from 
this dead executor
 // so we would need to rerun these tasks on other executors.
-if (tasks(0).isInstanceOf[ShuffleMapTask] && 
!env.blockManager.externalShuffleServiceEnabled) {
+if (tasks(0).isInstanceOf[ShuffleMapTask] && 
!env.blockManager.externalShuffleServiceEnabled
+&& !isZombie) {
   for ((tid, info) <- taskInfos if info.executorId == execId) {
 val index = taskInfos(tid).index
 if (successful(index)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/89990a01/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d03a0c9..2c2cda9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when}
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
 import 

spark git commit: [SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.

2017-03-01 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master db0ddce52 -> 51be63365


[SPARK-19777] Scan runningTasksSet when check speculatable tasks in 
TaskSetManager.

## What changes were proposed in this pull request?

When checking for speculatable tasks in `TaskSetManager`, only scan `runningTasksSet` instead of scanning all `taskInfos`.

## How was this patch tested?
Existing tests.

Author: jinxing 

Closes #17111 from jinxing64/SPARK-19777.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51be6336
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51be6336
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51be6336

Branch: refs/heads/master
Commit: 51be633657800d470de5dcebbed09e6bf08f6e2a
Parents: db0ddce
Author: jinxing 
Authored: Wed Mar 1 21:15:22 2017 -0800
Committer: Kay Ousterhout 
Committed: Wed Mar 1 21:15:22 2017 -0800

--
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala  | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51be6336/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3b25513..e63feb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -906,8 +906,6 @@ private[spark] class TaskSetManager(
* Check for tasks to be speculated and return true if there are any. This 
is called periodically
* by the TaskScheduler.
*
-   * TODO: To make this scale to large jobs, we need to maintain a list of 
running tasks, so that
-   * we don't scan the whole task set. It might also help to make this sorted 
by launch time.
*/
   override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
 // Can't speculate if we only have one task, and no need to speculate if 
the task set is a
@@ -927,7 +925,8 @@ private[spark] class TaskSetManager(
   // TODO: Threshold should also look at standard deviation of task 
durations and have a lower
   // bound based on that.
   logDebug("Task length threshold for speculation: " + threshold)
-  for ((tid, info) <- taskInfos) {
+  for (tid <- runningTasksSet) {
+val info = taskInfos(tid)
 val index = info.index
 if (!successful(index) && copiesRunning(index) == 1 && 
info.timeRunning(time) > threshold &&
   !speculatableTasks.contains(index)) {





spark git commit: [SPARK-19775][SQL] Remove an obsolete `partitionBy().insertInto()` test case

2017-03-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 2ff1467d6 -> db0ddce52


[SPARK-19775][SQL] Remove an obsolete `partitionBy().insertInto()` test case

## What changes were proposed in this pull request?

This issue removes [a test 
case](https://github.com/apache/spark/blame/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala#L287-L298)
 which was introduced by 
[SPARK-14459](https://github.com/apache/spark/commit/652bbb1bf62722b08a062c7a2bf72019f85e179e)
 and was superseded by 
[SPARK-16033](https://github.com/apache/spark/blame/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala#L365-L371).
 Basically, we cannot use `partitionBy` and `insertInto` together.

```scala
  test("Reject partitioning that does not match table") {
withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
  sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY 
(part string)")
  val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" 
else "odd"))
  .toDF("id", "data", "part")

  intercept[AnalysisException] {
// cannot partition by 2 fields when there is only one in the table 
definition
data.write.partitionBy("part", "data").insertInto("partitioned")
  }
}
  }
```

## How was this patch tested?

This only removes a test case. Pass the existing Jenkins test.

Author: Dongjoon Hyun 

Closes #17106 from dongjoon-hyun/SPARK-19775.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db0ddce5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db0ddce5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db0ddce5

Branch: refs/heads/master
Commit: db0ddce523bb823cba996e92ef36ceca31492d2c
Parents: 2ff1467
Author: Dongjoon Hyun 
Authored: Thu Mar 2 00:45:59 2017 +0100
Committer: Sean Owen 
Committed: Thu Mar 2 00:45:59 2017 +0100

--
 .../spark/sql/hive/InsertIntoHiveTableSuite.scala  | 13 -
 1 file changed, 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/db0ddce5/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 71ce5a7..d6999af 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -284,19 +284,6 @@ class InsertIntoHiveTableSuite extends QueryTest with 
TestHiveSingleton with Bef
 sql("DROP TABLE hiveTableWithStructValue")
   }
 
-  test("Reject partitioning that does not match table") {
-withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-  sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY 
(part string)")
-  val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" 
else "odd"))
-  .toDF("id", "data", "part")
-
-  intercept[AnalysisException] {
-// cannot partition by 2 fields when there is only one in the table 
definition
-data.write.partitionBy("part", "data").insertInto("partitioned")
-  }
-}
-  }
-
   test("Test partition mode = strict") {
 withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
   sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY 
(part string)")





spark git commit: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio …

2017-03-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 bbe0d8caa -> 27347b5f2


[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio …

…on registered cores rather than accepted cores

See JIRA

Unit tests, Mesos/Spark integration tests

cc skonto susanxhuynh

Author: Michael Gummelt 

Closes #17045 from mgummelt/SPARK-19373-registered-resources.

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Michael Gummelt 

Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27347b5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27347b5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27347b5f

Branch: refs/heads/branch-2.1
Commit: 27347b5f26f668783d8ded89149a5e761b67f786
Parents: bbe0d8c
Author: Michael Gummelt 
Authored: Thu Mar 2 00:32:32 2017 +0100
Committer: Sean Owen 
Committed: Thu Mar 2 00:32:32 2017 +0100

--
 .../MesosCoarseGrainedSchedulerBackend.scala|  27 +++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 111 +--
 2 files changed, 70 insertions(+), 68 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5063c1f..22df2b1 100644
--- 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   with org.apache.mesos.Scheduler
   with MesosSchedulerUtils {
 
-  val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+  // Blacklist a slave after this many failures
+  private val MAX_SLAVE_FAILURES = 2
 
-  // Maximum number of cores to acquire (TODO: we'll need more flexible 
controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+  private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
 
-  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", 
false)
+  // Maximum number of cores to acquire
+  private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)
 
-  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+  private val useFetcherCache = 
conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+  private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
 
   private[this] val shutdownTimeoutMS =
 conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val shuffleServiceEnabled = 
conf.getBoolean("spark.shuffle.service.enabled", false)
 
   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new mutable.HashMap[String, Int]
-  val gpusByTaskId = new mutable.HashMap[String, Int]
-  var totalCoresAcquired = 0
-  var totalGpusAcquired = 0
+  private val coresByTaskId = new mutable.HashMap[String, Int]
+  private val gpusByTaskId = new mutable.HashMap[String, Int]
+  private var totalCoresAcquired = 0
+  private var totalGpusAcquired = 0
 
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job.  Slaves are 
never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock
 
-  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+  private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
 
   // Offer constraints
   private val slaveOfferConstraints =
@@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   securityManager.isSaslEncryptionEnabled())
   }
 
-  var nextMesosTaskId = 0
+  private var nextMesosTaskId = 0
 
   @volatile var appId: String = _
 
@@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
  

spark git commit: [DOC][MINOR][SPARKR] Update SparkR doc for names, columns and colnames

2017-03-01 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 417140e44 -> 2ff1467d6


[DOC][MINOR][SPARKR] Update SparkR doc for names, columns and colnames

Update R doc:
1. columns, names and colnames return a vector of strings, not a **list** as in the current doc.
2. `colnames<-` does allow the subset assignment, so the length of `value` can 
be less than the number of columns, e.g., `colnames(df)[1] <- "a"`.

felixcheung

Author: actuaryzhang 

Closes #17115 from actuaryzhang/sparkRMinorDoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ff1467d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ff1467d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ff1467d

Branch: refs/heads/master
Commit: 2ff1467d676c9671da231db86bdc8e09c7450f80
Parents: 417140e
Author: actuaryzhang 
Authored: Wed Mar 1 12:35:56 2017 -0800
Committer: Felix Cheung 
Committed: Wed Mar 1 12:35:56 2017 -0800

--
 R/pkg/R/DataFrame.R   | 4 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 6 ++
 2 files changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ff1467d/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index cc4cfa3..e33d0d8 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -280,7 +280,7 @@ setMethod("dtypes",
 
 #' Column Names of SparkDataFrame
 #'
-#' Return all column names as a list.
+#' Return a vector of column names.
 #'
 #' @param x a SparkDataFrame.
 #'
@@ -338,7 +338,7 @@ setMethod("colnames",
   })
 
 #' @param value a character vector. Must have the same length as the number
-#'  of columns in the SparkDataFrame.
+#'  of columns to be renamed.
 #' @rdname columns
 #' @aliases colnames<-,SparkDataFrame-method
 #' @name colnames<-

http://git-wip-us.apache.org/repos/asf/spark/blob/2ff1467d/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ce0f5a1..1dd8c5c 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -898,6 +898,12 @@ test_that("names() colnames() set the column names", {
   expect_equal(names(z)[3], "c")
   names(z)[3] <- "c2"
   expect_equal(names(z)[3], "c2")
+
+  # Test subset assignment
+  colnames(df)[1] <- "col5"
+  expect_equal(colnames(df)[1], "col5")
+  names(df)[2] <- "col6"
+  expect_equal(names(df)[2], "col6")
 })
 
 test_that("head() and first() return the correct data", {





spark git commit: [SPARK-19787][ML] Changing the default parameter of regParam.

2017-03-01 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master 8aa560b75 -> 417140e44


[SPARK-19787][ML] Changing the default parameter of regParam.

## What changes were proposed in this pull request?

In the ALS method the default values of regParam do not match within the same 
file (lines 
[224](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L224)
 and 
[714](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L714)).
 In one place we set it to 1.0 and in the other to 0.1.

I changed the one in the train() method to 0.1, so it now matches the default value that is visible to Spark users. The method is marked DeveloperApi, so the change should not affect users. Whenever we use that particular method we provide all parameters, so the default does not matter. The only exception is the unit tests in ALSSuite, but the change does not break them.
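
A short Scala sketch of the user-facing default (assuming a DataFrame `ratings` with integer user and item columns and a numeric rating column; the names are assumptions for the example):

```
import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")

println(als.getRegParam)   // 0.1 -- the default visible to Spark users, which train() now matches

// Passing the value explicitly is equivalent to relying on the default.
val model = als.setRegParam(0.1).fit(ratings)
```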

Note: This PR should get the award of the laziest commit in Spark history. 
Originally I wanted to correct this on another PR but MLnick 
[suggested](https://github.com/apache/spark/pull/17059#issuecomment-28572) 
to create a separate PR & ticket. If you think this change is too 
insignificant/minor, you are probably right, so feel free to reject and close 
this. :)

## How was this patch tested?

Unit-tests

Author: Vasilis Vryniotis 

Closes #17121 from datumbox/als_regparam.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/417140e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/417140e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/417140e4

Branch: refs/heads/master
Commit: 417140e441505f20eb5bd4943ce216c3ec6adc10
Parents: 8aa560b
Author: Vasilis Vryniotis 
Authored: Wed Mar 1 20:55:17 2017 +0200
Committer: Nick Pentreath 
Committed: Wed Mar 1 20:55:17 2017 +0200

--
 mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/417140e4/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index af00762..04273a4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -711,7 +711,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   numUserBlocks: Int = 10,
   numItemBlocks: Int = 10,
   maxIter: Int = 10,
-  regParam: Double = 1.0,
+  regParam: Double = 0.1,
   implicitPrefs: Boolean = false,
   alpha: Double = 1.0,
   nonnegative: Boolean = false,





spark git commit: [SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed

2017-03-01 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 5502a9cf8 -> 8aa560b75


[SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set 
PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed

## What changes were proposed in this pull request?

If we create an InMemoryFileIndex with empty rootPaths when PARALLEL_PARTITION_DISCOVERY_THRESHOLD is set to zero, it throws an exception:

```
Positive number of slices required
java.lang.IllegalArgumentException: Positive number of slices required
at 
org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
at 
org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357)
at 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256)
at 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74)
at 
org.apache.spark.sql.execution.datasources.InMemoryFileIndex.(InMemoryFileIndex.scala:50)
at 
org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186)
at 
org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105)
at 
org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33)
at 
org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185)
at 
org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at 
org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185)
at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```
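
A hedged illustration of the underlying failure (this is not the actual Spark internals, just the parallelize contract it runs into), assuming a SparkSession `spark`:

```
val sc = spark.sparkContext
val paths = Seq.empty[String]   // think of this as an empty rootPaths

// sc.parallelize(paths, 0)     // java.lang.IllegalArgumentException: Positive number of slices required

// Clamping the slice count to at least 1 avoids the error even for an empty input.
sc.parallelize(paths, math.max(paths.size, 1)).collect()
```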

## How was this patch tested?
unit test added

Author: windpiger 

Closes #17093 from windpiger/fixEmptiPathInBulkListFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa560b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa560b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa560b7

Branch: refs/heads/master
Commit: 8aa560b75e6b083b2a890c52301414285ba35c3d
Parents: 5502a9c
Author: windpiger 
Authored: Wed Mar 1 08:16:29 2017 -0800
Committer: Xiao Li 
Committed: Wed Mar 1 08:16:29 2017 -0800

--
 .../datasources/PartitioningAwareFileIndex.scala|  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 --
 .../sql/execution/datasources/FileIndexSuite.scala  | 16 
 3 files changed, 21 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8aa560b7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 

spark git commit: [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule

2017-03-01 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f719cccdc -> bbe0d8caa


[SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by 
FoldablePropagation rule

## What changes were proposed in this pull request?
This PR fixes the code in the Optimizer phase where the constant alias columns of an `INNER JOIN` query are folded by the rule `FoldablePropagation`.

For the following query:

```
val sqlA =
  """
|create temporary view ta as
|select a, 'a' as tag from t1 union all
|select a, 'b' as tag from t2
  """.stripMargin

val sqlB =
  """
|create temporary view tb as
|select a, 'a' as tag from t3 union all
|select a, 'b' as tag from t4
  """.stripMargin

val sql =
  """
|select tb.* from ta inner join tb on
|ta.a = tb.a and
|ta.tag = tb.tag
  """.stripMargin
```
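
A hedged reproduction sketch, assuming a SparkSession `spark`, the `sqlA`/`sqlB`/`sql` strings defined above, and hypothetical single-column tables `t1`..`t4` created here for the example:

```
import spark.implicits._

Seq(1, 2).toDF("a").createOrReplaceTempView("t1")
Seq(3, 4).toDF("a").createOrReplaceTempView("t2")
Seq(1, 2).toDF("a").createOrReplaceTempView("t3")
Seq(3, 4).toDF("a").createOrReplaceTempView("t4")

spark.sql(sqlA)
spark.sql(sqlB)

// Before the fix, folding `ta.tag = tb.tag` into a trivially true predicate let rows
// with mismatched tags leak into the inner join result; after the fix the condition is kept.
spark.sql(sql).show()
```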

The tag column is a constant alias column; it is folded by `FoldablePropagation` like this:

```
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation 
===
 Project [a#4, tag#14]  Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) 
&& (a = a))
:- Union   :- Union
:  :- Project [a#0, a AS tag#8]:  :- Project [a#0, a AS 
tag#8]
:  :  +- LocalRelation [a#0]   :  :  +- LocalRelation 
[a#0]
:  +- Project [a#2, b AS tag#9]:  +- Project [a#2, b AS 
tag#9]
: +- LocalRelation [a#2]   : +- LocalRelation 
[a#2]
+- Union   +- Union
   :- Project [a#4, a AS tag#14]  :- Project [a#4, a AS 
tag#14]
   :  +- LocalRelation [a#4]  :  +- LocalRelation 
[a#4]
   +- Project [a#6, b AS tag#15]  +- Project [a#6, b AS 
tag#15]
  +- LocalRelation [a#6] +- LocalRelation 
[a#6]
```

Finally the Result of Batch Operator Optimizations is:

```
Project [a#4, tag#14]  Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
!   :- SubqueryAlias ta, `ta`  :- Union
!   :  +- Union:  :- LocalRelation [a#0]
!   : :- Project [a#0, a AS tag#8] :  +- LocalRelation [a#2]
!   : :  +- SubqueryAlias t1, `t1` +- Union
!   : : +- Project [a#0]  :- LocalRelation 
[a#4, tag#14]
!   : :+- SubqueryAlias grouping  +- LocalRelation 
[a#6, tag#15]
!   : :   +- LocalRelation [a#0]
!   : +- Project [a#2, b AS tag#9]
!   :+- SubqueryAlias t2, `t2`
!   :   +- Project [a#2]
!   :  +- SubqueryAlias grouping
!   : +- LocalRelation [a#2]
!   +- SubqueryAlias tb, `tb`
!  +- Union
! :- Project [a#4, a AS tag#14]
! :  +- SubqueryAlias t3, `t3`
! : +- Project [a#4]
! :+- SubqueryAlias grouping
! :   +- LocalRelation [a#4]
! +- Project [a#6, b AS tag#15]
!+- SubqueryAlias t4, `t4`
!   +- Project [a#6]
!  +- SubqueryAlias grouping
! +- LocalRelation [a#6]
```

The condition `tag#8 = tag#14` of the INNER JOIN has been removed. This leads to wrong inner join results.

After fix:

```
=== Result of Batch LocalRelation ===
 GlobalLimit 21   GlobalLimit 21
 +- LocalLimit 21 +- LocalLimit 21
+- Project [a#4, tag#11] +- Project [a#4, 
tag#11]
   +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, 
((a#0 = a#4) && (tag#8 = tag#11))
! :- SubqueryAlias ta  :- Union
! :  +- Union  :  :- 
LocalRelation [a#0, tag#8]
! : :- Project [a#0, a AS tag#8]   :  +- 
LocalRelation [a#2, tag#9]
! : :  +- SubqueryAlias t1 +- Union
! : : +- Project [a#0]:- 
LocalRelation [a#4, tag#11]
! : :+- SubqueryAlias grouping+- 
LocalRelation [a#6, tag#12]
! : :   +- LocalRelation [a#0]
! : +- Project [a#2, b AS tag#9]
! :+- SubqueryAlias t2
! :   +- Project [a#2]
! :  +- SubqueryAlias grouping
! : +- LocalRelation [a#2]
! +- SubqueryAlias tb
!+- Union
!   :- Project [a#4, a AS tag#11]
!   :  +- SubqueryAlias t3
!   : +- Project [a#4]
!   :+- SubqueryAlias grouping
!   

spark git commit: [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule

2017-03-01 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 38e783534 -> 5502a9cf8


[SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by 
FoldablePropagation rule

## What changes were proposed in this pull request?
This PR fixes the code in the Optimizer phase where the constant alias columns of an `INNER JOIN` query are folded by the rule `FoldablePropagation`.

For the following query:

```
val sqlA =
  """
|create temporary view ta as
|select a, 'a' as tag from t1 union all
|select a, 'b' as tag from t2
  """.stripMargin

val sqlB =
  """
|create temporary view tb as
|select a, 'a' as tag from t3 union all
|select a, 'b' as tag from t4
  """.stripMargin

val sql =
  """
|select tb.* from ta inner join tb on
|ta.a = tb.a and
|ta.tag = tb.tag
  """.stripMargin
```

The tag column is a constant alias column; it is folded by `FoldablePropagation` like this:

```
TRACE SparkOptimizer:
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation 
===
 Project [a#4, tag#14]  Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, ((a#0 = a#4) 
&& (a = a))
:- Union   :- Union
:  :- Project [a#0, a AS tag#8]:  :- Project [a#0, a AS 
tag#8]
:  :  +- LocalRelation [a#0]   :  :  +- LocalRelation 
[a#0]
:  +- Project [a#2, b AS tag#9]:  +- Project [a#2, b AS 
tag#9]
: +- LocalRelation [a#2]   : +- LocalRelation 
[a#2]
+- Union   +- Union
   :- Project [a#4, a AS tag#14]  :- Project [a#4, a AS 
tag#14]
   :  +- LocalRelation [a#4]  :  +- LocalRelation 
[a#4]
   +- Project [a#6, b AS tag#15]  +- Project [a#6, b AS 
tag#15]
  +- LocalRelation [a#6] +- LocalRelation 
[a#6]
```

Finally the Result of Batch Operator Optimizations is:

```
Project [a#4, tag#14]  Project [a#4, tag#14]
!+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14))   +- Join Inner, (a#0 = a#4)
!   :- SubqueryAlias ta, `ta`  :- Union
!   :  +- Union:  :- LocalRelation [a#0]
!   : :- Project [a#0, a AS tag#8] :  +- LocalRelation [a#2]
!   : :  +- SubqueryAlias t1, `t1` +- Union
!   : : +- Project [a#0]  :- LocalRelation 
[a#4, tag#14]
!   : :+- SubqueryAlias grouping  +- LocalRelation 
[a#6, tag#15]
!   : :   +- LocalRelation [a#0]
!   : +- Project [a#2, b AS tag#9]
!   :+- SubqueryAlias t2, `t2`
!   :   +- Project [a#2]
!   :  +- SubqueryAlias grouping
!   : +- LocalRelation [a#2]
!   +- SubqueryAlias tb, `tb`
!  +- Union
! :- Project [a#4, a AS tag#14]
! :  +- SubqueryAlias t3, `t3`
! : +- Project [a#4]
! :+- SubqueryAlias grouping
! :   +- LocalRelation [a#4]
! +- Project [a#6, b AS tag#15]
!+- SubqueryAlias t4, `t4`
!   +- Project [a#6]
!  +- SubqueryAlias grouping
! +- LocalRelation [a#6]
```

The condition `tag#8 = tag#14` of the INNER JOIN has been removed. This leads to wrong inner join results.

After fix:

```
=== Result of Batch LocalRelation ===
 GlobalLimit 21   GlobalLimit 21
 +- LocalLimit 21 +- LocalLimit 21
+- Project [a#4, tag#11] +- Project [a#4, 
tag#11]
   +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, 
((a#0 = a#4) && (tag#8 = tag#11))
! :- SubqueryAlias ta  :- Union
! :  +- Union  :  :- 
LocalRelation [a#0, tag#8]
! : :- Project [a#0, a AS tag#8]   :  +- 
LocalRelation [a#2, tag#9]
! : :  +- SubqueryAlias t1 +- Union
! : : +- Project [a#0]:- 
LocalRelation [a#4, tag#11]
! : :+- SubqueryAlias grouping+- 
LocalRelation [a#6, tag#12]
! : :   +- LocalRelation [a#0]
! : +- Project [a#2, b AS tag#9]
! :+- SubqueryAlias t2
! :   +- Project [a#2]
! :  +- SubqueryAlias grouping
! : +- LocalRelation [a#2]
! +- SubqueryAlias tb
!+- Union
!   :- Project [a#4, a AS tag#11]
!   :  +- SubqueryAlias t3
!   : +- Project [a#4]
!   :+- SubqueryAlias grouping
!   

spark git commit: [SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path

2017-03-01 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 4913c92c2 -> 38e783534


[SPARK-19736][SQL] refreshByPath should clear all cached plans with the 
specified path

## What changes were proposed in this pull request?

`Catalog.refreshByPath` can refresh the cache entry and the associated metadata 
for all dataframes (if any), that contain the given data source path.

However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans 
with the specified path. It causes some strange behaviors reported in 
SPARK-15678.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #17064 from viirya/fix-refreshByPath.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e78353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e78353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e78353

Branch: refs/heads/master
Commit: 38e7835347a2e1803b1df5e73cf8b749951b11b2
Parents: 4913c92
Author: Liang-Chi Hsieh 
Authored: Wed Mar 1 00:19:57 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 1 00:19:57 2017 -0800

--
 .../spark/sql/execution/CacheManager.scala   | 19 ++-
 .../org/apache/spark/sql/CachedTableSuite.scala  | 16 
 2 files changed, 26 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 4ca1347..8013851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -168,15 +168,16 @@ class CacheManager extends Logging {
   (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
 }
 
-cachedData.foreach {
-  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined =>
-val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
-if (dataIndex >= 0) {
-  data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = 
true)
-  cachedData.remove(dataIndex)
-}
-
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
-  case _ => // Do Nothing
+cachedData.filter {
+  case data if data.plan.find(lookupAndRefresh(_, fs, 
qualifiedPath)).isDefined => true
+  case _ => false
+}.foreach { data =>
+  val dataIndex = cachedData.indexWhere(cd => 
data.plan.sameResult(cd.plan))
+  if (dataIndex >= 0) {
+data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = 
true)
+cachedData.remove(dataIndex)
+  }
+  
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, 
data.plan))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1af1a36..2a0e088 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
   assert(getNumInMemoryRelations(cachedPlan2) == 4)
 }
   }
+
+  test("refreshByPath should refresh all cached plans with the specified 
path") {
+withTempDir { dir =>
+  val path = dir.getCanonicalPath()
+
+  spark.range(10).write.mode("overwrite").parquet(path)
+  spark.read.parquet(path).cache()
+  spark.read.parquet(path).filter($"id" > 4).cache()
+  assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)
+
+  spark.range(20).write.mode("overwrite").parquet(path)
+  spark.catalog.refreshByPath(path)
+  assert(spark.read.parquet(path).count() == 20)
+  assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
+}
+  }
 }

