spark git commit: Tiny style improvement.
Repository: spark Updated Branches: refs/heads/master f923c849e -> 150d26cad Tiny style improvement. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/150d26ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/150d26ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/150d26ca Branch: refs/heads/master Commit: 150d26cad435e48441859ff266183161be893fac Parents: f923c84 Author: Reynold Xin Authored: Mon Dec 19 22:50:23 2016 -0800 Committer: Reynold Xin Committed: Mon Dec 19 22:50:23 2016 -0800 -- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/150d26ca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9d28e27..d33f7da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable -import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils} -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation} +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.types.StructType /** @@ -214,6 +214,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { dataSource.write(mode, df) } + /** * Inserts the content of the `DataFrame` to the specified table. It requires that * the schema of the `DataFrame` is the same as the schema of the table. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when appending data to an existing table
Repository: spark Updated Branches: refs/heads/master fa829ce21 -> f923c849e [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when append data to an existing table ## What changes were proposed in this pull request? When we append data to an existing table with `DataFrameWriter.saveAsTable`, we will do various checks to make sure the appended data is consistent with the existing data. However, we get the information of the existing table by matching the table relation, instead of looking at the table metadata. This is error-prone, e.g. we only check the number of columns for `HadoopFsRelation`, we forget to check bucketing, etc. This PR refactors the error checking by looking at the metadata of the existing table, and fix several bugs: * SPARK-18899: We forget to check if the specified bucketing matched the existing table, which may lead to a problematic table that has different bucketing in different data files. * SPARK-18912: We forget to check the number of columns for non-file-based data source table * SPARK-18913: We don't support append data to a table with special column names. ## How was this patch tested? new regression test. Author: Wenchen Fan Closes #16313 from cloud-fan/bug1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f923c849 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f923c849 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f923c849 Branch: refs/heads/master Commit: f923c849e5b8f7e7aeafee59db598a9bf4970f50 Parents: fa829ce Author: Wenchen Fan Authored: Mon Dec 19 20:03:33 2016 -0800 Committer: gatorsmile Committed: Mon Dec 19 20:03:33 2016 -0800 -- .../catalyst/catalog/ExternalCatalogUtils.scala | 37 +++ .../spark/sql/catalyst/catalog/interface.scala | 10 ++ .../command/createDataSourceTables.scala| 110 --- .../spark/sql/execution/datasources/rules.scala | 53 +++-- .../spark/sql/execution/command/DDLSuite.scala | 4 +- .../sql/test/DataFrameReaderWriterSuite.scala | 38 ++- .../sql/hive/MetastoreDataSourcesSuite.scala| 17 ++- .../sql/sources/HadoopFsRelationTest.scala | 2 +- 8 files changed, 180 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f923c849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 817c1ab..4331841 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Shell +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec object ExternalCatalogUtils { @@ -133,4 +135,39 @@ object CatalogUtils { case o => o } } + + def normalizePartCols( + tableName: String, + tableCols: Seq[String], + partCols: Seq[String], + resolver: Resolver): Seq[String] = { +partCols.map(normalizeColumnName(tableName, tableCols, _, "partition", resolver)) + } + + def normalizeBucketSpec( + tableName: String, + tableCols: Seq[String], + bucketSpec: BucketSpec, + resolver: Resolver): BucketSpec = { +val 
BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec +val normalizedBucketCols = bucketColumnNames.map { colName => + normalizeColumnName(tableName, tableCols, colName, "bucket", resolver) +} +val normalizedSortCols = sortColumnNames.map { colName => + normalizeColumnName(tableName, tableCols, colName, "sort", resolver) +} +BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols) + } + + private def normalizeColumnName( + tableName: String, + tableCols: Seq[String], + colName: String, + colType: String, + resolver: Resolver): String = { +tableCols.find(resolver(_, colName)).getOrElse { + throw new AnalysisException(s"$colType column $colName is not defined in table $tableName, " + +s"defined table columns are: ${tableCols.mkString(", ")}") +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/f923c849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --
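The normalization helpers added in this commit all follow one pattern: map each user-specified partition/bucket/sort column back to the table's canonical column name via the analyzer's resolver, and fail with the list of valid columns otherwise. A minimal, self-contained Scala sketch of that pattern follows; the `Resolver` alias and the exception type are simplified stand-ins for Spark's internals, not the actual classes.

type Resolver = (String, String) => Boolean
val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

def normalizeColumnName(
    tableName: String,
    tableCols: Seq[String],
    colName: String,
    colType: String,
    resolver: Resolver): String = {
  // Return the table's own spelling of the column, or fail listing the valid columns.
  tableCols.find(resolver(_, colName)).getOrElse(
    throw new IllegalArgumentException(
      s"$colType column $colName is not defined in table $tableName, " +
        s"defined table columns are: ${tableCols.mkString(", ")}"))
}

def normalizePartCols(
    tableName: String,
    tableCols: Seq[String],
    partCols: Seq[String],
    resolver: Resolver): Seq[String] =
  partCols.map(normalizeColumnName(tableName, tableCols, _, "partition", resolver))

// normalizePartCols("t", Seq("Year", "Month"), Seq("year"), caseInsensitiveResolution) == Seq("Year")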
spark git commit: [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors
Repository: spark Updated Branches: refs/heads/master 5857b9ac2 -> fa829ce21 [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors ## What changes were proposed in this pull request? Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks. This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details. ## How was this patch tested? Tested via a new test case in `JobCancellationSuite`, plus manual testing. Author: Josh Rosen Closes #16189 from JoshRosen/cancellation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa829ce2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa829ce2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa829ce2 Branch: refs/heads/master Commit: fa829ce21fb84028d90b739a49c4ece70a17ccfd Parents: 5857b9a Author: Josh Rosen Authored: Mon Dec 19 18:43:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 18:43:59 2016 -0800 -- .../org/apache/spark/executor/Executor.scala| 169 ++- .../scala/org/apache/spark/util/Utils.scala | 56 +++--- .../org/apache/spark/JobCancellationSuite.scala | 77 + docs/configuration.md | 42 + 4 files changed, 316 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa829ce2/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 9501dd9..3346f6d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -84,6 +84,16 @@ private[spark] class Executor( // Start worker thread pool private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") private val executorSource = new ExecutorSource(threadPool, executorId) + // Pool used for threads that supervise task killing / cancellation + private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task reaper") + // For tasks which are in the process of being killed, this map holds the most recently created + // TaskReaper. 
All accesses to this map should be synchronized on the map itself (this isn't + // a ConcurrentHashMap because we use the synchronization for purposes other than simply guarding + // the integrity of the map's internal state). The purpose of this map is to prevent the creation + // of a separate TaskReaper for every killTask() of a given task. Instead, this map allows us to + // track whether an existing TaskReaper fulfills the role of a TaskReaper that we would otherwise + // create. The map key is a task id. + private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]() if (!isLocal) { env.metricsSystem.registerSource(executorSource) @@ -93,6 +103,9 @@ private[spark] class Executor( // Whether to load classes in user jars before those in Spark jars private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + // Whether to monitor killed / interrupted tasks + private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false) + // Create our ClassLoader // do this after SparkEnv creation so can access the SecurityManager private val urlClassLoader = createClassLoader() @@ -148,9 +161,27 @@ private[spa
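The feature is opt-in. A hedged sketch of enabling it when building a SparkConf: only spark.task.reaper.enabled is confirmed by the diff above; the killTimeout key shown here is an assumption about one of the other spark.task.reaper.* settings and should be checked against the updated configuration.md.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("reaper-demo")
  // Confirmed by the diff above: the reaper stays disabled unless this is set.
  .set("spark.task.reaper.enabled", "true")
  // Assumed key name (one of the four spark.task.reaper.* settings); verify in configuration.md.
  .set("spark.task.reaper.killTimeout", "120s")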
spark git commit: [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter
Repository: spark Updated Branches: refs/heads/master 4cb49412d -> 5857b9ac2 [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter ## What changes were proposed in this pull request? In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189). This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses. ## How was this patch tested? Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here. Author: Josh Rosen Closes #16340 from JoshRosen/sql-task-interruption. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5857b9ac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5857b9ac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5857b9ac Branch: refs/heads/master Commit: 5857b9ac2d9808d9b89a5b29620b5052e2beebf5 Parents: 4cb4941 Author: Josh Rosen Authored: Tue Dec 20 01:19:38 2016 +0100 Committer: Herman van Hovell Committed: Tue Dec 20 01:19:38 2016 +0100 -- .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++ .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++ .../spark/sql/execution/datasources/FileScanRDD.scala | 12 ++-- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 5 +++-- 4 files changed, 35 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5857b9ac/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 252a35e..5b42843 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -22,6 +22,8 @@ import java.util.LinkedList; import org.apache.avro.reflect.Nullable; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskKilledException; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -253,6 +255,7 @@ public final class UnsafeInMemorySorter { private long keyPrefix; private int recordLength; private long currentPageNumber; +private final TaskContext taskContext = TaskContext.get(); private SortedIterator(int numRecords, int offset) { this.numRecords = numRecords; @@ -283,6 +286,14 @@ public final class UnsafeInMemorySorter { @Override public void loadNext() { + // Kill the task in case it has been marked as killed. 
This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. This check is added here in `loadNext()` instead of in + // `hasNext()` because it's technically possible for the caller to be relying on + // `getNumRecords()` instead of `hasNext()` to know when to stop. + if (taskContext != null && taskContext.isInterrupted()) { +throw new TaskKilledException(); + } // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer); http://git-wip-us.apache.org/repos/asf/spark/blob/5857b9ac/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index a658e5e..b6323c6 100644 --- a/core/sr
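The pattern being inlined is small enough to show on its own: poll TaskContext.isInterrupted() on each step of an iterator and throw TaskKilledException so a cancelled task actually stops. A hedged sketch using the public TaskContext API; the iterator class itself is illustrative and not part of Spark.

import org.apache.spark.{TaskContext, TaskKilledException}

class CancellableIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  // Null when this code does not run inside a Spark task (e.g. on the driver).
  private val taskContext = TaskContext.get()

  override def hasNext: Boolean = underlying.hasNext

  override def next(): T = {
    // Kill the task if it has been marked as killed, so a long scan or sort
    // cannot keep running as a zombie after cancellation.
    if (taskContext != null && taskContext.isInterrupted()) {
      throw new TaskKilledException
    }
    underlying.next()
  }
}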
spark git commit: [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter
Repository: spark Updated Branches: refs/heads/branch-2.1 c1a26b458 -> f07e989c0 [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter ## What changes were proposed in this pull request? In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189). This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses. ## How was this patch tested? Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here. Author: Josh Rosen Closes #16340 from JoshRosen/sql-task-interruption. (cherry picked from commit 5857b9ac2d9808d9b89a5b29620b5052e2beebf5) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f07e989c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f07e989c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f07e989c Branch: refs/heads/branch-2.1 Commit: f07e989c02844151587f9a29fe77ea65facea422 Parents: c1a26b4 Author: Josh Rosen Authored: Tue Dec 20 01:19:38 2016 +0100 Committer: Herman van Hovell Committed: Tue Dec 20 01:19:51 2016 +0100 -- .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++ .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++ .../spark/sql/execution/datasources/FileScanRDD.scala | 12 ++-- .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala | 5 +++-- 4 files changed, 35 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f07e989c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 252a35e..5b42843 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -22,6 +22,8 @@ import java.util.LinkedList; import org.apache.avro.reflect.Nullable; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskKilledException; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -253,6 +255,7 @@ public final class UnsafeInMemorySorter { private long keyPrefix; private int recordLength; private long currentPageNumber; +private final TaskContext taskContext = TaskContext.get(); private SortedIterator(int numRecords, int offset) { this.numRecords = numRecords; @@ -283,6 +286,14 @@ public final class UnsafeInMemorySorter { @Override public void loadNext() { + // Kill the task in case it has 
been marked as killed. This logic is from + // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order + // to avoid performance overhead. This check is added here in `loadNext()` instead of in + // `hasNext()` because it's technically possible for the caller to be relying on + // `getNumRecords()` instead of `hasNext()` to know when to stop. + if (taskContext != null && taskContext.isInterrupted()) { +throw new TaskKilledException(); + } // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer); http://git-wip-us.apache.org/repos/asf/spark/blob/f07e989c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/ap
spark git commit: [SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler
Repository: spark Updated Branches: refs/heads/master 70d495dce -> 4cb49412d [SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler ## What changes were proposed in this pull request? Right now we serialize the empty task metrics once per task â Since this is shared across all tasks we could use the same serialized task metrics across all tasks of a stage. ## How was this patch tested? - [x] Run tests on EC2 to measure performance improvement Author: Shivaram Venkataraman Closes #16261 from shivaram/task-metrics-one-copy. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cb49412 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cb49412 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cb49412 Branch: refs/heads/master Commit: 4cb49412d1d7d10ffcc738475928c7de2bc59fd4 Parents: 70d495d Author: Shivaram Venkataraman Authored: Mon Dec 19 14:53:01 2016 -0800 Committer: Kay Ousterhout Committed: Mon Dec 19 14:53:01 2016 -0800 -- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++-- .../scala/org/apache/spark/scheduler/ResultTask.scala| 9 + .../org/apache/spark/scheduler/ShuffleMapTask.scala | 11 ++- .../src/main/scala/org/apache/spark/scheduler/Task.scala | 10 -- .../scala/org/apache/spark/executor/ExecutorSuite.scala | 4 +++- .../test/scala/org/apache/spark/scheduler/FakeTask.scala | 10 +- .../org/apache/spark/scheduler/TaskContextSuite.scala| 6 -- 7 files changed, 38 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 0a1c500..6177baf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1009,13 +1009,14 @@ class DAGScheduler( } val tasks: Seq[Task[_]] = try { + val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { case stage: ShuffleMapStage => partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId), + taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } @@ -1025,7 +1026,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics, + taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index d19353f..6abdf0f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD * @param outputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). 
* @param localProperties copy of thread-local properties set by the user on the driver side. - * @param metrics a `TaskMetrics` that is created at driver side and sent to executor side. + * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized on the driver side + * and sent to executor side. * * The parameters below are optional: * @param jobId id of the job this task belongs to @@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U]( locs: Seq[TaskLocation], val outputId: Int, localProperties: Properties, -metrics: TaskMetrics, +serializedTaskMetrics: Array[Byte], jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) - extends Task[U](stageId, stageAttemptId, partition.inde
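The optimization itself amounts to "serialize once per stage, share the bytes across all tasks of that stage". A hedged, self-contained sketch of the idea, using Spark's JavaSerializer for illustration; the DAGScheduler actually uses its closure serializer, and the payload type here is made up.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

case class StagePayload(stageId: Int, props: Map[String, String])

val serializer = new JavaSerializer(new SparkConf()).newInstance()
val payload = StagePayload(3, Map("spark.job.description" -> "demo"))

// Serialize the shared payload once for the whole stage...
val serializedOnce: Array[Byte] = serializer.serialize(payload).array()

// ...and hand every task the same byte array, instead of serializing it once per task.
val tasks = (0 until 100).map(partition => (partition, serializedOnce))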
spark git commit: [SPARK-18624][SQL] Implicit cast ArrayType(InternalType)
Repository: spark Updated Branches: refs/heads/master 7a75ee1c9 -> 70d495dce [SPARK-18624][SQL] Implicit cast ArrayType(InternalType) ## What changes were proposed in this pull request? Currently `ImplicitTypeCasts` doesn't handle casts between `ArrayType`s, this is not convenient, we should add a rule to enable casting from `ArrayType(InternalType)` to `ArrayType(newInternalType)`. Goals: 1. Add a rule to `ImplicitTypeCasts` to enable casting between `ArrayType`s; 2. Simplify `Percentile` and `ApproximatePercentile`. ## How was this patch tested? Updated test cases in `TypeCoercionSuite`. Author: jiangxingbo Closes #16057 from jiangxb1987/implicit-cast-complex-types. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70d495dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70d495dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70d495dc Branch: refs/heads/master Commit: 70d495dcecce8617b7099fc599fe7c43d7eae66e Parents: 7a75ee1 Author: jiangxingbo Authored: Mon Dec 19 21:20:47 2016 +0100 Committer: Herman van Hovell Committed: Mon Dec 19 21:20:47 2016 +0100 -- .../sql/catalyst/analysis/TypeCoercion.scala| 57 +--- .../spark/sql/catalyst/expressions/Cast.scala | 6 +-- .../aggregate/ApproximatePercentile.scala | 19 +++ .../expressions/aggregate/Percentile.scala | 14 ++--- .../catalyst/analysis/TypeCoercionSuite.scala | 45 ++-- 5 files changed, 92 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/70d495dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 6662a9e..cd73f9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -673,48 +673,69 @@ object TypeCoercion { * If the expression has an incompatible type that cannot be implicitly cast, return None. */ def implicitCast(e: Expression, expectedType: AbstractDataType): Option[Expression] = { - val inType = e.dataType + implicitCast(e.dataType, expectedType).map { dt => +if (dt == e.dataType) e else Cast(e, dt) + } +} +private def implicitCast(inType: DataType, expectedType: AbstractDataType): Option[DataType] = { // Note that ret is nullable to avoid typing a lot of Some(...) in this local scope. // We wrap immediately an Option after this. - @Nullable val ret: Expression = (inType, expectedType) match { - + @Nullable val ret: DataType = (inType, expectedType) match { // If the expected type is already a parent of the input type, no need to cast. -case _ if expectedType.acceptsType(inType) => e +case _ if expectedType.acceptsType(inType) => inType // Cast null type (usually from null literals) into target types -case (NullType, target) => Cast(e, target.defaultConcreteType) +case (NullType, target) => target.defaultConcreteType // If the function accepts any numeric type and the input is a string, we follow the hive // convention and cast that input into a double -case (StringType, NumericType) => Cast(e, NumericType.defaultConcreteType) +case (StringType, NumericType) => NumericType.defaultConcreteType // Implicit cast among numeric types. When we reach here, input type is not acceptable. 
// If input is a numeric type but not decimal, and we expect a decimal type, // cast the input to decimal. -case (d: NumericType, DecimalType) => Cast(e, DecimalType.forType(d)) +case (d: NumericType, DecimalType) => DecimalType.forType(d) // For any other numeric types, implicitly cast to each other, e.g. long -> int, int -> long -case (_: NumericType, target: NumericType) => Cast(e, target) +case (_: NumericType, target: NumericType) => target // Implicit cast between date time types -case (DateType, TimestampType) => Cast(e, TimestampType) -case (TimestampType, DateType) => Cast(e, DateType) +case (DateType, TimestampType) => TimestampType +case (TimestampType, DateType) => DateType // Implicit cast from/to string -case (StringType, DecimalType) => Cast(e, DecimalType.SYSTEM_DEFAULT) -case (StringType, target: NumericType) => Cast(e, target) -case (StringType, DateType) => Cast(e, DateType)
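The new array rule reduces to: if the element type can be implicitly cast, the array type can be too, as long as the target does not promise fewer nulls than the source may contain. A simplified sketch of that shape using Spark's public DataType classes; the real rule lives in TypeCoercion.ImplicitTypeCasts and covers all scalar cases, not just the one widening shown here.

import org.apache.spark.sql.types._

// Simplified stand-in for the scalar rules above: identity plus one widening case.
def implicitElementCast(from: DataType, to: DataType): Option[DataType] = (from, to) match {
  case (f, t) if f == t        => Some(t)
  case (IntegerType, LongType) => Some(LongType)
  case _                       => None
}

def implicitArrayCast(from: ArrayType, to: ArrayType): Option[ArrayType] =
  implicitElementCast(from.elementType, to.elementType).flatMap { elem =>
    // Don't promise "no nulls" on the target if the source may contain them.
    if (from.containsNull && !to.containsNull) None
    else Some(ArrayType(elem, to.containsNull))
  }

// implicitArrayCast(ArrayType(IntegerType), ArrayType(LongType)) == Some(ArrayType(LongType, true))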
spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase
Repository: spark Updated Branches: refs/heads/branch-2.1 fc1b25660 -> c1a26b458 [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase ## What changes were proposed in this pull request? It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface. What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence. This PR fixes this and use `Hive.databaseExists` to check database existence. ## How was this patch tested? N/A Author: Wenchen Fan Closes #16332 from cloud-fan/minor. (cherry picked from commit 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a26b45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a26b45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a26b45 Branch: refs/heads/branch-2.1 Commit: c1a26b458dd353be3ab1a2b3f9bb80809cf63479 Parents: fc1b256 Author: Wenchen Fan Authored: Mon Dec 19 11:42:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 11:43:55 2016 -0800 -- .../apache/spark/sql/hive/HiveExternalCatalog.scala| 2 +- .../org/apache/spark/sql/hive/client/HiveClient.scala | 8 +++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 12 .../apache/spark/sql/hive/client/VersionsSuite.scala | 13 +++-- 4 files changed, 19 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index f67ddc9..f321c45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } override def databaseExists(db: String): Boolean = withClient { -client.getDatabaseOption(db).isDefined +client.databaseExists(db) } override def listDatabases(): Seq[String] = withClient { http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 8e7c871..0be5b0b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -58,12 +58,10 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - final def getDatabase(name: String): CatalogDatabase = { -getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) - } + def getDatabase(name: String): CatalogDatabase - /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[CatalogDatabase] + /** Return whether a table/view with the specified name exists. */ + def databaseExists(dbName: String): Boolean /** List the names of all the databases that match the specified pattern. 
*/ def listDatabases(pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index db73596..e0f7156 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -300,7 +300,7 @@ private[hive] class HiveClientImpl( } override def setCurrentDatabase(databaseName: String): Unit = withHiveState { -if (getDatabaseOption(databaseName).isDefined) { +if (databaseExists(databaseName)) { state.setCurrentDatabase(databaseName) } else { throw new NoSuchDatabaseException(databaseName) @@ -336,14 +336,18 @@ private[hive] class HiveClientImpl(
spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase
Repository: spark Updated Branches: refs/heads/master 24482858e -> 7a75ee1c9 [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase ## What changes were proposed in this pull request? It's weird that we use `Hive.getDatabase` to check the existence of a database, while Hive has a `databaseExists` interface. What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check the database existence. This PR fixes this and use `Hive.databaseExists` to check database existence. ## How was this patch tested? N/A Author: Wenchen Fan Closes #16332 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a75ee1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a75ee1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a75ee1c Branch: refs/heads/master Commit: 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b Parents: 2448285 Author: Wenchen Fan Authored: Mon Dec 19 11:42:59 2016 -0800 Committer: Yin Huai Committed: Mon Dec 19 11:42:59 2016 -0800 -- .../apache/spark/sql/hive/HiveExternalCatalog.scala| 2 +- .../org/apache/spark/sql/hive/client/HiveClient.scala | 8 +++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 12 .../apache/spark/sql/hive/client/VersionsSuite.scala | 13 +++-- 4 files changed, 19 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 544f277..9c19a0e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } override def databaseExists(db: String): Boolean = withClient { -client.getDatabaseOption(db).isDefined +client.databaseExists(db) } override def listDatabases(): Seq[String] = withClient { http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 837b6c5..8bdcf31 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -58,12 +58,10 @@ private[hive] trait HiveClient { def setCurrentDatabase(databaseName: String): Unit /** Returns the metadata for specified database, throwing an exception if it doesn't exist */ - final def getDatabase(name: String): CatalogDatabase = { -getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name)) - } + def getDatabase(name: String): CatalogDatabase - /** Returns the metadata for a given database, or None if it doesn't exist. */ - def getDatabaseOption(name: String): Option[CatalogDatabase] + /** Return whether a table/view with the specified name exists. */ + def databaseExists(dbName: String): Boolean /** List the names of all the databases that match the specified pattern. 
*/ def listDatabases(pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index b75f6e9..bacae8a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -300,7 +300,7 @@ private[hive] class HiveClientImpl( } override def setCurrentDatabase(databaseName: String): Unit = withHiveState { -if (getDatabaseOption(databaseName).isDefined) { +if (databaseExists(databaseName)) { state.setCurrentDatabase(databaseName) } else { throw new NoSuchDatabaseException(databaseName) @@ -336,14 +336,18 @@ private[hive] class HiveClientImpl( Option(database.properties).map(_.asJava).orNull)) } - override def getDatabaseOption(na
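From user code, the same "ask for existence instead of fetching and catching" idea is available through the public catalog API. A short sketch, assuming a Spark version whose Catalog exposes databaseExists (present in current releases):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("db-exists-demo").master("local[*]").getOrCreate()

// A boolean existence check never logs a spurious "database not found" error,
// which is exactly the annoyance this commit removes on the internal path.
if (!spark.catalog.databaseExists("reporting")) {
  spark.sql("CREATE DATABASE reporting")
}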
spark git commit: Fix test case for SubquerySuite.
Repository: spark Updated Branches: refs/heads/branch-2.0 b41668349 -> 2a5ab1490 Fix test case for SubquerySuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a5ab149 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a5ab149 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a5ab149 Branch: refs/heads/branch-2.0 Commit: 2a5ab149005f305b7bfcb160c5023e1a83d2dce1 Parents: b416683 Author: Reynold Xin Authored: Mon Dec 19 11:33:31 2016 -0800 Committer: Reynold Xin Committed: Mon Dec 19 11:33:31 2016 -0800 -- sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a5ab149/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index eff8894..dfbfa3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -59,7 +59,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { "(select id from range(2) union all select id from range(2))") // The depth first traversal of the plan tree -val dfs = Seq("Project", "Filter", "Union", "Project", "Range", "Project", "Range", "Range") +val dfs = Seq("Project", "Filter", "SubqueryAlias", "Union", "Project", "Range", "Project", + "Range", "Range") val numbered = df.queryExecution.analyzed.numberedTreeString.split("\n") // There should be 8 plan nodes in total - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18700][SQL] Add StripedLock for each table's relation in cache
Repository: spark Updated Branches: refs/heads/branch-2.1 3080f995c -> fc1b25660 [SPARK-18700][SQL] Add StripedLock for each table's relation in cache ## What changes were proposed in this pull request? As the scenario describe in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables invalided, the coming few queries will fetch all FileStatus in listLeafFiles function. In the condition of table has many partitions, these jobs will occupy much memory of driver finally may cause driver OOM. In this patch, add StripedLock for each table's relation in cache not for the whole cachedDataSourceTables, each table's load cache operation protected by it. ## How was this patch tested? Add a multi-thread access table test in `PartitionedTablePerfStatsSuite` and check it only loading once using metrics in `HiveCatalogMetrics` Author: xuanyuanking Closes #16135 from xuanyuanking/SPARK-18700. (cherry picked from commit 24482858e05bea84cacb41c62be0a9aaa33897ee) Signed-off-by: Herman van Hovell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc1b2566 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc1b2566 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc1b2566 Branch: refs/heads/branch-2.1 Commit: fc1b25660d8d2ac676c0b020208bcb9b711978c8 Parents: 3080f99 Author: xuanyuanking Authored: Mon Dec 19 20:31:43 2016 +0100 Committer: Herman van Hovell Committed: Mon Dec 19 20:32:02 2016 +0100 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 134 +++ .../hive/PartitionedTablePerfStatsSuite.scala | 31 + 2 files changed, 106 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fc1b2566/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index edbde5d..0407cf6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -32,7 +33,6 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._ - /** * Legacy catalog for interacting with the Hive metastore. * @@ -53,6 +53,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log tableIdent.table.toLowerCase) } + /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */ + private val tableCreationLocks = Striped.lazyWeakLock(100) + + /** Acquires a lock on the table cache for the duration of `f`. */ + private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = { +val lock = tableCreationLocks.get(tableName) +lock.lock() +try f finally { + lock.unlock() +} + } + /** A cache of Spark SQL data source tables that have been accessed. 
*/ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { @@ -209,72 +221,76 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - val cached = getCached( -tableIdentifier, -rootPaths, -metastoreRelation, -metastoreSchema, -fileFormatClass, -bucketSpec, -Some(partitionSchema)) - - val logicalRelation = cached.getOrElse { -val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong -val fileCatalog = { - val catalog = new CatalogFileIndex( -sparkSession, metastoreRelation.catalogTable, sizeInBytes) - if (lazyPruningEnabled) { -catalog - } else { -catalog.filterPartitions(Nil) // materialize all the partitions in memory + withTableCreationLock(tableIdentifier, { +val cached = getCached( + tableIdentifier, + rootPaths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSchema)) + +val logicalRelation = cached.getOrElse { + val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong +
spark git commit: [SPARK-18700][SQL] Add StripedLock for each table's relation in cache
Repository: spark Updated Branches: refs/heads/master 7db09abb0 -> 24482858e [SPARK-18700][SQL] Add StripedLock for each table's relation in cache ## What changes were proposed in this pull request? As the scenario describe in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables invalided, the coming few queries will fetch all FileStatus in listLeafFiles function. In the condition of table has many partitions, these jobs will occupy much memory of driver finally may cause driver OOM. In this patch, add StripedLock for each table's relation in cache not for the whole cachedDataSourceTables, each table's load cache operation protected by it. ## How was this patch tested? Add a multi-thread access table test in `PartitionedTablePerfStatsSuite` and check it only loading once using metrics in `HiveCatalogMetrics` Author: xuanyuanking Closes #16135 from xuanyuanking/SPARK-18700. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24482858 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24482858 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24482858 Branch: refs/heads/master Commit: 24482858e05bea84cacb41c62be0a9aaa33897ee Parents: 7db09ab Author: xuanyuanking Authored: Mon Dec 19 20:31:43 2016 +0100 Committer: Herman van Hovell Committed: Mon Dec 19 20:31:43 2016 +0100 -- .../spark/sql/hive/HiveMetastoreCatalog.scala | 134 +++ .../hive/PartitionedTablePerfStatsSuite.scala | 31 + 2 files changed, 106 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24482858/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index edbde5d..0407cf6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -32,7 +33,6 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._ - /** * Legacy catalog for interacting with the Hive metastore. * @@ -53,6 +53,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log tableIdent.table.toLowerCase) } + /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */ + private val tableCreationLocks = Striped.lazyWeakLock(100) + + /** Acquires a lock on the table cache for the duration of `f`. */ + private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = { +val lock = tableCreationLocks.get(tableName) +lock.lock() +try f finally { + lock.unlock() +} + } + /** A cache of Spark SQL data source tables that have been accessed. 
*/ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { @@ -209,72 +221,76 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - val cached = getCached( -tableIdentifier, -rootPaths, -metastoreRelation, -metastoreSchema, -fileFormatClass, -bucketSpec, -Some(partitionSchema)) - - val logicalRelation = cached.getOrElse { -val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong -val fileCatalog = { - val catalog = new CatalogFileIndex( -sparkSession, metastoreRelation.catalogTable, sizeInBytes) - if (lazyPruningEnabled) { -catalog - } else { -catalog.filterPartitions(Nil) // materialize all the partitions in memory + withTableCreationLock(tableIdentifier, { +val cached = getCached( + tableIdentifier, + rootPaths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSchema)) + +val logicalRelation = cached.getOrElse { + val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong + val fileCatalog = { +val catalog = new CatalogFileIndex( + sparkSession, metastore
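The locking primitive used above is Guava's Striped: a fixed pool of locks selected by key hash, so concurrent loads of the same table serialize while loads of different tables proceed in parallel. A minimal sketch of the pattern outside Spark; the table-loading body is illustrative only.

import com.google.common.util.concurrent.Striped

object StripedLockDemo {
  // 100 lazily created, weakly referenced locks shared across all keys.
  private val tableCreationLocks = Striped.lazyWeakLock(100)

  def withTableLock[A](tableName: String)(body: => A): A = {
    val lock = tableCreationLocks.get(tableName)
    lock.lock()
    try body finally lock.unlock()
  }

  def loadTable(name: String): String = withTableLock(name) {
    // Expensive single-flight work goes here, e.g. listing a table's partition files once.
    s"relation for $name"
  }
}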
spark git commit: [SPARK-18356][ML] KMeans should cache RDD before training
Repository: spark Updated Branches: refs/heads/master 1e5c51f33 -> 7db09abb0 [SPARK-18356][ML] KMeans should cache RDD before training ## What changes were proposed in this pull request? According to request of Mr. Joseph Bradley , I did this update of my PR https://github.com/apache/spark/pull/15965 in order to eliminate the extrat fit() method. jkbradley ## How was this patch tested? Pass existing tests Author: Zakaria_Hili Author: HILI Zakaria Closes #16295 from ZakariaHili/zakbranch. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db09abb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db09abb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db09abb Branch: refs/heads/master Commit: 7db09abb0168b77697064c69126ee82ca89609a0 Parents: 1e5c51f Author: Zakaria_Hili Authored: Mon Dec 19 10:30:38 2016 + Committer: Sean Owen Committed: Mon Dec 19 10:30:38 2016 + -- .../scala/org/apache/spark/ml/clustering/KMeans.scala | 12 +--- 1 file changed, 5 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7db09abb/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index e168a41..e02b532 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -302,22 +302,19 @@ class KMeans @Since("1.5.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { -val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE -fit(dataset, handlePersistence) - } - - @Since("2.2.0") - protected def fit(dataset: Dataset[_], handlePersistence: Boolean): KMeansModel = { transformSchema(dataset.schema, logging = true) + +val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } + if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) } + val instr = Instrumentation.create(this, instances) instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol) - val algo = new MLlibKMeans() .setK($(k)) .setInitializationMode($(initMode)) @@ -329,6 +326,7 @@ class KMeans @Since("1.5.0") ( val model = copyValues(new KMeansModel(uid, parentModel).setParent(this)) val summary = new KMeansSummary( model.transform(dataset), $(predictionCol), $(featuresCol), $(k)) + model.setSummary(Some(summary)) instr.logSuccess(model) if (handlePersistence) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
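The check the patch moves back into fit() is simple and worth seeing from the caller's side: if the input is not already persisted, cache it before an iterative algorithm re-reads it on every iteration, and release it afterwards. A hedged sketch mirroring that check; the data path is illustrative, and any DataFrame with a "features" vector column works.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("kmeans-cache-demo").master("local[*]").getOrCreate()
val training = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Mirrors the check in the patch above: only persist if nothing is cached yet.
val handlePersistence = training.rdd.getStorageLevel == StorageLevel.NONE
if (handlePersistence) training.persist(StorageLevel.MEMORY_AND_DISK)

val model = new KMeans().setK(2).setSeed(1L).fit(training)

if (handlePersistence) training.unpersist()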