spark git commit: Tiny style improvement.

2016-12-19 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f923c849e -> 150d26cad


Tiny style improvement.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/150d26ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/150d26ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/150d26ca

Branch: refs/heads/master
Commit: 150d26cad435e48441859ff266183161be893fac
Parents: f923c84
Author: Reynold Xin 
Authored: Mon Dec 19 22:50:23 2016 -0800
Committer: Reynold Xin 
Committed: Mon Dec 19 22:50:23 2016 -0800

--
 .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/150d26ca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9d28e27..d33f7da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -214,6 +214,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
 dataSource.write(mode, df)
   }
+
   /**
 * Inserts the content of the `DataFrame` to the specified table. It requires that
* the schema of the `DataFrame` is the same as the schema of the table.





spark git commit: [SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when appending data to an existing table

2016-12-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master fa829ce21 -> f923c849e


[SPARK-18899][SPARK-18912][SPARK-18913][SQL] refactor the error checking when appending data to an existing table

## What changes were proposed in this pull request?

When we append data to an existing table with `DataFrameWriter.saveAsTable`, we perform various checks to make sure the appended data is consistent with the existing data.

However, we obtain the information about the existing table by matching the table relation instead of looking at the table metadata. This is error-prone: for example, we only check the number of columns for `HadoopFsRelation`, and we forget to check bucketing, etc.

This PR refactors the error checking to look at the metadata of the existing table, and fixes several bugs (one such mismatch is sketched below):
* SPARK-18899: We forget to check whether the specified bucketing matches the existing table, which may produce a problematic table whose data files use different bucketing.
* SPARK-18912: We forget to check the number of columns for non-file-based data source tables.
* SPARK-18913: We don't support appending data to a table with special column names.
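
A hedged, hypothetical spark-shell sketch of the kind of mismatch these checks now catch; the table name and bucket counts are made up for illustration, and the exact error message may differ:

  // Create a table bucketed into 4 buckets on column `id`.
  spark.range(10).write.bucketBy(4, "id").saveAsTable("bucketed_t")

  // Appending with a different bucketing spec should now fail the metadata-based
  // check with an AnalysisException, instead of silently producing data files
  // with mixed bucketing.
  spark.range(10).write.mode("append").bucketBy(8, "id").saveAsTable("bucketed_t")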

## How was this patch tested?
New regression tests.

Author: Wenchen Fan 

Closes #16313 from cloud-fan/bug1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f923c849
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f923c849
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f923c849

Branch: refs/heads/master
Commit: f923c849e5b8f7e7aeafee59db598a9bf4970f50
Parents: fa829ce
Author: Wenchen Fan 
Authored: Mon Dec 19 20:03:33 2016 -0800
Committer: gatorsmile 
Committed: Mon Dec 19 20:03:33 2016 -0800

--
 .../catalyst/catalog/ExternalCatalogUtils.scala |  37 +++
 .../spark/sql/catalyst/catalog/interface.scala  |  10 ++
 .../command/createDataSourceTables.scala| 110 ---
 .../spark/sql/execution/datasources/rules.scala |  53 +++--
 .../spark/sql/execution/command/DDLSuite.scala  |   4 +-
 .../sql/test/DataFrameReaderWriterSuite.scala   |  38 ++-
 .../sql/hive/MetastoreDataSourcesSuite.scala|  17 ++-
 .../sql/sources/HadoopFsRelationTest.scala  |   2 +-
 8 files changed, 180 insertions(+), 91 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f923c849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 817c1ab..4331841 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 
 object ExternalCatalogUtils {
@@ -133,4 +135,39 @@ object CatalogUtils {
   case o => o
 }
   }
+
+  def normalizePartCols(
+      tableName: String,
+      tableCols: Seq[String],
+      partCols: Seq[String],
+      resolver: Resolver): Seq[String] = {
+    partCols.map(normalizeColumnName(tableName, tableCols, _, "partition", resolver))
+  }
+
+  def normalizeBucketSpec(
+      tableName: String,
+      tableCols: Seq[String],
+      bucketSpec: BucketSpec,
+      resolver: Resolver): BucketSpec = {
+    val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec
+    val normalizedBucketCols = bucketColumnNames.map { colName =>
+      normalizeColumnName(tableName, tableCols, colName, "bucket", resolver)
+    }
+    val normalizedSortCols = sortColumnNames.map { colName =>
+      normalizeColumnName(tableName, tableCols, colName, "sort", resolver)
+    }
+    BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols)
+  }
+
+  private def normalizeColumnName(
+      tableName: String,
+      tableCols: Seq[String],
+      colName: String,
+      colType: String,
+      resolver: Resolver): String = {
+    tableCols.find(resolver(_, colName)).getOrElse {
+      throw new AnalysisException(s"$colType column $colName is not defined in table $tableName, " +
+        s"defined table columns are: ${tableCols.mkString(", ")}")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f923c849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--

spark git commit: [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 5857b9ac2 -> fa829ce21


[SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" 
because some tasks may not be interruptible or may not respond to their 
"killed" flags being set. If a significant fraction of a cluster's task slots 
are occupied by tasks that have been marked as killed but remain running then 
this can lead to a situation where new jobs and tasks are starved of resources 
that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high level, killing a task now launches a new thread that attempts to kill the task, then watches it and periodically checks whether it has actually been killed. The TaskReaper periodically re-attempts to call `TaskRunner.kill()` and logs warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing the TaskReaper to take a thread dump and filter it in order to log stack traces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout, the TaskReaper throws an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is disabled by default and is controlled by four new configuration keys under the `spark.task.reaper.*` namespace; see the updated `configuration.md` for details.
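
A hedged configuration sketch for trying the feature out; `spark.task.reaper.enabled` appears in the diff below, while the other keys are assumptions about the `spark.task.reaper.*` namespace and should be checked against the updated `configuration.md`:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    // Turn the reaper on (it defaults to false, per the Executor change below).
    .set("spark.task.reaper.enabled", "true")
    // Assumed keys: how often the reaper re-checks a killed task, and how long to
    // wait before killing the executor JVM to free the zombie task's resources.
    .set("spark.task.reaper.pollingInterval", "10s")
    .set("spark.task.reaper.killTimeout", "120s")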

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen 

Closes #16189 from JoshRosen/cancellation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa829ce2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa829ce2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa829ce2

Branch: refs/heads/master
Commit: fa829ce21fb84028d90b739a49c4ece70a17ccfd
Parents: 5857b9a
Author: Josh Rosen 
Authored: Mon Dec 19 18:43:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 18:43:59 2016 -0800

--
 .../org/apache/spark/executor/Executor.scala| 169 ++-
 .../scala/org/apache/spark/util/Utils.scala |  56 +++---
 .../org/apache/spark/JobCancellationSuite.scala |  77 +
 docs/configuration.md   |  42 +
 4 files changed, 316 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa829ce2/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9501dd9..3346f6d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -84,6 +84,16 @@ private[spark] class Executor(
   // Start worker thread pool
   private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor 
task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
+  // Pool used for threads that supervise task killing / cancellation
+  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task 
reaper")
+  // For tasks which are in the process of being killed, this map holds the 
most recently created
+  // TaskReaper. All accesses to this map should be synchronized on the map 
itself (this isn't
+  // a ConcurrentHashMap because we use the synchronization for purposes other 
than simply guarding
+  // the integrity of the map's internal state). The purpose of this map is to 
prevent the creation
+  // of a separate TaskReaper for every killTask() of a given task. Instead, 
this map allows us to
+  // track whether an existing TaskReaper fulfills the role of a TaskReaper 
that we would otherwise
+  // create. The map key is a task id.
+  private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, 
TaskReaper]()
 
   if (!isLocal) {
 env.metricsSystem.registerSource(executorSource)
@@ -93,6 +103,9 @@ private[spark] class Executor(
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst = 
conf.getBoolean("spark.executor.userClassPathFirst", false)
 
+  // Whether to monitor killed / interrupted tasks
+  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", 
false)
+
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   private val urlClassLoader = createClassLoader()
@@ -148,9 +161,27 @@ private[spa

spark git commit: [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter

2016-12-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 4cb49412d -> 5857b9ac2


[SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & 
UnsafeSorter

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check 
`TaskContext.isInterrupted()`, but this check is missing on a few critical read 
paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and 
UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to 
continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` 
checks to these paths. Note that I could have used `InterruptibleIterator` to 
simply wrap a bunch of iterators but in some cases this would have an adverse 
performance penalty or might not be effective due to certain special uses of 
Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic 
into existing iterator subclasses.
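
The diffs below inline this check directly into each iterator; as a hedged, generic Scala sketch of the same pattern (the class and names here are illustrative, not part of the patch):

  import org.apache.spark.{TaskContext, TaskKilledException}

  class CancellableIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    private val taskContext = TaskContext.get()

    override def hasNext: Boolean = underlying.hasNext

    override def next(): T = {
      // Fail fast if the task has been marked as killed, mirroring
      // InterruptibleIterator without allocating a wrapper per iterator.
      if (taskContext != null && taskContext.isInterrupted()) {
        throw new TaskKilledException
      }
      underlying.next()
    }
  }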

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of 
non-cancellable tasks, one involving scans of huge files and another involving 
sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by 
the changes added here.

Author: Josh Rosen 

Closes #16340 from JoshRosen/sql-task-interruption.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5857b9ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5857b9ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5857b9ac

Branch: refs/heads/master
Commit: 5857b9ac2d9808d9b89a5b29620b5052e2beebf5
Parents: 4cb4941
Author: Josh Rosen 
Authored: Tue Dec 20 01:19:38 2016 +0100
Committer: Herman van Hovell 
Committed: Tue Dec 20 01:19:38 2016 +0100

--
 .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++
 .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++
 .../spark/sql/execution/datasources/FileScanRDD.scala   | 12 ++--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala  |  5 +++--
 4 files changed, 35 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5857b9ac/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 252a35e..5b42843 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
 
 import org.apache.avro.reflect.Nullable;
 
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskKilledException;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
@@ -253,6 +255,7 @@ public final class UnsafeInMemorySorter {
 private long keyPrefix;
 private int recordLength;
 private long currentPageNumber;
+private final TaskContext taskContext = TaskContext.get();
 
 private SortedIterator(int numRecords, int offset) {
   this.numRecords = numRecords;
@@ -283,6 +286,14 @@ public final class UnsafeInMemorySorter {
 
 @Override
 public void loadNext() {
+  // Kill the task in case it has been marked as killed. This logic is from
+  // InterruptibleIterator, but we inline it here instead of wrapping the 
iterator in order
+  // to avoid performance overhead. This check is added here in 
`loadNext()` instead of in
+  // `hasNext()` because it's technically possible for the caller to be 
relying on
+  // `getNumRecords()` instead of `hasNext()` to know when to stop.
+  if (taskContext != null && taskContext.isInterrupted()) {
+throw new TaskKilledException();
+  }
   // This pointer points to a 4-byte record length, followed by the 
record's bytes
   final long recordPointer = array.get(offset + position);
   currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);

http://git-wip-us.apache.org/repos/asf/spark/blob/5857b9ac/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index a658e5e..b6323c6 100644
--- 
a/core/sr

spark git commit: [SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter

2016-12-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c1a26b458 -> f07e989c0


[SPARK-18928] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & 
UnsafeSorter

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check 
`TaskContext.isInterrupted()`, but this check is missing on a few critical read 
paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and 
UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to 
continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` 
checks to these paths. Note that I could have used `InterruptibleIterator` to 
simply wrap a bunch of iterators but in some cases this would have an adverse 
performance penalty or might not be effective due to certain special uses of 
Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic 
into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of 
non-cancellable tasks, one involving scans of huge files and another involving 
sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by 
the changes added here.

Author: Josh Rosen 

Closes #16340 from JoshRosen/sql-task-interruption.

(cherry picked from commit 5857b9ac2d9808d9b89a5b29620b5052e2beebf5)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f07e989c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f07e989c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f07e989c

Branch: refs/heads/branch-2.1
Commit: f07e989c02844151587f9a29fe77ea65facea422
Parents: c1a26b4
Author: Josh Rosen 
Authored: Tue Dec 20 01:19:38 2016 +0100
Committer: Herman van Hovell 
Committed: Tue Dec 20 01:19:51 2016 +0100

--
 .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++
 .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++
 .../spark/sql/execution/datasources/FileScanRDD.scala   | 12 ++--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala  |  5 +++--
 4 files changed, 35 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f07e989c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 252a35e..5b42843 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
 
 import org.apache.avro.reflect.Nullable;
 
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskKilledException;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
@@ -253,6 +255,7 @@ public final class UnsafeInMemorySorter {
 private long keyPrefix;
 private int recordLength;
 private long currentPageNumber;
+private final TaskContext taskContext = TaskContext.get();
 
 private SortedIterator(int numRecords, int offset) {
   this.numRecords = numRecords;
@@ -283,6 +286,14 @@ public final class UnsafeInMemorySorter {
 
 @Override
 public void loadNext() {
+  // Kill the task in case it has been marked as killed. This logic is from
+  // InterruptibleIterator, but we inline it here instead of wrapping the 
iterator in order
+  // to avoid performance overhead. This check is added here in 
`loadNext()` instead of in
+  // `hasNext()` because it's technically possible for the caller to be 
relying on
+  // `getNumRecords()` instead of `hasNext()` to know when to stop.
+  if (taskContext != null && taskContext.isInterrupted()) {
+throw new TaskKilledException();
+  }
   // This pointer points to a 4-byte record length, followed by the 
record's bytes
   final long recordPointer = array.get(offset + position);
   currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);

http://git-wip-us.apache.org/repos/asf/spark/blob/f07e989c/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 
b/core/src/main/java/org/ap

spark git commit: [SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler

2016-12-19 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 70d495dce -> 4cb49412d


[SPARK-18836][CORE] Serialize one copy of task metrics in DAGScheduler

## What changes were proposed in this pull request?

Right now we serialize the empty task metrics once per task. Since these metrics are shared across all tasks, we can use the same serialized task metrics for all tasks of a stage.
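
A hedged, self-contained sketch of the serialize-once idea (the payload here just stands in for the empty `TaskMetrics`; the real change is in the `DAGScheduler` diff below):

  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.JavaSerializer

  // Serialize one shared payload a single time on the driver...
  val serializer = new JavaSerializer(new SparkConf()).newInstance()
  val sharedBytes: Array[Byte] = serializer.serialize(Map("metrics" -> 0L)).array()

  // ...and hand the same byte array to every task of the stage, instead of
  // re-serializing the identical object once per task.
  val perTaskPayloads: Seq[Array[Byte]] = Seq.fill(8)(sharedBytes)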

## How was this patch tested?

- [x] Run tests on EC2 to measure performance improvement

Author: Shivaram Venkataraman 

Closes #16261 from shivaram/task-metrics-one-copy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cb49412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cb49412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cb49412

Branch: refs/heads/master
Commit: 4cb49412d1d7d10ffcc738475928c7de2bc59fd4
Parents: 70d495d
Author: Shivaram Venkataraman 
Authored: Mon Dec 19 14:53:01 2016 -0800
Committer: Kay Ousterhout 
Committed: Mon Dec 19 14:53:01 2016 -0800

--
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala  |  5 +++--
 .../scala/org/apache/spark/scheduler/ResultTask.scala|  9 +
 .../org/apache/spark/scheduler/ShuffleMapTask.scala  | 11 ++-
 .../src/main/scala/org/apache/spark/scheduler/Task.scala | 10 --
 .../scala/org/apache/spark/executor/ExecutorSuite.scala  |  4 +++-
 .../test/scala/org/apache/spark/scheduler/FakeTask.scala | 10 +-
 .../org/apache/spark/scheduler/TaskContextSuite.scala|  6 --
 7 files changed, 38 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0a1c500..6177baf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1009,13 +1009,14 @@ class DAGScheduler(
 }
 
 val tasks: Seq[Task[_]] = try {
+  val serializedTaskMetrics = 
closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
   stage match {
 case stage: ShuffleMapStage =>
   partitionsToCompute.map { id =>
 val locs = taskIdToLocations(id)
 val part = stage.rdd.partitions(id)
 new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-  taskBinary, part, locs, stage.latestInfo.taskMetrics, 
properties, Option(jobId),
+  taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
   Option(sc.applicationId), sc.applicationAttemptId)
   }
 
@@ -1025,7 +1026,7 @@ class DAGScheduler(
 val part = stage.rdd.partitions(p)
 val locs = taskIdToLocations(id)
 new ResultTask(stage.id, stage.latestInfo.attemptId,
-  taskBinary, part, locs, id, properties, 
stage.latestInfo.taskMetrics,
+  taskBinary, part, locs, id, properties, serializedTaskMetrics,
   Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
   }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4cb49412/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index d19353f..6abdf0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -42,7 +42,8 @@ import org.apache.spark.rdd.RDD
  * @param outputId index of the task in this job (a job can launch tasks on 
only a subset of the
  * input RDD's partitions).
  * @param localProperties copy of thread-local properties set by the user on 
the driver side.
- * @param metrics a `TaskMetrics` that is created at driver side and sent to 
executor side.
+ * @param serializedTaskMetrics a `TaskMetrics` that is created and serialized 
on the driver side
+ *  and sent to executor side.
  *
  * The parameters below are optional:
  * @param jobId id of the job this task belongs to
@@ -57,12 +58,12 @@ private[spark] class ResultTask[T, U](
 locs: Seq[TaskLocation],
 val outputId: Int,
 localProperties: Properties,
-metrics: TaskMetrics,
+serializedTaskMetrics: Array[Byte],
 jobId: Option[Int] = None,
 appId: Option[String] = None,
 appAttemptId: Option[String] = None)
-  extends Task[U](stageId, stageAttemptId, partition.inde

spark git commit: [SPARK-18624][SQL] Implicit cast ArrayType(InternalType)

2016-12-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 7a75ee1c9 -> 70d495dce


[SPARK-18624][SQL] Implicit cast ArrayType(InternalType)

## What changes were proposed in this pull request?

Currently `ImplicitTypeCasts` doesn't handle casts between `ArrayType`s, which is inconvenient; we should add a rule that enables casting from `ArrayType(InternalType)` to `ArrayType(newInternalType)` (a brief illustration follows the goals below).

Goals:
1. Add a rule to `ImplicitTypeCasts` to enable casting between `ArrayType`s;
2. Simplify `Percentile` and `ApproximatePercentile`.
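
A hedged illustration of goal 1 (the table and column names are made up): a literal percentage array such as `array(0.25, 0.75)` is typed as an array of decimals, and with this rule it can be implicitly cast to the `ArrayType(DoubleType)` that `percentile` expects:

  // Hypothetical example table; `value` is any numeric column.
  spark.range(100).toDF("value").createOrReplaceTempView("t")

  // The percentage argument array(0.25, 0.75) is cast element-wise to
  // array<double> by the new ImplicitTypeCasts rule.
  spark.sql("SELECT percentile(value, array(0.25, 0.75)) FROM t").show()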

## How was this patch tested?

Updated test cases in `TypeCoercionSuite`.

Author: jiangxingbo 

Closes #16057 from jiangxb1987/implicit-cast-complex-types.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70d495dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70d495dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70d495dc

Branch: refs/heads/master
Commit: 70d495dcecce8617b7099fc599fe7c43d7eae66e
Parents: 7a75ee1
Author: jiangxingbo 
Authored: Mon Dec 19 21:20:47 2016 +0100
Committer: Herman van Hovell 
Committed: Mon Dec 19 21:20:47 2016 +0100

--
 .../sql/catalyst/analysis/TypeCoercion.scala| 57 +---
 .../spark/sql/catalyst/expressions/Cast.scala   |  6 +--
 .../aggregate/ApproximatePercentile.scala   | 19 +++
 .../expressions/aggregate/Percentile.scala  | 14 ++---
 .../catalyst/analysis/TypeCoercionSuite.scala   | 45 ++--
 5 files changed, 92 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/70d495dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 6662a9e..cd73f9c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -673,48 +673,69 @@ object TypeCoercion {
  * If the expression has an incompatible type that cannot be implicitly 
cast, return None.
  */
 def implicitCast(e: Expression, expectedType: AbstractDataType): 
Option[Expression] = {
-  val inType = e.dataType
+  implicitCast(e.dataType, expectedType).map { dt =>
+if (dt == e.dataType) e else Cast(e, dt)
+  }
+}
 
+private def implicitCast(inType: DataType, expectedType: 
AbstractDataType): Option[DataType] = {
   // Note that ret is nullable to avoid typing a lot of Some(...) in this 
local scope.
   // We wrap immediately an Option after this.
-  @Nullable val ret: Expression = (inType, expectedType) match {
-
+  @Nullable val ret: DataType = (inType, expectedType) match {
 // If the expected type is already a parent of the input type, no need 
to cast.
-case _ if expectedType.acceptsType(inType) => e
+case _ if expectedType.acceptsType(inType) => inType
 
 // Cast null type (usually from null literals) into target types
-case (NullType, target) => Cast(e, target.defaultConcreteType)
+case (NullType, target) => target.defaultConcreteType
 
 // If the function accepts any numeric type and the input is a string, 
we follow the hive
 // convention and cast that input into a double
-case (StringType, NumericType) => Cast(e, 
NumericType.defaultConcreteType)
+case (StringType, NumericType) => NumericType.defaultConcreteType
 
 // Implicit cast among numeric types. When we reach here, input type 
is not acceptable.
 
 // If input is a numeric type but not decimal, and we expect a decimal 
type,
 // cast the input to decimal.
-case (d: NumericType, DecimalType) => Cast(e, DecimalType.forType(d))
+case (d: NumericType, DecimalType) => DecimalType.forType(d)
 // For any other numeric types, implicitly cast to each other, e.g. 
long -> int, int -> long
-case (_: NumericType, target: NumericType) => Cast(e, target)
+case (_: NumericType, target: NumericType) => target
 
 // Implicit cast between date time types
-case (DateType, TimestampType) => Cast(e, TimestampType)
-case (TimestampType, DateType) => Cast(e, DateType)
+case (DateType, TimestampType) => TimestampType
+case (TimestampType, DateType) => DateType
 
 // Implicit cast from/to string
-case (StringType, DecimalType) => Cast(e, DecimalType.SYSTEM_DEFAULT)
-case (StringType, target: NumericType) => Cast(e, target)
-case (StringType, DateType) => Cast(e, DateType)

spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fc1b25660 -> c1a26b458


[SPARK-18921][SQL] check database existence with Hive.databaseExists instead of 
getDatabase

## What changes were proposed in this pull request?

It's weird that we use `Hive.getDatabase` to check the existence of a database, 
while Hive has a `databaseExists` interface.

What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check whether the database exists.

This PR fixes this and uses `Hive.databaseExists` to check database existence.
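
A hedged sketch of the resulting client surface, condensed from the `HiveClient` diff below (the trait name here is changed to keep the snippet standalone):

  import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

  trait HiveClientSketch {
    /** Returns the metadata for the specified database, throwing if it doesn't exist. */
    def getDatabase(name: String): CatalogDatabase

    /** Returns whether a database with the given name exists. */
    def databaseExists(dbName: String): Boolean
  }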

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16332 from cloud-fan/minor.

(cherry picked from commit 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a26b45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a26b45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a26b45

Branch: refs/heads/branch-2.1
Commit: c1a26b458dd353be3ab1a2b3f9bb80809cf63479
Parents: fc1b256
Author: Wenchen Fan 
Authored: Mon Dec 19 11:42:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 11:43:55 2016 -0800

--
 .../apache/spark/sql/hive/HiveExternalCatalog.scala|  2 +-
 .../org/apache/spark/sql/hive/client/HiveClient.scala  |  8 +++-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala  | 12 
 .../apache/spark/sql/hive/client/VersionsSuite.scala   | 13 +++--
 4 files changed, 19 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f67ddc9..f321c45 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   override def databaseExists(db: String): Boolean = withClient {
-client.getDatabaseOption(db).isDefined
+client.databaseExists(db)
   }
 
   override def listDatabases(): Seq[String] = withClient {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8e7c871..0be5b0b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -58,12 +58,10 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it 
doesn't exist */
-  final def getDatabase(name: String): CatalogDatabase = {
-getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
-  }
+  def getDatabase(name: String): CatalogDatabase
 
-  /** Returns the metadata for a given database, or None if it doesn't exist. 
*/
-  def getDatabaseOption(name: String): Option[CatalogDatabase]
+  /** Returns whether a database with the specified name exists. */
+  def databaseExists(dbName: String): Boolean
 
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db73596..e0f7156 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -300,7 +300,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
-if (getDatabaseOption(databaseName).isDefined) {
+if (databaseExists(databaseName)) {
   state.setCurrentDatabase(databaseName)
 } else {
   throw new NoSuchDatabaseException(databaseName)
@@ -336,14 +336,18 @@ private[hive] class HiveClientImpl(

spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 24482858e -> 7a75ee1c9


[SPARK-18921][SQL] check database existence with Hive.databaseExists instead of 
getDatabase

## What changes were proposed in this pull request?

It's weird that we use `Hive.getDatabase` to check the existence of a database, 
while Hive has a `databaseExists` interface.

What's worse, `Hive.getDatabase` will produce an error message if the database doesn't exist, which is annoying when we only want to check whether the database exists.

This PR fixes this and uses `Hive.databaseExists` to check database existence.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16332 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a75ee1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a75ee1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a75ee1c

Branch: refs/heads/master
Commit: 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b
Parents: 2448285
Author: Wenchen Fan 
Authored: Mon Dec 19 11:42:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 11:42:59 2016 -0800

--
 .../apache/spark/sql/hive/HiveExternalCatalog.scala|  2 +-
 .../org/apache/spark/sql/hive/client/HiveClient.scala  |  8 +++-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala  | 12 
 .../apache/spark/sql/hive/client/VersionsSuite.scala   | 13 +++--
 4 files changed, 19 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 544f277..9c19a0e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   override def databaseExists(db: String): Boolean = withClient {
-client.getDatabaseOption(db).isDefined
+client.databaseExists(db)
   }
 
   override def listDatabases(): Seq[String] = withClient {

http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 837b6c5..8bdcf31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -58,12 +58,10 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it 
doesn't exist */
-  final def getDatabase(name: String): CatalogDatabase = {
-getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
-  }
+  def getDatabase(name: String): CatalogDatabase
 
-  /** Returns the metadata for a given database, or None if it doesn't exist. 
*/
-  def getDatabaseOption(name: String): Option[CatalogDatabase]
+  /** Returns whether a database with the specified name exists. */
+  def databaseExists(dbName: String): Boolean
 
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]

http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b75f6e9..bacae8a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -300,7 +300,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
-if (getDatabaseOption(databaseName).isDefined) {
+if (databaseExists(databaseName)) {
   state.setCurrentDatabase(databaseName)
 } else {
   throw new NoSuchDatabaseException(databaseName)
@@ -336,14 +336,18 @@ private[hive] class HiveClientImpl(
 Option(database.properties).map(_.asJava).orNull))
   }
 
-  override def getDatabaseOption(na

spark git commit: Fix test case for SubquerySuite.

2016-12-19 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b41668349 -> 2a5ab1490


Fix test case for SubquerySuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a5ab149
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a5ab149
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a5ab149

Branch: refs/heads/branch-2.0
Commit: 2a5ab149005f305b7bfcb160c5023e1a83d2dce1
Parents: b416683
Author: Reynold Xin 
Authored: Mon Dec 19 11:33:31 2016 -0800
Committer: Reynold Xin 
Committed: Mon Dec 19 11:33:31 2016 -0800

--
 sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a5ab149/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index eff8894..dfbfa3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -59,7 +59,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
   "(select id from range(2) union all select id from range(2))")
 
 // The depth first traversal of the plan tree
-val dfs = Seq("Project", "Filter", "Union", "Project", "Range", "Project", 
"Range", "Range")
+val dfs = Seq("Project", "Filter", "SubqueryAlias", "Union", "Project", 
"Range", "Project",
+  "Range", "Range")
 val numbered = df.queryExecution.analyzed.numberedTreeString.split("\n")
 
 // There should be 8 plan nodes in total





spark git commit: [SPARK-18700][SQL] Add StripedLock for each table's relation in cache

2016-12-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3080f995c -> fc1b25660


[SPARK-18700][SQL] Add StripedLock for each table's relation in cache

## What changes were proposed in this pull request?

As described in the scenario in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables is invalidated, the next few queries will each fetch all FileStatus objects in the listLeafFiles function. When the table has many partitions, these jobs occupy a large amount of driver memory and may eventually cause a driver OOM.

In this patch, a striped lock is added for each table's relation in the cache, rather than one lock for the whole cachedDataSourceTables, and each table's cache-loading operation is protected by its own lock.
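
A hedged, self-contained sketch of the per-table locking idea using Guava's `Striped` (it mirrors the `withTableCreationLock` helper in the diff below; the names are illustrative):

  import com.google.common.util.concurrent.Striped

  // One lazily created, weakly referenced lock per table name: concurrent loads of
  // the same table serialize, while loads of different tables proceed in parallel.
  val tableCreationLocks = Striped.lazyWeakLock(100)

  def withTableCreationLock[A](tableName: String)(f: => A): A = {
    val lock = tableCreationLocks.get(tableName)
    lock.lock()
    try f finally {
      lock.unlock()
    }
  }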

## How was this patch tested?

Added a multi-threaded table-access test in `PartitionedTablePerfStatsSuite` and verified, using the metrics in `HiveCatalogMetrics`, that the table is loaded only once.

Author: xuanyuanking 

Closes #16135 from xuanyuanking/SPARK-18700.

(cherry picked from commit 24482858e05bea84cacb41c62be0a9aaa33897ee)
Signed-off-by: Herman van Hovell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc1b2566
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc1b2566
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc1b2566

Branch: refs/heads/branch-2.1
Commit: fc1b25660d8d2ac676c0b020208bcb9b711978c8
Parents: 3080f99
Author: xuanyuanking 
Authored: Mon Dec 19 20:31:43 2016 +0100
Committer: Herman van Hovell 
Committed: Mon Dec 19 20:32:02 2016 +0100

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 134 +++
 .../hive/PartitionedTablePerfStatsSuite.scala   |  31 +
 2 files changed, 106 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc1b2566/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index edbde5d..0407cf6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -32,7 +33,6 @@ import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._
 
-
 /**
  * Legacy catalog for interacting with the Hive metastore.
  *
@@ -53,6 +53,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   tableIdent.table.toLowerCase)
   }
 
+  /** These locks guard against multiple attempts to instantiate a table, 
which wastes memory. */
+  private val tableCreationLocks = Striped.lazyWeakLock(100)
+
+  /** Acquires a lock on the table cache for the duration of `f`. */
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => 
A): A = {
+val lock = tableCreationLocks.get(tableName)
+lock.lock()
+try f finally {
+  lock.unlock()
+}
+  }
+
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
 val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -209,72 +221,76 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 }
   }
 
-  val cached = getCached(
-tableIdentifier,
-rootPaths,
-metastoreRelation,
-metastoreSchema,
-fileFormatClass,
-bucketSpec,
-Some(partitionSchema))
-
-  val logicalRelation = cached.getOrElse {
-val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
-val fileCatalog = {
-  val catalog = new CatalogFileIndex(
-sparkSession, metastoreRelation.catalogTable, sizeInBytes)
-  if (lazyPruningEnabled) {
-catalog
-  } else {
-catalog.filterPartitions(Nil)  // materialize all the partitions 
in memory
+  withTableCreationLock(tableIdentifier, {
+val cached = getCached(
+  tableIdentifier,
+  rootPaths,
+  metastoreRelation,
+  metastoreSchema,
+  fileFormatClass,
+  bucketSpec,
+  Some(partitionSchema))
+
+val logicalRelation = cached.getOrElse {
+  val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
+ 

spark git commit: [SPARK-18700][SQL] Add StripedLock for each table's relation in cache

2016-12-19 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 7db09abb0 -> 24482858e


[SPARK-18700][SQL] Add StripedLock for each table's relation in cache

## What changes were proposed in this pull request?

As described in the scenario in [SPARK-18700](https://issues.apache.org/jira/browse/SPARK-18700), when cachedDataSourceTables is invalidated, the next few queries will each fetch all FileStatus objects in the listLeafFiles function. When the table has many partitions, these jobs occupy a large amount of driver memory and may eventually cause a driver OOM.

In this patch, a striped lock is added for each table's relation in the cache, rather than one lock for the whole cachedDataSourceTables, and each table's cache-loading operation is protected by its own lock.

## How was this patch tested?

Added a multi-threaded table-access test in `PartitionedTablePerfStatsSuite` and verified, using the metrics in `HiveCatalogMetrics`, that the table is loaded only once.

Author: xuanyuanking 

Closes #16135 from xuanyuanking/SPARK-18700.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24482858
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24482858
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24482858

Branch: refs/heads/master
Commit: 24482858e05bea84cacb41c62be0a9aaa33897ee
Parents: 7db09ab
Author: xuanyuanking 
Authored: Mon Dec 19 20:31:43 2016 +0100
Committer: Herman van Hovell 
Committed: Mon Dec 19 20:31:43 2016 +0100

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 134 +++
 .../hive/PartitionedTablePerfStatsSuite.scala   |  31 +
 2 files changed, 106 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24482858/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index edbde5d..0407cf6 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
@@ -32,7 +33,6 @@ import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.types._
 
-
 /**
  * Legacy catalog for interacting with the Hive metastore.
  *
@@ -53,6 +53,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   tableIdent.table.toLowerCase)
   }
 
+  /** These locks guard against multiple attempts to instantiate a table, 
which wastes memory. */
+  private val tableCreationLocks = Striped.lazyWeakLock(100)
+
+  /** Acquires a lock on the table cache for the duration of `f`. */
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => 
A): A = {
+val lock = tableCreationLocks.get(tableName)
+lock.lock()
+try f finally {
+  lock.unlock()
+}
+  }
+
   /** A cache of Spark SQL data source tables that have been accessed. */
   protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
 val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
@@ -209,72 +221,76 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 }
   }
 
-  val cached = getCached(
-tableIdentifier,
-rootPaths,
-metastoreRelation,
-metastoreSchema,
-fileFormatClass,
-bucketSpec,
-Some(partitionSchema))
-
-  val logicalRelation = cached.getOrElse {
-val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
-val fileCatalog = {
-  val catalog = new CatalogFileIndex(
-sparkSession, metastoreRelation.catalogTable, sizeInBytes)
-  if (lazyPruningEnabled) {
-catalog
-  } else {
-catalog.filterPartitions(Nil)  // materialize all the partitions 
in memory
+  withTableCreationLock(tableIdentifier, {
+val cached = getCached(
+  tableIdentifier,
+  rootPaths,
+  metastoreRelation,
+  metastoreSchema,
+  fileFormatClass,
+  bucketSpec,
+  Some(partitionSchema))
+
+val logicalRelation = cached.getOrElse {
+  val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
+  val fileCatalog = {
+val catalog = new CatalogFileIndex(
+  sparkSession, metastore

spark git commit: [SPARK-18356][ML] KMeans should cache RDD before training

2016-12-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 1e5c51f33 -> 7db09abb0


[SPARK-18356][ML] KMeans should cache RDD before training

## What changes were proposed in this pull request?

At the request of Joseph Bradley, I updated my PR https://github.com/apache/spark/pull/15965 to eliminate the extra fit() method.

jkbradley
## How was this patch tested?
Existing tests pass.
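
A hedged, condensed sketch of the cache-before-training pattern that now lives directly in `fit()` (adapted from the diff below; the feature-extraction step is simplified):

  import org.apache.spark.storage.StorageLevel

  // `dataset` is the Dataset[_] passed to fit().
  // Persist the training input only if the caller has not already cached it,
  // and unpersist afterwards so the cached copy is not leaked.
  val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
  val instances = dataset.select("features").rdd  // simplified: the real code maps Row => OldVector
  if (handlePersistence) {
    instances.persist(StorageLevel.MEMORY_AND_DISK)
  }
  // ... run MLlib KMeans on `instances` ...
  if (handlePersistence) {
    instances.unpersist()
  }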

Author: Zakaria_Hili 
Author: HILI Zakaria 

Closes #16295 from ZakariaHili/zakbranch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db09abb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db09abb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db09abb

Branch: refs/heads/master
Commit: 7db09abb0168b77697064c69126ee82ca89609a0
Parents: 1e5c51f
Author: Zakaria_Hili 
Authored: Mon Dec 19 10:30:38 2016 +
Committer: Sean Owen 
Committed: Mon Dec 19 10:30:38 2016 +

--
 .../scala/org/apache/spark/ml/clustering/KMeans.scala   | 12 +---
 1 file changed, 5 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7db09abb/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index e168a41..e02b532 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -302,22 +302,19 @@ class KMeans @Since("1.5.0") (
 
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): KMeansModel = {
-val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
-fit(dataset, handlePersistence)
-  }
-
-  @Since("2.2.0")
-  protected def fit(dataset: Dataset[_], handlePersistence: Boolean): 
KMeansModel = {
 transformSchema(dataset.schema, logging = true)
+
+val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
 val instances: RDD[OldVector] = 
dataset.select(col($(featuresCol))).rdd.map {
   case Row(point: Vector) => OldVectors.fromML(point)
 }
+
 if (handlePersistence) {
   instances.persist(StorageLevel.MEMORY_AND_DISK)
 }
+
 val instr = Instrumentation.create(this, instances)
 instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, 
maxIter, seed, tol)
-
 val algo = new MLlibKMeans()
   .setK($(k))
   .setInitializationMode($(initMode))
@@ -329,6 +326,7 @@ class KMeans @Since("1.5.0") (
 val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
 val summary = new KMeansSummary(
   model.transform(dataset), $(predictionCol), $(featuresCol), $(k))
+
 model.setSummary(Some(summary))
 instr.logSuccess(model)
 if (handlePersistence) {

