spark git commit: [SPARK-19734][PYTHON][ML] Correct OneHotEncoder doc string to say dropLast
Repository: spark Updated Branches: refs/heads/master 3bd8ddf7c -> d2a879762 [SPARK-19734][PYTHON][ML] Correct OneHotEncoder doc string to say dropLast ## What changes were proposed in this pull request? Updates the doc string to match up with the code i.e. say dropLast instead of includeFirst ## How was this patch tested? Not much, since it's a doc-like change. Will run unit tests via Jenkins job. Author: Mark Grover. Closes #17127 from markgrover/spark_19734. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2a87976 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2a87976 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2a87976 Branch: refs/heads/master Commit: d2a879762a2b4f3c4d703cc183275af12b3c7de1 Parents: 3bd8ddf Author: Mark Grover Authored: Wed Mar 1 22:57:34 2017 -0800 Committer: Yanbo Liang Committed: Wed Mar 1 22:57:34 2017 -0800 -- python/pyspark/ml/feature.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2a87976/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 67c12d8..83cf763 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -1363,7 +1363,7 @@ class OneHotEncoder(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, @keyword_only def __init__(self, dropLast=True, inputCol=None, outputCol=None): """ -__init__(self, includeFirst=True, inputCol=None, outputCol=None) +__init__(self, dropLast=True, inputCol=None, outputCol=None) """ super(OneHotEncoder, self).__init__() self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.OneHotEncoder", self.uid)
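The commit above only fixes a Python docstring, but the `dropLast` parameter behaves the same across language bindings. As a minimal Scala sketch (not part of the commit; the object name, column names, and data are invented for illustration) of what `dropLast` controls:

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

object OneHotEncoderDropLastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("OneHotEncoderDropLastSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b", "c", "a").toDF("category")
    val indexed = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
      .fit(df)
      .transform(df)

    // With dropLast = true (the default), the last category maps to the all-zeros vector,
    // so three categories are encoded as vectors of size 2 instead of 3.
    new OneHotEncoder()
      .setInputCol("categoryIndex")
      .setOutputCol("categoryVec")
      .setDropLast(true)
      .transform(indexed)
      .show(false)

    spark.stop()
  }
}
```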
spark git commit: [MINOR][ML] Fix comments in LSH Examples and Python API
Repository: spark Updated Branches: refs/heads/master de2b53df4 -> 3bd8ddf7c [MINOR][ML] Fix comments in LSH Examples and Python API ## What changes were proposed in this pull request? Remove the `org.apache.spark.examples.` prefix from the run-example commands in the LSH example comments, and add a missing slash in one of the Python docs. ## How was this patch tested? Run examples using the commands in the comments. Author: Yun Ni. Closes #17104 from Yunni/yunn_minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd8ddf7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd8ddf7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd8ddf7 Branch: refs/heads/master Commit: 3bd8ddf7c34be35e5adeb802d6e63120f9f11713 Parents: de2b53d Author: Yun Ni Authored: Wed Mar 1 22:55:13 2017 -0800 Committer: Yanbo Liang Committed: Wed Mar 1 22:55:13 2017 -0800 -- .../spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java | 2 +- .../java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java | 2 +- .../spark/examples/ml/BucketedRandomProjectionLSHExample.scala | 2 +- .../scala/org/apache/spark/examples/ml/MinHashLSHExample.scala | 2 +- python/pyspark/ml/feature.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java index 4594e34..ff917b7 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java @@ -42,7 +42,7 @@ import static org.apache.spark.sql.functions.col; /** * An example demonstrating BucketedRandomProjectionLSH. * Run with: - * bin/run-example org.apache.spark.examples.ml.JavaBucketedRandomProjectionLSHExample + * bin/run-example ml.JavaBucketedRandomProjectionLSHExample */ public class JavaBucketedRandomProjectionLSHExample { public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java -- diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java index 0aace46..e164598 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java @@ -42,7 +42,7 @@ import static org.apache.spark.sql.functions.col; /** * An example demonstrating MinHashLSH.
* Run with: - * bin/run-example org.apache.spark.examples.ml.JavaMinHashLSHExample + * bin/run-example ml.JavaMinHashLSHExample */ public class JavaMinHashLSHExample { public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala index 654535c..16da4fa 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession /** * An example demonstrating BucketedRandomProjectionLSH. * Run with: - * bin/run-example org.apache.spark.examples.ml.BucketedRandomProjectionLSHExample + * bin/run-example ml.BucketedRandomProjectionLSHExample */ object BucketedRandomProjectionLSHExample { def main(args: Array[String]): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/3bd8ddf7/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala index 6c1e222..b94ab9b 100644 ---
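The fixed comments refer to the stock examples shipped under examples/src/main, run via `bin/run-example ml.JavaMinHashLSHExample` and friends. As a rough, self-contained Scala sketch of the MinHashLSH API those examples demonstrate (the data and the 0.6 distance threshold are invented for illustration, not taken from the commit):

```scala
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MinHashLSHSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MinHashLSHSketch").master("local[2]").getOrCreate()

    val df = spark.createDataFrame(Seq(
      (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
      (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
      (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
    )).toDF("id", "features")

    val model = new MinHashLSH()
      .setNumHashTables(3)
      .setInputCol("features")
      .setOutputCol("hashes")
      .fit(df)

    // Self-join: report pairs whose approximate Jaccard distance is below 0.6.
    model.approxSimilarityJoin(df, df, 0.6).show(false)

    spark.stop()
  }
}
```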
spark git commit: [SPARK-19583][SQL] CTAS for data source table with a created location should succeed
Repository: spark Updated Branches: refs/heads/master 89990a010 -> de2b53df4 [SPARK-19583][SQL] CTAS for data source table with a created location should succeed ## What changes were proposed in this pull request? ``` spark.sql( s""" |CREATE TABLE t |USING parquet |PARTITIONED BY(a, b) |LOCATION '$dir' |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d """.stripMargin) ``` Failed with the error message: ``` path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4cgn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; org.apache.spark.sql.AnalysisException: path file:/private/var/folders/6r/15tqm8hn3ldb3rmbfqm1gf4cgn/T/spark-195cd513-428a-4df9-b196-87db0c73e772 already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:102) ``` The same statement works for a Hive table, so we should fix this for data source tables. The reason is that the SaveMode check lives in `InsertIntoHadoopFsRelationCommand` and is applied to the `path`. That is fine for `DataFrameWriter.save()`, where the SaveMode really does act on the path. But when we go through `CreateDataSourceTableAsSelectCommand`, the SaveMode acts on the table, and the SaveMode check for the table has already been done in `CreateDataSourceTableAsSelectCommand`, so repeating the check on the path inside `InsertIntoHadoopFsRelationCommand` is redundant and wrong for that code path. After this PR, the following DDL will succeed; when the location has already been created we append to it or overwrite it. ``` CREATE TABLE ... (PARTITIONED BY ...) LOCATION path AS SELECT ... ``` ## How was this patch tested? Unit test added. Author: windpiger. Closes #16938 from windpiger/CTASDataSourceWitLocation.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de2b53df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de2b53df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de2b53df Branch: refs/heads/master Commit: de2b53df4c779b265ae038d88f298786a9236234 Parents: 89990a0 Author: windpiger Authored: Wed Mar 1 22:50:25 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 1 22:50:25 2017 -0800 -- .../command/createDataSourceTables.scala| 4 +- .../spark/sql/execution/command/DDLSuite.scala | 66 ++--- .../spark/sql/hive/execution/HiveDDLSuite.scala | 99 3 files changed, 156 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 5abd579..d835b52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -141,7 +141,7 @@ case class CreateDataSourceTableAsSelectCommand( } saveDataIntoTable( -sparkSession, table, table.storage.locationUri, query, mode, tableExists = true) +sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true) } else { assert(table.schema.isEmpty) @@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand( table.storage.locationUri } val result = saveDataIntoTable( -sparkSession, table, tableLocation, query, mode, tableExists = false) +sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of http://git-wip-us.apache.org/repos/asf/spark/blob/de2b53df/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b44f20e..8b8cd0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1836,18 +1836,17 @@
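The DDL this fix unblocks is the one quoted in the description; below is a runnable Scala sketch of it (the path and table name are placeholders, and the pre-existing directory is exactly the condition that used to trigger the "path ... already exists" error):

```scala
import org.apache.spark.sql.SparkSession

object CtasWithLocationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CtasWithLocationSketch").master("local[2]").getOrCreate()

    // Placeholder path; in the reported scenario this directory already exists on disk.
    val dir = "/tmp/ctas-location-sketch"

    // With the patch, CTAS into an existing location appends or overwrites instead of failing.
    spark.sql(
      s"""
         |CREATE TABLE t
         |USING parquet
         |PARTITIONED BY (a, b)
         |LOCATION '$dir'
         |AS SELECT 3 AS a, 4 AS b, 1 AS c, 2 AS d
       """.stripMargin)

    spark.table("t").show()
    spark.stop()
  }
}
```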
spark git commit: [SPARK-13931] Stage can hang if an executor fails while speculated tasks are running
Repository: spark Updated Branches: refs/heads/master 51be63365 -> 89990a010 [SPARK-13931] Stage can hang if an executor fails while speculated tasks are running ## What changes were proposed in this pull request? When function 'executorLost' is invoked in class 'TaskSetManager', it's significant to judge whether variable 'isZombie' is set to true. This pull request fixes the following hang: 1.Open speculation switch in the application. 2.Run this app and suppose last task of shuffleMapStage 1 finishes. Let's get the record straight, from the eyes of DAG, this stage really finishes, and from the eyes of TaskSetManager, variable 'isZombie' is set to true, but variable runningTasksSet isn't empty because of speculation. 3.Suddenly, executor 3 is lost. TaskScheduler receiving this signal, invokes all executorLost functions of rootPool's taskSetManagers. DAG receiving this signal, removes all this executor's outputLocs. 4.TaskSetManager adds all this executor's tasks to pendingTasks and tells DAG they will be resubmitted (Attention: possibly not on time). 5.DAG starts to submit a new waitingStage, let's say shuffleMapStage 2, and going to find that shuffleMapStage 1 is its missing parent because some outputLocs are removed due to executor lost. Then DAG submits shuffleMapStage 1 again. 6.DAG still receives Task 'Resubmitted' signal from old taskSetManager, and increases the number of pendingTasks of shuffleMapStage 1 each time. However, old taskSetManager won't resolve new task to submit because its variable 'isZombie' is set to true. 7.Finally shuffleMapStage 1 never finishes in DAG together with all stages depending on it. ## How was this patch tested? It's quite difficult to construct test cases. Author: GavinGavinNo1Author: 16092929 <16092...@cnsuning.com> Closes #16855 from GavinGavinNo1/resolve-stage-blocked2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89990a01 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89990a01 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89990a01 Branch: refs/heads/master Commit: 89990a01099b2d632b65112eb755de648aa54c16 Parents: 51be633 Author: GavinGavinNo1 Authored: Wed Mar 1 21:40:41 2017 -0800 Committer: Kay Ousterhout Committed: Wed Mar 1 21:40:41 2017 -0800 -- .../apache/spark/scheduler/TaskSetManager.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 64 +++- 2 files changed, 65 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/89990a01/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e63feb8..19ebaf8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -874,7 +874,8 @@ private[spark] class TaskSetManager( // and we are not using an external shuffle server which could serve the shuffle outputs. // The reason is the next stage wouldn't be able to fetch the data from this dead executor // so we would need to rerun these tasks on other executors. 
-if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) { +if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled +&& !isZombie) { for ((tid, info) <- taskInfos if info.executorId == execId) { val index = taskInfos(tid).index if (successful(index)) { http://git-wip-us.apache.org/repos/asf/spark/blob/89990a01/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d03a0c9..2c2cda9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.util.Random +import java.util.{Properties, Random} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when} import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.internal.Logging +import org.apache.spark.serializer.SerializerInstance import
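The one-line fix above only adds the `!isZombie` guard. To make the hang easier to see, here is a deliberately stripped-down Scala sketch of the idea; `TaskSetManagerSketch` and `TaskRecord` are invented names, not the real Spark classes:

```scala
// Once a TaskSetManager is a "zombie" (its stage already completed or was superseded),
// a lost executor must not cause its tasks to be reported as resubmitted; otherwise the
// DAGScheduler keeps waiting for tasks the zombie manager will never launch again.
final case class TaskRecord(tid: Long, index: Int, executorId: String)

class TaskSetManagerSketch(tasks: Seq[TaskRecord]) {
  var isZombie: Boolean = false

  def executorLost(execId: String, externalShuffleServiceEnabled: Boolean): Seq[Int] = {
    // Mirrors the patched condition: only resubmit shuffle output when the external
    // shuffle service cannot serve it AND this manager is still alive.
    if (!externalShuffleServiceEnabled && !isZombie) {
      tasks.filter(_.executorId == execId).map(_.index)
    } else {
      Seq.empty
    }
  }
}

object TaskSetManagerSketch {
  def main(args: Array[String]): Unit = {
    val tsm = new TaskSetManagerSketch(Seq(TaskRecord(0L, 0, "exec-3"), TaskRecord(1L, 1, "exec-1")))
    tsm.isZombie = true
    // With the guard, a zombie manager reports nothing to resubmit.
    println(tsm.executorLost("exec-3", externalShuffleServiceEnabled = false)) // List()
  }
}
```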
spark git commit: [SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager.
Repository: spark Updated Branches: refs/heads/master db0ddce52 -> 51be63365 [SPARK-19777] Scan runningTasksSet when check speculatable tasks in TaskSetManager. ## What changes were proposed in this pull request? When checking speculatable tasks in `TaskSetManager`, only scan `runningTasksSet` instead of scanning all `taskInfos`. ## How was this patch tested? Existing tests. Author: jinxing. Closes #17111 from jinxing64/SPARK-19777. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51be6336 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51be6336 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51be6336 Branch: refs/heads/master Commit: 51be633657800d470de5dcebbed09e6bf08f6e2a Parents: db0ddce Author: jinxing Authored: Wed Mar 1 21:15:22 2017 -0800 Committer: Kay Ousterhout Committed: Wed Mar 1 21:15:22 2017 -0800 -- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51be6336/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3b25513..e63feb8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -906,8 +906,6 @@ private[spark] class TaskSetManager( * Check for tasks to be speculated and return true if there are any. This is called periodically * by the TaskScheduler. * - * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that - * we don't scan the whole task set. It might also help to make this sorted by launch time. */ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { // Can't speculate if we only have one task, and no need to speculate if the task set is a @@ -927,7 +925,8 @@ private[spark] class TaskSetManager( // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. logDebug("Task length threshold for speculation: " + threshold) - for ((tid, info) <- taskInfos) { + for (tid <- runningTasksSet) { +val info = taskInfos(tid) val index = info.index if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) {
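The shape of the change, as a stand-alone Scala sketch rather than the real `TaskSetManager` code: iterate the usually much smaller set of running task ids and look each one up, instead of walking every entry in `taskInfos`. `SpeculationScanSketch`, `Info`, and the sample values are invented:

```scala
object SpeculationScanSketch {
  final case class Info(index: Int, timeRunning: Long)

  def speculatableIndices(
      runningTasksSet: Set[Long],
      taskInfos: Map[Long, Info],
      successful: Array[Boolean],
      threshold: Long): Seq[Int] = {
    // Only running tasks can be speculated, so scanning runningTasksSet is enough.
    runningTasksSet.toSeq.flatMap { tid =>
      val info = taskInfos(tid)
      if (!successful(info.index) && info.timeRunning > threshold) Some(info.index) else None
    }
  }

  def main(args: Array[String]): Unit = {
    val infos = Map(1L -> Info(0, 5000L), 2L -> Info(1, 120L))
    // Task 2 already finished, so only task 1 is still running and gets scanned.
    println(speculatableIndices(Set(1L), infos, Array(false, true), threshold = 1000L)) // List(0)
  }
}
```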
spark git commit: [SPARK-19775][SQL] Remove an obsolete `partitionBy().insertInto()` test case
Repository: spark Updated Branches: refs/heads/master 2ff1467d6 -> db0ddce52 [SPARK-19775][SQL] Remove an obsolete `partitionBy().insertInto()` test case ## What changes were proposed in this pull request? This issue removes [a test case](https://github.com/apache/spark/blame/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala#L287-L298) which was introduced by [SPARK-14459](https://github.com/apache/spark/commit/652bbb1bf62722b08a062c7a2bf72019f85e179e) and was superseded by [SPARK-16033](https://github.com/apache/spark/blame/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala#L365-L371). Basically, we cannot use `partitionBy` and `insertInto` together. ```scala test("Reject partitioning that does not match table") { withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) .toDF("id", "data", "part") intercept[AnalysisException] { // cannot partition by 2 fields when there is only one in the table definition data.write.partitionBy("part", "data").insertInto("partitioned") } } } ``` ## How was this patch tested? This only removes a test case. Pass the existing Jenkins test. Author: Dongjoon Hyun. Closes #17106 from dongjoon-hyun/SPARK-19775. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db0ddce5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db0ddce5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db0ddce5 Branch: refs/heads/master Commit: db0ddce523bb823cba996e92ef36ceca31492d2c Parents: 2ff1467 Author: Dongjoon Hyun Authored: Thu Mar 2 00:45:59 2017 +0100 Committer: Sean Owen Committed: Thu Mar 2 00:45:59 2017 +0100 -- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 13 - 1 file changed, 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/db0ddce5/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 71ce5a7..d6999af 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -284,19 +284,6 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef sql("DROP TABLE hiveTableWithStructValue") } - test("Reject partitioning that does not match table") { -withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) { - sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)") - val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")) - .toDF("id", "data", "part") - - intercept[AnalysisException] { -// cannot partition by 2 fields when there is only one in the table definition -data.write.partitionBy("part", "data").insertInto("partitioned") - } -} - } - test("Test partition mode = strict") { withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) { sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
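The behaviour the remaining tests still cover, written out as a hedged Scala sketch: `insertInto()` takes its partitioning from the table definition, so the writer must not also call `partitionBy()`. Table and column names follow the removed test, but the program itself is only illustrative:

```scala
import org.apache.spark.sql.SparkSession

object InsertIntoPartitionedSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("InsertIntoPartitionedSketch")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    spark.sql("CREATE TABLE partitioned (id BIGINT, data STRING) PARTITIONED BY (part STRING)")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

    val data = (1 to 10).map(i => (i.toLong, s"data-$i", if (i % 2 == 0) "even" else "odd"))
      .toDF("id", "data", "part")

    data.write.insertInto("partitioned") // OK: partitioning comes from the table definition
    // data.write.partitionBy("part").insertInto("partitioned") // AnalysisException: not allowed together

    spark.table("partitioned").show()
    spark.stop()
  }
}
```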
spark git commit: [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio …
Repository: spark Updated Branches: refs/heads/branch-2.1 bbe0d8caa -> 27347b5f2 [SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourceRatio on registered cores rather than accepted cores See JIRA Unit tests, Mesos/Spark integration tests cc skonto susanxhuynh Author: Michael Gummelt Closes #17045 from mgummelt/SPARK-19373-registered-resources. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Michael Gummelt. Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27347b5f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27347b5f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27347b5f Branch: refs/heads/branch-2.1 Commit: 27347b5f26f668783d8ded89149a5e761b67f786 Parents: bbe0d8c Author: Michael Gummelt Authored: Thu Mar 2 00:32:32 2017 +0100 Committer: Sean Owen Committed: Thu Mar 2 00:32:32 2017 +0100 -- .../MesosCoarseGrainedSchedulerBackend.scala| 27 +++-- ...esosCoarseGrainedSchedulerBackendSuite.scala | 111 +-- 2 files changed, 70 insertions(+), 68 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/27347b5f/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala -- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5063c1f..22df2b1 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( with org.apache.mesos.Scheduler with MesosSchedulerUtils { - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + // Blacklist a slave after this many failures + private val MAX_SLAVE_FAILURES = 2 - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt) - val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + // Maximum number of cores to acquire + private val maxCores = maxCoresOption.getOrElse(Int.MaxValue) - val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + + private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") @@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) // Cores we have acquired with each Mesos task ID - val coresByTaskId = new mutable.HashMap[String, Int] - val gpusByTaskId = new mutable.HashMap[String, Int] - var totalCoresAcquired = 0 - var totalGpusAcquired = 0 + private
val coresByTaskId = new mutable.HashMap[String, Int] + private val gpusByTaskId = new mutable.HashMap[String, Int] + private var totalCoresAcquired = 0 + private var totalGpusAcquired = 0 // SlaveID -> Slave // This map accumulates entries for the duration of the job. Slaves are never deleted, because @@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock - val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) + private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) // Offer constraints private val slaveOfferConstraints = @@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( securityManager.isSaslEncryptionEnabled()) } - var nextMesosTaskId = 0 + private var nextMesosTaskId = 0 @volatile var appId: String = _ @@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( }
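For users, the visible knobs stay the same; what changes is when the scheduler considers them satisfied (registered executors rather than merely accepted offers). A hedged configuration sketch follows; the Mesos master URL and the numbers are placeholders, and note the config key is spelled `spark.scheduler.minRegisteredResourcesRatio` in Spark's documentation:

```scala
import org.apache.spark.SparkConf

object MinRegisteredRatioConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MinRegisteredRatioConfSketch")
      .setMaster("mesos://zk://host:2181/mesos") // placeholder Mesos master URL
      .set("spark.cores.max", "48")
      // Wait until 80% of the requested cores are registered (executors actually up),
      // not merely accepted from Mesos offers, before scheduling tasks.
      .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")

    conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```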
spark git commit: [DOC][MINOR][SPARKR] Update SparkR doc for names, columns and colnames
Repository: spark Updated Branches: refs/heads/master 417140e44 -> 2ff1467d6 [DOC][MINOR][SPARKR] Update SparkR doc for names, columns and colnames Update R doc: 1. columns, names and colnames return a vector of strings, not a **list** as in the current doc. 2. `colnames<-` does allow the subset assignment, so the length of `value` can be less than the number of columns, e.g., `colnames(df)[1] <- "a"`. felixcheung Author: actuaryzhang. Closes #17115 from actuaryzhang/sparkRMinorDoc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ff1467d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ff1467d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ff1467d Branch: refs/heads/master Commit: 2ff1467d676c9671da231db86bdc8e09c7450f80 Parents: 417140e Author: actuaryzhang Authored: Wed Mar 1 12:35:56 2017 -0800 Committer: Felix Cheung Committed: Wed Mar 1 12:35:56 2017 -0800 -- R/pkg/R/DataFrame.R | 4 ++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 6 ++ 2 files changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ff1467d/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index cc4cfa3..e33d0d8 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -280,7 +280,7 @@ setMethod("dtypes", #' Column Names of SparkDataFrame #' -#' Return all column names as a list. +#' Return a vector of column names. #' #' @param x a SparkDataFrame. #' @@ -338,7 +338,7 @@ setMethod("colnames", }) #' @param value a character vector. Must have the same length as the number -#' of columns in the SparkDataFrame. +#' of columns to be renamed. #' @rdname columns #' @aliases colnames<-,SparkDataFrame-method #' @name colnames<- http://git-wip-us.apache.org/repos/asf/spark/blob/2ff1467d/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index ce0f5a1..1dd8c5c 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -898,6 +898,12 @@ test_that("names() colnames() set the column names", { expect_equal(names(z)[3], "c") names(z)[3] <- "c2" expect_equal(names(z)[3], "c2") + + # Test subset assignment + colnames(df)[1] <- "col5" + expect_equal(colnames(df)[1], "col5") + names(df)[2] <- "col6" + expect_equal(names(df)[2], "col6") }) test_that("head() and first() return the correct data", {
spark git commit: [SPARK-19787][ML] Changing the default parameter of regParam.
Repository: spark Updated Branches: refs/heads/master 8aa560b75 -> 417140e44 [SPARK-19787][ML] Changing the default parameter of regParam. ## What changes were proposed in this pull request? In the ALS method the default values of regParam do not match within the same file (lines [224](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L224) and [714](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L714)). In one place we set it to 1.0 and in the other to 0.1. I changed the one in the train() method to 0.1, so it now matches the default value that is visible to Spark users. The method is marked with DeveloperApi so it should not affect users. Whenever we use the particular method we provide all parameters, so the default does not matter. The only exception is the unit tests in ALSSuite, but the change does not break them. Note: This PR should get the award of the laziest commit in Spark history. Originally I wanted to correct this on another PR but MLnick [suggested](https://github.com/apache/spark/pull/17059#issuecomment-28572) to create a separate PR & ticket. If you think this change is too insignificant/minor, you are probably right, so feel free to reject and close this. :) ## How was this patch tested? Unit-tests Author: Vasilis Vryniotis. Closes #17121 from datumbox/als_regparam. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/417140e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/417140e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/417140e4 Branch: refs/heads/master Commit: 417140e441505f20eb5bd4943ce216c3ec6adc10 Parents: 8aa560b Author: Vasilis Vryniotis Authored: Wed Mar 1 20:55:17 2017 +0200 Committer: Nick Pentreath Committed: Wed Mar 1 20:55:17 2017 +0200 -- mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/417140e4/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index af00762..04273a4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -711,7 +711,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { numUserBlocks: Int = 10, numItemBlocks: Int = 10, maxIter: Int = 10, - regParam: Double = 1.0, + regParam: Double = 0.1, implicitPrefs: Boolean = false, alpha: Double = 1.0, nonnegative: Boolean = false,
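The user-facing default this change aligns with can be checked directly on the estimator; a minimal Scala sketch (the object and column names are placeholders):

```scala
import org.apache.spark.ml.recommendation.ALS

object AlsRegParamSketch {
  def main(args: Array[String]): Unit = {
    val als = new ALS()
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")

    // The default visible to users, now matched by the internal train() helper.
    println(als.getRegParam) // 0.1

    // Setting it explicitly is unaffected by the change.
    println(als.setRegParam(0.05).getRegParam) // 0.05
  }
}
```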
spark git commit: [SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed
Repository: spark Updated Branches: refs/heads/master 5502a9cf8 -> 8aa560b75 [SPARK-19761][SQL] create InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero failed ## What changes were proposed in this pull request? If we create a InMemoryFileIndex with an empty rootPaths when set PARALLEL_PARTITION_DISCOVERY_THRESHOLD to zero, it will throw an exception: ``` Positive number of slices required java.lang.IllegalArgumentException: Positive number of slices required at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.collect(RDD.scala:935) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$.org$apache$spark$sql$execution$datasources$PartitioningAwareFileIndex$$bulkListLeafFiles(PartitioningAwareFileIndex.scala:357) at org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.listLeafFiles(PartitioningAwareFileIndex.scala:256) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:74) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.(InMemoryFileIndex.scala:50) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9$$anonfun$apply$mcV$sp$2.apply$mcV$sp(FileIndexSuite.scala:186) at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:105) at org.apache.spark.sql.execution.datasources.FileIndexSuite.withSQLConf(FileIndexSuite.scala:33) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply$mcV$sp(FileIndexSuite.scala:185) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185) at org.apache.spark.sql.execution.datasources.FileIndexSuite$$anonfun$9.apply(FileIndexSuite.scala:185) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) ``` ## How was this patch tested? unit test added Author: windpigerCloses #17093 from windpiger/fixEmptiPathInBulkListFiles. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8aa560b7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8aa560b7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8aa560b7 Branch: refs/heads/master Commit: 8aa560b75e6b083b2a890c52301414285ba35c3d Parents: 5502a9c Author: windpiger Authored: Wed Mar 1 08:16:29 2017 -0800 Committer: Xiao Li Committed: Wed Mar 1 08:16:29 2017 -0800 -- .../datasources/PartitioningAwareFileIndex.scala| 2 +- .../org/apache/spark/sql/internal/SQLConf.scala | 6 -- .../sql/execution/datasources/FileIndexSuite.scala | 16 3 files changed, 21 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8aa560b7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
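The stack trace boils down to `parallelize` being asked for zero slices. This is not the Spark-internal fix itself, but a small Scala sketch of the failure mode and the kind of guard that avoids it (the object name, path list, and slice cap are invented):

```scala
import org.apache.spark.sql.SparkSession

object EmptyParallelizeGuardSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("EmptyParallelizeGuardSketch").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    val paths: Seq[String] = Seq.empty // an empty rootPaths list is what triggered the bug

    // sc.parallelize(paths, 0) throws "Positive number of slices required", so skip the job
    // entirely for empty input and clamp the slice count to at least 1 otherwise.
    if (paths.nonEmpty) {
      val numSlices = math.max(1, math.min(paths.length, 4))
      sc.parallelize(paths, numSlices).map(p => (p, "listed")).collect().foreach(println)
    } else {
      println("nothing to list")
    }

    spark.stop()
  }
}
```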
spark git commit: [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
Repository: spark Updated Branches: refs/heads/branch-2.1 f719cccdc -> bbe0d8caa [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule ## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`. For the following query(): ``` val sqlA = """ |create temporary view ta as |select a, 'a' as tag from t1 union all |select a, 'b' as tag from t2 """.stripMargin val sqlB = """ |create temporary view tb as |select a, 'a' as tag from t3 union all |select a, 'b' as tag from t4 """.stripMargin val sql = """ |select tb.* from ta inner join tb on |ta.a = tb.a and |ta.tag = tb.tag """.stripMargin ``` The tag column is an constant alias column, it's folded by `FoldablePropagation` like this: ``` TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation === Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, ((a#0 = a#4) && (a = a)) :- Union :- Union : :- Project [a#0, a AS tag#8]: :- Project [a#0, a AS tag#8] : : +- LocalRelation [a#0] : : +- LocalRelation [a#0] : +- Project [a#2, b AS tag#9]: +- Project [a#2, b AS tag#9] : +- LocalRelation [a#2] : +- LocalRelation [a#2] +- Union +- Union :- Project [a#4, a AS tag#14] :- Project [a#4, a AS tag#14] : +- LocalRelation [a#4] : +- LocalRelation [a#4] +- Project [a#6, b AS tag#15] +- Project [a#6, b AS tag#15] +- LocalRelation [a#6] +- LocalRelation [a#6] ``` Finally the Result of Batch Operator Optimizations is: ``` Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, (a#0 = a#4) ! :- SubqueryAlias ta, `ta` :- Union ! : +- Union: :- LocalRelation [a#0] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2] ! : : +- SubqueryAlias t1, `t1` +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#14] ! : :+- SubqueryAlias grouping +- LocalRelation [a#6, tag#15] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! :+- SubqueryAlias t2, `t2` ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb, `tb` ! +- Union ! :- Project [a#4, a AS tag#14] ! : +- SubqueryAlias t3, `t3` ! : +- Project [a#4] ! :+- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#15] !+- SubqueryAlias t4, `t4` ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong. After fix: ``` === Result of Batch LocalRelation === GlobalLimit 21 GlobalLimit 21 +- LocalLimit 21 +- LocalLimit 21 +- Project [a#4, tag#11] +- Project [a#4, tag#11] +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) ! :- SubqueryAlias ta :- Union ! : +- Union : :- LocalRelation [a#0, tag#8] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2, tag#9] ! : : +- SubqueryAlias t1 +- Union ! : : +- Project [a#0]:- LocalRelation [a#4, tag#11] ! : :+- SubqueryAlias grouping+- LocalRelation [a#6, tag#12] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! :+- SubqueryAlias t2 ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb !+- Union ! :- Project [a#4, a AS tag#11] ! : +- SubqueryAlias t3 ! : +- Project [a#4] ! :+- SubqueryAlias grouping !
spark git commit: [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule
Repository: spark Updated Branches: refs/heads/master 38e783534 -> 5502a9cf8 [SPARK-19766][SQL] Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule ## What changes were proposed in this pull request? This PR fixes the code in Optimizer phase where the constant alias columns of a `INNER JOIN` query are folded in Rule `FoldablePropagation`. For the following query(): ``` val sqlA = """ |create temporary view ta as |select a, 'a' as tag from t1 union all |select a, 'b' as tag from t2 """.stripMargin val sqlB = """ |create temporary view tb as |select a, 'a' as tag from t3 union all |select a, 'b' as tag from t4 """.stripMargin val sql = """ |select tb.* from ta inner join tb on |ta.a = tb.a and |ta.tag = tb.tag """.stripMargin ``` The tag column is an constant alias column, it's folded by `FoldablePropagation` like this: ``` TRACE SparkOptimizer: === Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation === Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, ((a#0 = a#4) && (a = a)) :- Union :- Union : :- Project [a#0, a AS tag#8]: :- Project [a#0, a AS tag#8] : : +- LocalRelation [a#0] : : +- LocalRelation [a#0] : +- Project [a#2, b AS tag#9]: +- Project [a#2, b AS tag#9] : +- LocalRelation [a#2] : +- LocalRelation [a#2] +- Union +- Union :- Project [a#4, a AS tag#14] :- Project [a#4, a AS tag#14] : +- LocalRelation [a#4] : +- LocalRelation [a#4] +- Project [a#6, b AS tag#15] +- Project [a#6, b AS tag#15] +- LocalRelation [a#6] +- LocalRelation [a#6] ``` Finally the Result of Batch Operator Optimizations is: ``` Project [a#4, tag#14] Project [a#4, tag#14] !+- Join Inner, ((a#0 = a#4) && (tag#8 = tag#14)) +- Join Inner, (a#0 = a#4) ! :- SubqueryAlias ta, `ta` :- Union ! : +- Union: :- LocalRelation [a#0] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2] ! : : +- SubqueryAlias t1, `t1` +- Union ! : : +- Project [a#0] :- LocalRelation [a#4, tag#14] ! : :+- SubqueryAlias grouping +- LocalRelation [a#6, tag#15] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! :+- SubqueryAlias t2, `t2` ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb, `tb` ! +- Union ! :- Project [a#4, a AS tag#14] ! : +- SubqueryAlias t3, `t3` ! : +- Project [a#4] ! :+- SubqueryAlias grouping ! : +- LocalRelation [a#4] ! +- Project [a#6, b AS tag#15] !+- SubqueryAlias t4, `t4` ! +- Project [a#6] ! +- SubqueryAlias grouping ! +- LocalRelation [a#6] ``` The condition `tag#8 = tag#14` of INNER JOIN has been removed. This leads to the data of inner join being wrong. After fix: ``` === Result of Batch LocalRelation === GlobalLimit 21 GlobalLimit 21 +- LocalLimit 21 +- LocalLimit 21 +- Project [a#4, tag#11] +- Project [a#4, tag#11] +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) +- Join Inner, ((a#0 = a#4) && (tag#8 = tag#11)) ! :- SubqueryAlias ta :- Union ! : +- Union : :- LocalRelation [a#0, tag#8] ! : :- Project [a#0, a AS tag#8] : +- LocalRelation [a#2, tag#9] ! : : +- SubqueryAlias t1 +- Union ! : : +- Project [a#0]:- LocalRelation [a#4, tag#11] ! : :+- SubqueryAlias grouping+- LocalRelation [a#6, tag#12] ! : : +- LocalRelation [a#0] ! : +- Project [a#2, b AS tag#9] ! :+- SubqueryAlias t2 ! : +- Project [a#2] ! : +- SubqueryAlias grouping ! : +- LocalRelation [a#2] ! +- SubqueryAlias tb !+- Union ! :- Project [a#4, a AS tag#11] ! : +- SubqueryAlias t3 ! : +- Project [a#4] ! :+- SubqueryAlias grouping !
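A runnable Scala version of the reproduction quoted in both commit messages above; the contents of t1 through t4 are invented (the report only gives the view definitions), everything else follows the SQL as written:

```scala
import org.apache.spark.sql.SparkSession

object FoldablePropagationReproSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FoldablePropagationReproSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    Seq(1, 2).toDF("a").createOrReplaceTempView("t1")
    Seq(1, 3).toDF("a").createOrReplaceTempView("t2")
    Seq(1, 2).toDF("a").createOrReplaceTempView("t3")
    Seq(1, 3).toDF("a").createOrReplaceTempView("t4")

    spark.sql(
      """
        |create temporary view ta as
        |select a, 'a' as tag from t1 union all
        |select a, 'b' as tag from t2
      """.stripMargin)
    spark.sql(
      """
        |create temporary view tb as
        |select a, 'a' as tag from t3 union all
        |select a, 'b' as tag from t4
      """.stripMargin)

    // With the fix, rows only join when both the key and the constant tag columns agree;
    // before it, the folded tag condition was dropped and extra rows came back.
    spark.sql(
      """
        |select tb.* from ta inner join tb on
        |ta.a = tb.a and
        |ta.tag = tb.tag
      """.stripMargin).show()

    spark.stop()
  }
}
```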
spark git commit: [SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path
Repository: spark Updated Branches: refs/heads/master 4913c92c2 -> 38e783534 [SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path ## What changes were proposed in this pull request? `Catalog.refreshByPath` can refresh the cache entry and the associated metadata for all dataframes (if any), that contain the given data source path. However, `CacheManager.invalidateCachedPath` doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678. ## How was this patch tested? Jenkins tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi HsiehCloses #17064 from viirya/fix-refreshByPath. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38e78353 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38e78353 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38e78353 Branch: refs/heads/master Commit: 38e7835347a2e1803b1df5e73cf8b749951b11b2 Parents: 4913c92 Author: Liang-Chi Hsieh Authored: Wed Mar 1 00:19:57 2017 -0800 Committer: Wenchen Fan Committed: Wed Mar 1 00:19:57 2017 -0800 -- .../spark/sql/execution/CacheManager.scala | 19 ++- .../org/apache/spark/sql/CachedTableSuite.scala | 16 2 files changed, 26 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 4ca1347..8013851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -168,15 +168,16 @@ class CacheManager extends Logging { (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory)) } -cachedData.foreach { - case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => -val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan)) -if (dataIndex >= 0) { - data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true) - cachedData.remove(dataIndex) -} - sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan)) - case _ => // Do Nothing +cachedData.filter { + case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined => true + case _ => false +}.foreach { data => + val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan)) + if (dataIndex >= 0) { +data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true) +cachedData.remove(dataIndex) + } + sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/38e78353/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 1af1a36..2a0e088 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -634,4 +634,20 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext assert(getNumInMemoryRelations(cachedPlan2) == 4) } } + + test("refreshByPath should refresh all cached plans with the specified path") 
{ +withTempDir { dir => + val path = dir.getCanonicalPath() + + spark.range(10).write.mode("overwrite").parquet(path) + spark.read.parquet(path).cache() + spark.read.parquet(path).filter($"id" > 4).cache() + assert(spark.read.parquet(path).filter($"id" > 4).count() == 5) + + spark.range(20).write.mode("overwrite").parquet(path) + spark.catalog.refreshByPath(path) + assert(spark.read.parquet(path).count() == 20) + assert(spark.read.parquet(path).filter($"id" > 4).count() == 15) +} + } }
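The new test above is essentially the whole story; the same scenario as a stand-alone Scala sketch (the path is a placeholder) shows that after `refreshByPath` every cached plan reading that path sees the rewritten data:

```scala
import org.apache.spark.sql.SparkSession

object RefreshByPathSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RefreshByPathSketch").master("local[2]").getOrCreate()
    import spark.implicits._

    val path = "/tmp/refresh-by-path-sketch" // placeholder directory

    spark.range(10).write.mode("overwrite").parquet(path)
    spark.read.parquet(path).cache()
    spark.read.parquet(path).filter($"id" > 4).cache()
    assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)

    spark.range(20).write.mode("overwrite").parquet(path)
    spark.catalog.refreshByPath(path)
    assert(spark.read.parquet(path).count() == 20)
    assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)

    spark.stop()
  }
}
```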